[New] Added new ServiceExecutionHandler

The ServiceExecutionHandler is used to execute long running services
without needing singletons
This commit is contained in:
Robert von Burg 2015-02-21 00:32:55 +01:00
parent f46e72bbdb
commit a7de76933b
2 changed files with 214 additions and 0 deletions

View File

@ -0,0 +1,142 @@
package li.strolch.service.executor;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.exception.StrolchException;
import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.service.api.Service;
import li.strolch.service.api.ServiceArgument;
import li.strolch.service.api.ServiceHandler;
import li.strolch.service.api.ServiceResult;
import ch.eitchnet.privilege.model.Certificate;
/**
* The {@link ServiceExecutionHandler} is used to perform long running services so that no singletons etc. are required.
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class ServiceExecutionHandler extends StrolchComponent {
private Map<String, ServiceExecutionStatus> serviceContextMap;
private BlockingQueue<ServiceContext<? extends ServiceArgument, ? extends ServiceResult>> queue;
private Thread thread;
private volatile boolean interrupted;
public ServiceExecutionHandler(ComponentContainer container, String componentName) {
super(container, componentName);
}
@Override
public void initialize(ComponentConfiguration configuration) {
this.serviceContextMap = Collections.synchronizedMap(new HashMap<>());
this.queue = new LinkedBlockingQueue<>();
this.thread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (!interrupted) {
doService(queue.take());
}
} catch (InterruptedException ex) {
logger.error(ex.getLocalizedMessage());
}
}
}, "ServiceExecutor");
this.thread.setDaemon(true);
super.initialize(configuration);
}
private <T extends ServiceArgument, U extends ServiceResult> void doService(ServiceContext<T, U> svcCtx) {
if (this.interrupted)
return;
String serviceName = svcCtx.service.getClass().getName();
ServiceExecutionStatus status = this.serviceContextMap.get(serviceName);
status.started();
ServiceHandler svcHandler = getContainer().getComponent(ServiceHandler.class);
U svcResult = svcHandler.doService(svcCtx.certificate, svcCtx.service, svcCtx.argument);
status.setResult(svcResult);
}
@Override
public void start() {
this.thread.start();
super.start();
}
@Override
public void stop() {
if (this.thread != null) {
this.thread.interrupt();
try {
this.thread.join(2000l);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
super.stop();
}
@Override
public void destroy() {
this.thread = null;
super.destroy();
}
public ServiceExecutionStatus getStatus(Class<?> clazz) {
ServiceExecutionStatus status = this.serviceContextMap.get(clazz.getName());
if (status == null)
return new ServiceExecutionStatus(clazz.getName());
return status;
}
public <T extends ServiceArgument, U extends ServiceResult> ServiceExecutionStatus doService(
Certificate certificate, Service<T, U> service, T argument) {
String serviceName = service.getClass().getName();
if (this.serviceContextMap.containsKey(serviceName)) {
ServiceExecutionStatus serviceExecutionStatus = this.serviceContextMap.get(serviceName);
if (!serviceExecutionStatus.isDone()) {
throw new StrolchException("A service with name " + serviceName + " is already running!");
}
}
ServiceContext<T, U> svcCtx = new ServiceContext<T, U>(certificate, service, argument);
try {
ServiceExecutionStatus status = new ServiceExecutionStatus(serviceName);
this.serviceContextMap.put(serviceName, status);
this.queue.put(svcCtx);
Thread.sleep(20l);
return status;
} catch (InterruptedException e) {
this.serviceContextMap.remove(serviceName);
throw new StrolchException("Failed to register service context: " + e.getMessage(), e);
}
}
public class ServiceContext<T extends ServiceArgument, U extends ServiceResult> {
private Certificate certificate;
private Service<T, U> service;
private T argument;
public ServiceContext(Certificate certificate, Service<T, U> service, T argument) {
this.certificate = certificate;
this.service = service;
this.argument = argument;
}
}
}

View File

@ -0,0 +1,72 @@
package li.strolch.service.executor;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import li.strolch.service.api.ServiceResult;
import ch.eitchnet.utils.helper.StringHelper;
/**
* @author Robert von Burg <eitch@eitchnet.ch>
*/
@XmlRootElement(name = "ServiceExecutionStatus")
@XmlAccessorType(XmlAccessType.NONE)
public class ServiceExecutionStatus {
private String serviceName;
private volatile boolean started;
private volatile ServiceResult result;
public ServiceExecutionStatus() {
// no arg constructor for JAXB
}
public ServiceExecutionStatus(String serviceName) {
this.serviceName = serviceName;
}
@XmlAttribute(name = "serviceName")
public String getServiceName() {
return serviceName;
}
public synchronized ServiceResult getResult() {
return result;
}
public synchronized void setResult(ServiceResult svcResult) {
this.result = svcResult;
}
@XmlAttribute(name = "resultMessage")
public String getResultMessage() {
if (this.result == null)
return StringHelper.DASH;
if (this.result.getMessage() == null)
return StringHelper.DASH;
return this.result.getMessage();
}
@XmlAttribute(name = "resultState")
public String getResultState() {
if (this.result == null)
return StringHelper.DASH;
return this.result.getState().name();
}
@XmlAttribute(name = "done")
public boolean isDone() {
return this.result != null;
}
@XmlAttribute(name = "started")
public synchronized boolean isStarted() {
return started;
}
public synchronized void started() {
this.started = true;
}
}