[New] EventBasedExecutionHandler can now restart in-exec Activity
This commit is contained in:
parent
cfe8664ddc
commit
c1522fc1e7
|
@ -17,13 +17,13 @@ import li.strolch.handler.operationslog.LogMessage;
|
||||||
import li.strolch.handler.operationslog.LogSeverity;
|
import li.strolch.handler.operationslog.LogSeverity;
|
||||||
import li.strolch.handler.operationslog.OperationsLog;
|
import li.strolch.handler.operationslog.OperationsLog;
|
||||||
import li.strolch.model.Locator;
|
import li.strolch.model.Locator;
|
||||||
|
import li.strolch.model.State;
|
||||||
import li.strolch.model.activity.Action;
|
import li.strolch.model.activity.Action;
|
||||||
import li.strolch.model.activity.Activity;
|
import li.strolch.model.activity.Activity;
|
||||||
import li.strolch.model.activity.IActivityElement;
|
import li.strolch.model.activity.IActivityElement;
|
||||||
import li.strolch.model.policy.PolicyDef;
|
import li.strolch.model.policy.PolicyDef;
|
||||||
import li.strolch.persistence.api.StrolchTransaction;
|
import li.strolch.persistence.api.StrolchTransaction;
|
||||||
import li.strolch.policy.PolicyHandler;
|
import li.strolch.policy.PolicyHandler;
|
||||||
import li.strolch.privilege.model.Certificate;
|
|
||||||
import li.strolch.privilege.model.PrivilegeContext;
|
import li.strolch.privilege.model.PrivilegeContext;
|
||||||
import li.strolch.runtime.ThreadPoolFactory;
|
import li.strolch.runtime.ThreadPoolFactory;
|
||||||
import li.strolch.runtime.configuration.ComponentConfiguration;
|
import li.strolch.runtime.configuration.ComponentConfiguration;
|
||||||
|
@ -39,6 +39,7 @@ import li.strolch.utils.dbc.DBC;
|
||||||
public class EventBasedExecutionHandler extends ExecutionHandler {
|
public class EventBasedExecutionHandler extends ExecutionHandler {
|
||||||
|
|
||||||
private static final String KEY_DEFAULT_ACTIVITY_ARCHIVAL = "key:DefaultActivityArchival";
|
private static final String KEY_DEFAULT_ACTIVITY_ARCHIVAL = "key:DefaultActivityArchival";
|
||||||
|
private static final String PROP_RESTART_EXECUTION = "restartExecution";
|
||||||
|
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
|
|
||||||
|
@ -64,6 +65,16 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
|
||||||
this.executorService = Executors.newCachedThreadPool(new ThreadPoolFactory("ExecutionHandler"));
|
this.executorService = Executors.newCachedThreadPool(new ThreadPoolFactory("ExecutionHandler"));
|
||||||
this.delayedExecutionTimer = new SimpleDurationExecutionTimer();
|
this.delayedExecutionTimer = new SimpleDurationExecutionTimer();
|
||||||
|
|
||||||
|
// restart execution of activities already in execution
|
||||||
|
if (!getConfiguration().getBoolean(PROP_RESTART_EXECUTION, Boolean.FALSE)) {
|
||||||
|
logger.info("Not restarting execution of activities.");
|
||||||
|
} else {
|
||||||
|
logger.info("Restarting execution of activities.");
|
||||||
|
runAsAgent(ctx -> {
|
||||||
|
restartActivityExecution(ctx);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,6 +115,47 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void restartActivityExecution(PrivilegeContext ctx) {
|
||||||
|
|
||||||
|
// iterate the realms
|
||||||
|
for (String realmName : getContainer().getRealmNames()) {
|
||||||
|
|
||||||
|
// open a TX for each realm
|
||||||
|
try (StrolchTransaction tx = openTx(realmName, ctx.getCertificate())) {
|
||||||
|
|
||||||
|
// iterate all activities
|
||||||
|
for (Activity activity : tx.getActivityMap().getAllElements(tx)) {
|
||||||
|
|
||||||
|
// we only want to restart activities which were in execution
|
||||||
|
State state = activity.getState();
|
||||||
|
if (!state.inExecutionPhase())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
logger.info("Starting Execution of " + activity.getLocator() + " on realm " + realmName);
|
||||||
|
|
||||||
|
// Activities need to be in state STOPPED to restart
|
||||||
|
if (state == State.ERROR) {
|
||||||
|
activity.getActionsWithState(State.ERROR).forEach(a -> a.setState(State.STOPPED));
|
||||||
|
} else if (state == State.WARNING) {
|
||||||
|
activity.getActionsWithState(State.WARNING).forEach(a -> a.setState(State.STOPPED));
|
||||||
|
} else if (state == State.EXECUTION) {
|
||||||
|
activity.getActionsWithState(State.EXECUTION).forEach(a -> a.setState(State.STOPPED));
|
||||||
|
}
|
||||||
|
tx.updateActivity(activity);
|
||||||
|
|
||||||
|
// register for execution
|
||||||
|
this.registeredActivities.addElement(realmName, activity.getLocator());
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit changes to state
|
||||||
|
tx.commitOnClose();
|
||||||
|
}
|
||||||
|
|
||||||
|
// trigger execution of the registered activities
|
||||||
|
triggerExecution(realmName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void triggerExecution(String realm) {
|
public void triggerExecution(String realm) {
|
||||||
synchronized (this.registeredActivities) {
|
synchronized (this.registeredActivities) {
|
||||||
|
@ -372,10 +424,6 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
|
||||||
triggerExecution(realm);
|
triggerExecution(realm);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected StrolchTransaction openTx(String realm, Certificate cert, Class<?> clazz) {
|
|
||||||
return getContainer().getRealm(realm).openTx(cert, clazz);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DelayedExecutionTimer getDelayedExecutionTimer() {
|
public DelayedExecutionTimer getDelayedExecutionTimer() {
|
||||||
return this.delayedExecutionTimer;
|
return this.delayedExecutionTimer;
|
||||||
|
|
Loading…
Reference in New Issue