diff --git a/li.strolch.service/src/main/java/li/strolch/execution/Controller.java b/li.strolch.service/src/main/java/li/strolch/execution/Controller.java index e6a997740..f384b2024 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/Controller.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/Controller.java @@ -7,9 +7,10 @@ import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT; import java.util.HashMap; import java.util.Map; import java.util.ResourceBundle; +import java.util.Set; -import li.strolch.agent.api.ComponentContainer; import li.strolch.agent.api.ObserverEvent; +import li.strolch.agent.api.StrolchAgent; import li.strolch.agent.api.StrolchLockException; import li.strolch.agent.api.StrolchRealm; import li.strolch.execution.command.*; @@ -27,29 +28,30 @@ import li.strolch.model.log.LogSeverity; import li.strolch.persistence.api.StrolchTransaction; import li.strolch.privilege.model.Certificate; import li.strolch.runtime.privilege.PrivilegedRunnable; +import li.strolch.runtime.privilege.PrivilegedRunnableWithResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Controller { - private static final Logger logger = LoggerFactory.getLogger(Controller.class); + protected static final Logger logger = LoggerFactory.getLogger(Controller.class); - private final int lockRetries; - private final String realm; - private final ComponentContainer container; - private final ExecutionHandler executionHandler; + protected final int lockRetries; + protected final String realm; + protected final StrolchAgent agent; + protected final ExecutionHandler executionHandler; - private final String activityType; - private final String activityId; - private final Locator locator; - - private Activity activity; + protected final String activityType; + protected final String activityId; + protected final Locator locator; private final Map inExecution; + protected Activity activity; + public Controller(String realm, ExecutionHandler executionHandler, Activity activity) { this.realm = realm; - this.container = executionHandler.getContainer(); + this.agent = executionHandler.getAgent(); this.executionHandler = executionHandler; this.locator = activity.getLocator(); this.activityType = activity.getType(); @@ -76,6 +78,14 @@ public class Controller { return this.activity; } + public Set getInExecutionActionLocators() { + return this.inExecution.keySet(); + } + + public ExecutionPolicy getExecutionPolicy(Locator actionLoc) { + return this.inExecution.get(actionLoc); + } + public ExecutionPolicy refreshExecutionPolicy(StrolchTransaction tx, Action action) { ExecutionPolicy executionPolicy = this.inExecution.computeIfAbsent(action.getLocator(), e -> { Resource resource = tx.getResourceFor(action, true); @@ -97,7 +107,11 @@ public class Controller { } protected void runAsAgent(PrivilegedRunnable runnable) throws Exception { - this.executionHandler.runAsAgent(runnable); + this.agent.runAsAgent(runnable); + } + + protected T runAsAgentWithResult(PrivilegedRunnableWithResult runnable) throws Exception { + return this.agent.runAsAgentWithResult(runnable); } @SuppressWarnings("BooleanMethodIsAlwaysInverted") @@ -114,37 +128,69 @@ public class Controller { } /** - * Starts the execution of this {@link Activity} + * Stops the execution of all actions of this controller */ - public void execute() throws Exception { - boolean[] trigger = new boolean[1]; - this.executionHandler.runAsAgent(ctx -> { - try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - trigger[0] = execute(tx); - if (tx.needsCommit()) { - tx.commitOnClose(); - } - } - }); - - if (trigger[0]) - this.executionHandler.triggerExecution(this.realm); - } - - /** - * Stops the execution of all actions - */ - public void stop() { + public void stopExecutions() { synchronized (this.inExecution) { this.inExecution.values().forEach(ExecutionPolicy::stop); } } - private boolean execute(StrolchTransaction tx) { - if (!refreshActivity(tx)) - return false; + /** + * Executes {@link Action Actions} for {@link Activity} of {@link Controller#getLocator()}. Keeps executing till no {@link Action} was set to {@link State#EXECUTED} + */ + public boolean execute() throws Exception { + return runAsAgentWithResult(ctx -> { + try (StrolchTransaction tx = openTx(ctx.getCertificate())) { + lockWithRetries(tx); + if (!refreshActivity(tx)) + return false; + boolean trigger = internalExecute(tx); + + // we trigger execution for the same activity if the controller says it is needed + if (trigger) { + logger.info("Triggering additional execution of controller " + this + " after execution."); + triggerExecute(tx); + } + + if (tx.needsCommit()) + tx.commitOnClose(); + + return trigger; + } + }); + } + + /** + * Executes the activity in the given TX. Keeps executing till no {@link Action} was set to {@link State#EXECUTED} + * + * @param tx + * the TX + */ + public void execute(StrolchTransaction tx) { + lockWithRetries(tx); + if (!refreshActivity(tx)) + return; + + boolean trigger = internalExecute(tx); + + // we trigger execution for the same activity if the controller says it is needed + if (trigger) { + logger.info("Triggering additional execution of controller " + this + " after execution."); + triggerExecute(tx); + } + } + + /** + * Executes the activity in the given TX by calling the {@link ExecuteActivityCommand} + * + * @param tx + * the TX + * + * @return true if execute should be called again, i.e. the {@link ExecuteActivityCommand#needsRetriggerOfExecution()} returns true and the activity isn't complete yet + */ + protected boolean internalExecute(StrolchTransaction tx) { if (this.activity.getState().isExecuted()) { this.executionHandler.removeFromExecution(this); logger.info("Archiving executed activity " + this.locator + " with state " + this.activity.getState()); @@ -166,78 +212,111 @@ public class Controller { updateObservers(); - return command.needsRetriggerOfExecution(); + return command.needsRetriggerOfExecution() && !this.activity.inClosedPhase(); } /** - * Completes the execution of the given {@link Action} with the given {@link Locator} + * Completes the execution of the given {@link Action} with the given {@link Locator}, executing next {@link Action Actions} if possible * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to executed */ public void toExecuted(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); - - // set this action to executed - toExecuted(tx, action); - - updateObservers(); - - // flush so we can see the changes performed - tx.flush(); + setToExecuted(tx, action); // now try and execute the next action(s) - execute(tx); + triggerExecute(tx); if (tx.needsCommit()) tx.commitOnClose(); } }); - - this.executionHandler.triggerExecution(this.realm); } /** - * Completes the execution of the given {@link Action} + * Completes the execution of the given {@link Action}. No further processing is done. + * + * @param tx + * the TX + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to executed + */ + public void toExecuted(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + setToExecuted(tx, action); + } + + private boolean invalidActionContext(StrolchTransaction tx, Locator actionLoc) { + lockWithRetries(tx); + if (!this.inExecution.containsKey(actionLoc)) + throw new IllegalStateException(actionLoc + " is not in execution!"); + return !refreshActivity(tx); + } + + /** + *

Simply calls the {@link SetActionToExecutedCommand} and then updates the observers

+ * + *

Note: Usually you will want to call {@link #toExecuted(Locator)} or + * {@link #toExecuted(StrolchTransaction, Locator)}. This method expects the associated {@link Activity} to + * already be locked, and validated that this action is in execution

* * @param tx * the TX * @param action - * the {@link Action} to set to executed + * the Action to set to executed */ - public void toExecuted(StrolchTransaction tx, Action action) throws Exception { + public void setToExecuted(StrolchTransaction tx, Action action) { + + // set this action to executed SetActionToExecutedCommand command = new SetActionToExecutedCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); command.validate(); command.doCommand(); + + updateObservers(); + } + + /** + *

Keeps triggering till {@link #internalExecute(StrolchTransaction)} returns false.

+ * + *

This occurs when the {@link Action} which is executed, has state set to {@link State#EXECUTED} instead of + * {@link State#EXECUTION}. Thus the execution thread stays with this activity, keeping resources bound to it, + * till we can wait and allow other activities to execute

+ * + * @param tx + * the TX + */ + protected void triggerExecute(StrolchTransaction tx) { + boolean trigger; + do { + trigger = internalExecute(tx); + } while (trigger); } /** * Sets the state of the {@link Action} with the given {@link Locator} to {@link State#STOPPED} * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to stopped */ public void toStopped(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); - - // set this action to executed - toStopped(tx, action); + internalToStopped(tx, action); tx.commitOnClose(); } @@ -251,10 +330,18 @@ public class Controller { * * @param tx * the TX - * @param action - * the {@link Action} to set to stopped + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to stopped */ - public void toStopped(StrolchTransaction tx, Action action) throws Exception { + public void toStopped(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + internalToStopped(tx, action); + } + + protected void internalToStopped(StrolchTransaction tx, Action action) { SetActionToStoppedCommand command = new SetActionToStoppedCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); @@ -266,20 +353,16 @@ public class Controller { * Sets the state of the {@link Action} with the given {@link Locator} to {@link State#ERROR} * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to executed */ public void toError(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); - - // set this action to error - toError(tx, action); + internalToError(tx, action); tx.commitOnClose(); } @@ -293,10 +376,18 @@ public class Controller { * * @param tx * the TX - * @param action - * the {@link Action} to set to error + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to error */ - public void toError(StrolchTransaction tx, Action action) throws Exception { + public void toError(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + internalToError(tx, action); + } + + protected void internalToError(StrolchTransaction tx, Action action) { SetActionToErrorCommand command = new SetActionToErrorCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); @@ -308,20 +399,18 @@ public class Controller { * Sets the state of the {@link Action} with the given {@link Locator} to {@link State#WARNING} * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to warning */ public void toWarning(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); // set this action to warning - toWarning(tx, action); + internalToWarning(tx, action); tx.commitOnClose(); } @@ -335,10 +424,18 @@ public class Controller { * * @param tx * the TX - * @param action - * the {@link Action} to set to error + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to error */ - public void toWarning(StrolchTransaction tx, Action action) throws Exception { + public void toWarning(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + internalToWarning(tx, action); + } + + protected void internalToWarning(StrolchTransaction tx, Action action) { SetActionToWarningCommand command = new SetActionToWarningCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); @@ -359,8 +456,8 @@ public class Controller { } catch (Exception e) { logger.error("Failed to set " + locator + " to error due to " + e.getMessage(), e); - if (this.container.hasComponent(OperationsLog.class)) { - this.container.getComponent(OperationsLog.class).addMessage( + if (this.agent.hasComponent(OperationsLog.class)) { + this.agent.getComponent(OperationsLog.class).addMessage( new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception, LogMessageState.Information, ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.error").withException(e).value("reason", e)); @@ -382,8 +479,8 @@ public class Controller { } catch (Exception e) { logger.error("Failed to set " + locator + " to warning due to " + e.getMessage(), e); - if (this.container.hasComponent(OperationsLog.class)) { - this.container.getComponent(OperationsLog.class).addMessage( + if (this.agent.hasComponent(OperationsLog.class)) { + this.agent.getComponent(OperationsLog.class).addMessage( new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception, LogMessageState.Information, ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.warning").withException(e).value("reason", e)); @@ -392,7 +489,10 @@ public class Controller { }); } - private void lockWithRetries(StrolchTransaction tx) throws StrolchLockException { + protected void lockWithRetries(StrolchTransaction tx) throws StrolchLockException { + if (tx.hasLock(this.locator)) + return; + int tries = 0; while (true) { try { @@ -424,4 +524,9 @@ public class Controller { observerEvent.updated.addElement(Tags.CONTROLLER, this.activity); realm.getObserverHandler().notify(observerEvent); } + + @Override + public String toString() { + return "Controller{" + "realm='" + realm + '\'' + ", locator=" + locator + '}'; + } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java index c9fe16987..eb1661344 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java @@ -23,7 +23,6 @@ import li.strolch.model.parameter.StringParameter; import li.strolch.persistence.api.StrolchTransaction; import li.strolch.privilege.model.Certificate; import li.strolch.privilege.model.PrivilegeContext; -import li.strolch.runtime.configuration.ComponentConfiguration; import li.strolch.utils.collections.MapOfMaps; /** @@ -34,13 +33,14 @@ import li.strolch.utils.collections.MapOfMaps; */ public class EventBasedExecutionHandler extends ExecutionHandler { + private final MapOfMaps controllers; private Map statesByRealm; - private MapOfMaps controllers; private DelayedExecutionTimer delayedExecutionTimer; public EventBasedExecutionHandler(ComponentContainer container, String componentName) { super(container, componentName); + this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true)); } @Override @@ -71,17 +71,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler { return activities.keySet(); } - @Override - public void initialize(ComponentConfiguration configuration) throws Exception { - this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true)); - super.initialize(configuration); + protected Controller newController(String realm, Activity activity) { + return new Controller(realm, this, activity); } @Override public void start() throws Exception { - evaluateStateByRealm(); - this.delayedExecutionTimer = new SimpleDurationExecutionTimer(getContainer().getAgent()); // restart execution of activities already in execution @@ -98,6 +94,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void stop() throws Exception { + // first stop and clear all existing controllers + synchronized (this.controllers) { + this.controllers.keySet().forEach(this::stopControllers); + } + if (this.delayedExecutionTimer != null) { this.delayedExecutionTimer.destroy(); this.delayedExecutionTimer = null; @@ -106,6 +107,24 @@ public class EventBasedExecutionHandler extends ExecutionHandler { super.stop(); } + protected void stopControllers(String realm) { + logger.info("Stopping controllers for realm " + realm + "..."); + synchronized (this.controllers) { + Map map = this.controllers.getMap(realm); + if (map == null) { + logger.error("No controllers for realm " + realm); + return; + } + + map.values().forEach(controller -> { + logger.info("Stopping controller " + controller); + controller.stopExecutions(); + }); + + this.controllers.removeMap(realm); + } + } + @Override public void toExecution(String realm, Activity activity) { ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); @@ -115,7 +134,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { Controller controller = this.controllers.getElement(realm, activity.getLocator()); if (controller == null) { - controller = new Controller(realm, this, activity); + controller = newController(realm, activity); this.controllers.addElement(realm, activity.getLocator(), controller); notifyObserverAdd(controller); } @@ -162,17 +181,17 @@ public class EventBasedExecutionHandler extends ExecutionHandler { getExecutor().submit(() -> notifyObserverRemove(realm, removed)); } - private void restartActivityExecution(PrivilegeContext ctx) { - // iterate the realms - for (String realmName : getContainer().getRealmNames()) { - reloadActivitiesInExecution(ctx, realmName); - } + protected void restartActivityExecution(PrivilegeContext ctx) { + getContainer().getRealmNames().forEach(realmName -> reloadActivitiesInExecution(ctx, realmName)); } @Override public void reloadActivitiesInExecution(PrivilegeContext ctx, String realmName) { try (StrolchTransaction tx = openTx(realmName, ctx.getCertificate(), false)) { + // first stop and clear all existing controllers + stopControllers(realmName); + // iterate all activities tx.streamActivities().forEach(activity -> { @@ -196,7 +215,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { tx.update(activity); // register for execution - Controller controller = new Controller(realmName, this, activity); + Controller controller = newController(realmName, activity); this.controllers.addElement(realmName, activity.getLocator(), controller); }); @@ -205,19 +224,18 @@ public class EventBasedExecutionHandler extends ExecutionHandler { } // trigger execution of the registered activities + logger.info("Triggering execution for realm " + realmName + " after reloading activities..."); triggerExecution(realmName); } @Override public void triggerExecution(String realm) { - ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); if (state == ExecutionHandlerState.Paused) { logger.warn("Ignoring trigger for paused realm " + realm); return; } - //noinspection SynchronizeOnNonFinalField synchronized (this.controllers) { Map controllers = this.controllers.getMap(realm); if (controllers != null) @@ -225,78 +243,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { } } - @Override - public ExecutionHandlerState getState(String realm) { - return this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); - } - - private void evaluateStateByRealm() throws Exception { - - this.statesByRealm = Collections.synchronizedMap(new HashMap<>()); - - runAsAgent(ctx -> getContainer().getRealmNames().forEach(realm -> { - try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), false)) { - Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION, - ExecutionHandler.class.getSimpleName()); - if (executionHandlerConfig == null) { - this.statesByRealm.put(realm, ExecutionHandlerState.Running); - } else { - ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS); - if (parameters == null) { - this.statesByRealm.put(realm, ExecutionHandlerState.Running); - } else { - StringParameter stateP = parameters.getParameter(PARAM_STATE); - if (stateP == null) { - this.statesByRealm.put(realm, ExecutionHandlerState.Running); - } else { - ExecutionHandlerState state; - try { - state = ExecutionHandlerState.valueOf(stateP.getValue()); - } catch (Exception e) { - state = ExecutionHandlerState.Running; - stateP.setValue(ExecutionHandlerState.Running.name()); - tx.update(executionHandlerConfig); - tx.commitOnClose(); - logger.error("Failed to read unhandled state " + stateP.getValue(), e); - } - this.statesByRealm.put(realm, state); - } - } - } - } - })); - } - - @Override - public void setState(Certificate cert, String realm, ExecutionHandlerState state) { - try (StrolchTransaction tx = openTx(realm, cert, false)) { - Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION, - ExecutionHandler.class.getSimpleName()); - if (executionHandlerConfig == null) { - executionHandlerConfig = new Resource(ExecutionHandler.class.getSimpleName(), - "ExecutionHandler Configuration", TYPE_CONFIGURATION); - } - ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS); - if (parameters == null) { - parameters = new ParameterBag(BAG_PARAMETERS, "Parameters", TYPE_PARAMETERS); - executionHandlerConfig.addParameterBag(parameters); - } - StringParameter stateP = parameters.getParameter(PARAM_STATE); - if (stateP == null) { - stateP = new StringParameter(PARAM_STATE, "State", state); - parameters.addParameter(stateP); - } - - stateP.setValueE(state); - - tx.addOrUpdate(executionHandlerConfig); - tx.commitOnClose(); - - this.statesByRealm.put(realm, state); - } - } - - private void toExecution(Controller controller) { + protected void toExecution(Controller controller) { String realm = controller.getRealm(); ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); @@ -307,7 +254,15 @@ public class EventBasedExecutionHandler extends ExecutionHandler { getExecutor().execute(() -> { try { - controller.execute(); + + // execute the controller + boolean trigger = controller.execute(); + + if (trigger) { + logger.info("Triggering of controllers for realm " + realm + " after executing " + controller); + triggerExecution(realm); + } + } catch (Exception e) { logger.error("Failed to set " + controller.getLocator() + " to execution", e); @@ -330,6 +285,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { if (controller != null) controller.toExecuted(locator); + triggerExecution(realm); + } catch (Exception e) { logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e); @@ -471,4 +428,66 @@ public class EventBasedExecutionHandler extends ExecutionHandler { public DelayedExecutionTimer getDelayedExecutionTimer() { return this.delayedExecutionTimer; } + + @Override + public ExecutionHandlerState getState(String realm) { + return this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); + } + + @Override + public void setState(Certificate cert, String realm, ExecutionHandlerState state) { + try (StrolchTransaction tx = openTx(realm, cert, false)) { + Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION, + ExecutionHandler.class.getSimpleName()); + if (executionHandlerConfig == null) { + executionHandlerConfig = new Resource(ExecutionHandler.class.getSimpleName(), + "ExecutionHandler Configuration", TYPE_CONFIGURATION); + } + ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS); + if (parameters == null) { + parameters = new ParameterBag(BAG_PARAMETERS, "Parameters", TYPE_PARAMETERS); + executionHandlerConfig.addParameterBag(parameters); + } + StringParameter stateP = parameters.getParameter(PARAM_STATE); + if (stateP == null) { + stateP = new StringParameter(PARAM_STATE, "State", state); + parameters.addParameter(stateP); + } + + stateP.setValueE(state); + + tx.addOrUpdate(executionHandlerConfig); + tx.commitOnClose(); + + this.statesByRealm.put(realm, state); + } + } + + private void evaluateStateByRealm() throws Exception { + this.statesByRealm = Collections.synchronizedMap(new HashMap<>()); + + runAsAgent(ctx -> getContainer().getRealmNames().forEach(realm -> { + try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), false)) { + + Resource config = tx.getResourceBy(TYPE_CONFIGURATION, ExecutionHandler.class.getSimpleName()); + if (config == null) { + this.statesByRealm.put(realm, ExecutionHandlerState.Running); + } else { + String currentState = config.getString(PARAM_STATE); + ExecutionHandlerState state = ExecutionHandlerState.Running; + try { + if (!currentState.isEmpty()) + state = ExecutionHandlerState.valueOf(currentState); + } catch (Exception e) { + config.setString(PARAM_STATE, ExecutionHandlerState.Running); + tx.update(config); + tx.commitOnClose(); + logger.error("Failed to read unhandled state " + currentState, e); + } + + this.statesByRealm.put(realm, state); + } + } + })); + } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java index d48ce5ab0..4b483900e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java @@ -14,10 +14,10 @@ import li.strolch.model.activity.Action; import li.strolch.model.activity.Activity; import li.strolch.model.activity.TimeOrdering; import li.strolch.persistence.api.StrolchTransaction; -import li.strolch.privilege.base.PrivilegeException; import li.strolch.privilege.model.Certificate; import li.strolch.privilege.model.PrivilegeContext; import li.strolch.runtime.privilege.PrivilegedRunnable; +import li.strolch.runtime.privilege.PrivilegedRunnableWithResult; /** *

@@ -49,14 +49,21 @@ public abstract class ExecutionHandler extends StrolchComponent { public static final String PARAM_STATE = "state"; + @Override public StrolchTransaction openTx(String realm, Certificate cert, Class action, boolean readOnly) { return super.openTx(realm, cert, action.getName(), readOnly); } - public void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException, Exception { + @Override + public void runAsAgent(PrivilegedRunnable runnable) throws Exception { super.runAsAgent(runnable); } + @Override + public T runAsAgentWithResult(PrivilegedRunnableWithResult runnable) throws Exception { + return super.runAsAgentWithResult(runnable); + } + public ExecutorService getExecutor() { return getExecutorService("ExecutionHandler"); } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java b/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java index 217d07433..7b203cd0e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java @@ -88,6 +88,7 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer { try { controller.toExecuted(locator); + executionHandler.triggerExecution(realm); } catch (Exception e) { logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e); diff --git a/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java b/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java index e133c92ad..005a5a266 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java @@ -50,6 +50,12 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand updateOrderState(tx(), activity, currentState, activity.getState()); } + @Override + public Void visitActivity(Activity activity) { + activity.getTimeOrdering().accept(this, activity); + return null; + } + @Override public Void visitAction(Action action) { execute(action); @@ -60,15 +66,22 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand // first plan if (action.getState().compareTo(State.PLANNED) < 0) { + State currentState = action.getState(); getPlanningPolicy(action).plan(action); + if (action.getState() != State.PLANNED) { + if (currentState != action.getState() && action.isResourceDefined()) + getConfirmationPolicy(action).doConfirmation(action); + logger.info("Action " + action.getLocator() + " was not planned, can thus not executed."); return; } + + // planning is complete, so we can now confirm it + getConfirmationPolicy(action).toPlanned(action); } tx().lock(action.getResourceLocator()); - tx().removeFromCache(action.getResourceLocator()); ConfirmationPolicy confirmationPolicy = getConfirmationPolicy(action); ExecutionPolicy executionPolicy = getExecutionPolicy(action); @@ -165,7 +178,6 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand @Override public void visitParallel(Activity activity) { - if (activity.getState().compareTo(State.EXECUTED) >= 0) return; @@ -177,16 +189,8 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand // in parallel we execute all the actions in the activity - boolean canExecute = isExecutable(element); - if (canExecute) { + if (isExecutable(element)) element.accept(this); - } } } - - @Override - public Void visitActivity(Activity activity) { - activity.getTimeOrdering().accept(this, activity); - return null; - } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java b/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java index 703c8c835..9607cf429 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java @@ -17,7 +17,6 @@ package li.strolch.execution.command; import static li.strolch.execution.policy.NoPlanning.DEFAULT_PLANNING; -import li.strolch.execution.policy.ConfirmationPolicy; import li.strolch.execution.policy.PlanningPolicy; import li.strolch.model.Resource; import li.strolch.model.State; @@ -70,17 +69,17 @@ public class PlanActionCommand extends PlanningCommand { public Void visitAction(Action action) { if (action.getState() != State.CREATED && action.getState() != State.PLANNING) throw new IllegalStateException("Can not plan action in state " + action.getState()); - ConfirmationPolicy confirmationPolicy = getConfirmationPolicy(action); action.setState(State.PLANNING); - confirmationPolicy.toPlanning(action); + if (action.isResourceDefined()) + getConfirmationPolicy(action).doConfirmation(action); PolicyDef planningPolicyDef = action.findPolicy(PlanningPolicy.class, DEFAULT_PLANNING); PlanningPolicy planningPolicy = tx().getPolicy(PlanningPolicy.class, planningPolicyDef); planningPolicy.plan(action); if (action.getState() == State.PLANNED) - confirmationPolicy.toPlanned(action); + getConfirmationPolicy(action).toPlanned(action); return null; } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java b/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java index 004ebe5dd..dcd7f6c3c 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java @@ -42,9 +42,10 @@ public class SetActionToPlannedCommand extends BasePlanningAndExecutionCommand { Activity rootElement = this.action.getRootElement(); State currentState = rootElement.getState(); - this.action.setState(State.PLANNED); - - getConfirmationPolicy(this.action).toPlanned(this.action); + State currentActionState = this.action.getState(); + getPlanningPolicy(this.action).plan(this.action); + if (currentActionState != this.action.getState() && this.action.isResourceDefined()) + getConfirmationPolicy(this.action).doConfirmation(this.action); updateOrderState(tx(), rootElement, currentState, rootElement.getState()); } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java index e40ba9ad5..dd54f9ec6 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java @@ -12,12 +12,6 @@ public class NoPlanning extends PlanningPolicy { super(tx); } - @Override - public Resource evaluateAndSetResource(Action action) { - tx().lock(Resource.locatorFor(action.getResourceType(), action.getResourceId())); - return tx().getResourceBy(action.getResourceType(), action.getResourceId()); - } - @Override public void plan(Action action) { DBC.PRE.assertEquals("Can not plan illegal state", State.CREATED, action.getState()); diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java index d9c673e67..fe04f519e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java @@ -8,21 +8,21 @@ import li.strolch.policy.StrolchPolicy; public abstract class PlanningPolicy extends StrolchPolicy { - public static PolicyDef DEFAULT_PLANNING = PolicyDef - .valueOf(PlanningPolicy.class.getSimpleName(), "key:DefaultPlanning"); + public static PolicyDef DEFAULT_PLANNING = PolicyDef.getKeyPolicy(PlanningPolicy.class, "DefaultPlanning"); public PlanningPolicy(StrolchTransaction tx) { super(tx); } - public abstract Resource evaluateAndSetResource(Action action); + public Resource evaluateAndSetResource(Action action) { + if (!action.isResourceDefined()) + return null; + + tx().lock(action.getResourceLocator()); + return tx().getResourceBy(action.getResourceType(), action.getResourceId()); + } public abstract void plan(Action action); public abstract void unplan(Action action); - - @Override - public void undo() { - // do nothing - } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java index 3acda5859..278831dab 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java @@ -33,40 +33,26 @@ public class ReservationExecution extends DurationExecution { @Override public boolean isExecutable(Action action) { - switch (action.getType()) { - case TYPE_RESERVE: - return !isReserved(tx(), action); - case TYPE_RELEASE: - return true; - default: - return super.isExecutable(action); - } + return switch (action.getType()) { + case TYPE_RESERVE -> !isReserved(tx(), action); + case TYPE_RELEASE -> true; + default -> super.isExecutable(action); + }; } @Override public void toExecution(Action action) { switch (action.getType()) { - case TYPE_RESERVE: - case TYPE_RELEASE: - - toExecuted(action); - break; - - default: - super.toExecution(action); + case TYPE_RESERVE, TYPE_RELEASE -> toExecuted(action); + default -> super.toExecution(action); } } @Override public void toExecuted(Action action) { switch (action.getType()) { - - case TYPE_RESERVE: - setReservation(tx(), action, true); - break; - case TYPE_RELEASE: - setReservation(tx(), action, false); - break; + case TYPE_RESERVE -> setReservation(tx(), action, true); + case TYPE_RELEASE -> setReservation(tx(), action, false); } super.toExecuted(action); diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java index c7cb6847e..bd8a5db0e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java @@ -17,16 +17,6 @@ public class SimplePlanning extends PlanningPolicy { super(tx); } - @Override - public Resource evaluateAndSetResource(Action action) { - if (action.hasResourceDefined()) { - tx().lock(action.getResourceLocator()); - return tx().getResourceBy(action.getResourceType(), action.getResourceId()); - } - - return null; - } - /** * Command to plan an {@link Action} to a {@link Resource}. It is assumed that the {@link IValueChange} objects of * the action are already constructed. The resource is evaluated using {@link #evaluateAndSetResource(Action)} diff --git a/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java b/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java index 0642665bc..1ca89ffda 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java @@ -1,5 +1,6 @@ package li.strolch.execution.service; +import li.strolch.execution.Controller; import li.strolch.execution.ExecutionHandler; import li.strolch.model.State; import li.strolch.model.activity.Action; @@ -24,33 +25,20 @@ public class ExecuteActionService extends AbstractService { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.Running); executionHandler.triggerExecution(realm); - - break; } - case "HaltNew": { - + case "HaltNew" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.HaltNew); - - break; } - case "Paused": { - + case "Paused" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.Paused); - - break; } - case "Trigger": { - + case "Trigger" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.triggerExecution(realm); - - break; } - case "ReloadActivities": { - + case "ReloadActivities" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.reloadActivitiesInExecution(getPrivilegeContext(), realm); - - break; } - - default: - throw new UnsupportedOperationException("Unhandled state " + state); + default -> throw new UnsupportedOperationException("Unhandled state " + state); } return ServiceResult.success();