From b29d4703b31a35667df22079f819c577d5a23079 Mon Sep 17 00:00:00 2001 From: Robert von Burg Date: Wed, 19 Jan 2022 19:48:43 +0100 Subject: [PATCH] [Major] Major rewrite of Controller/Activity execution Controller now never triggers execution on the ExecutionHandler, but calls itself as often as necessary to keep execution of the same Activity in the same thread run. This leads to a more predictable sequence of executing of actions. Further it is now possible to override the Controller's instantiated, to hook into the trigger of further execution of an Activity. --- .../java/li/strolch/execution/Controller.java | 293 ++++++++++++------ .../execution/EventBasedExecutionHandler.java | 199 ++++++------ .../strolch/execution/ExecutionHandler.java | 11 +- .../SimpleDurationExecutionTimer.java | 1 + .../command/ExecuteActivityCommand.java | 26 +- .../execution/command/PlanActionCommand.java | 7 +- .../command/SetActionToPlannedCommand.java | 7 +- .../strolch/execution/policy/NoPlanning.java | 6 - .../execution/policy/PlanningPolicy.java | 16 +- .../policy/ReservationExecution.java | 32 +- .../execution/policy/SimplePlanning.java | 10 - .../service/ExecuteActionService.java | 36 +-- .../service/SetActionToClosedService.java | 2 +- .../service/SetActionToCreatedService.java | 4 +- .../service/SetActionToErrorService.java | 2 +- .../service/SetActionToExecutedService.java | 2 +- .../service/SetActionToPlannedService.java | 6 +- .../service/SetActionToStoppedService.java | 2 +- .../service/SetActionToWarningService.java | 2 +- .../SetExecutionHandlerStateService.java | 29 +- 20 files changed, 385 insertions(+), 308 deletions(-) diff --git a/li.strolch.service/src/main/java/li/strolch/execution/Controller.java b/li.strolch.service/src/main/java/li/strolch/execution/Controller.java index e6a997740..f384b2024 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/Controller.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/Controller.java @@ -7,9 +7,10 @@ import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT; import java.util.HashMap; import java.util.Map; import java.util.ResourceBundle; +import java.util.Set; -import li.strolch.agent.api.ComponentContainer; import li.strolch.agent.api.ObserverEvent; +import li.strolch.agent.api.StrolchAgent; import li.strolch.agent.api.StrolchLockException; import li.strolch.agent.api.StrolchRealm; import li.strolch.execution.command.*; @@ -27,29 +28,30 @@ import li.strolch.model.log.LogSeverity; import li.strolch.persistence.api.StrolchTransaction; import li.strolch.privilege.model.Certificate; import li.strolch.runtime.privilege.PrivilegedRunnable; +import li.strolch.runtime.privilege.PrivilegedRunnableWithResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Controller { - private static final Logger logger = LoggerFactory.getLogger(Controller.class); + protected static final Logger logger = LoggerFactory.getLogger(Controller.class); - private final int lockRetries; - private final String realm; - private final ComponentContainer container; - private final ExecutionHandler executionHandler; + protected final int lockRetries; + protected final String realm; + protected final StrolchAgent agent; + protected final ExecutionHandler executionHandler; - private final String activityType; - private final String activityId; - private final Locator locator; - - private Activity activity; + protected final String activityType; + protected final String activityId; + protected final Locator locator; private final Map inExecution; + protected Activity activity; + public Controller(String realm, ExecutionHandler executionHandler, Activity activity) { this.realm = realm; - this.container = executionHandler.getContainer(); + this.agent = executionHandler.getAgent(); this.executionHandler = executionHandler; this.locator = activity.getLocator(); this.activityType = activity.getType(); @@ -76,6 +78,14 @@ public class Controller { return this.activity; } + public Set getInExecutionActionLocators() { + return this.inExecution.keySet(); + } + + public ExecutionPolicy getExecutionPolicy(Locator actionLoc) { + return this.inExecution.get(actionLoc); + } + public ExecutionPolicy refreshExecutionPolicy(StrolchTransaction tx, Action action) { ExecutionPolicy executionPolicy = this.inExecution.computeIfAbsent(action.getLocator(), e -> { Resource resource = tx.getResourceFor(action, true); @@ -97,7 +107,11 @@ public class Controller { } protected void runAsAgent(PrivilegedRunnable runnable) throws Exception { - this.executionHandler.runAsAgent(runnable); + this.agent.runAsAgent(runnable); + } + + protected T runAsAgentWithResult(PrivilegedRunnableWithResult runnable) throws Exception { + return this.agent.runAsAgentWithResult(runnable); } @SuppressWarnings("BooleanMethodIsAlwaysInverted") @@ -114,37 +128,69 @@ public class Controller { } /** - * Starts the execution of this {@link Activity} + * Stops the execution of all actions of this controller */ - public void execute() throws Exception { - boolean[] trigger = new boolean[1]; - this.executionHandler.runAsAgent(ctx -> { - try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - trigger[0] = execute(tx); - if (tx.needsCommit()) { - tx.commitOnClose(); - } - } - }); - - if (trigger[0]) - this.executionHandler.triggerExecution(this.realm); - } - - /** - * Stops the execution of all actions - */ - public void stop() { + public void stopExecutions() { synchronized (this.inExecution) { this.inExecution.values().forEach(ExecutionPolicy::stop); } } - private boolean execute(StrolchTransaction tx) { - if (!refreshActivity(tx)) - return false; + /** + * Executes {@link Action Actions} for {@link Activity} of {@link Controller#getLocator()}. Keeps executing till no {@link Action} was set to {@link State#EXECUTED} + */ + public boolean execute() throws Exception { + return runAsAgentWithResult(ctx -> { + try (StrolchTransaction tx = openTx(ctx.getCertificate())) { + lockWithRetries(tx); + if (!refreshActivity(tx)) + return false; + boolean trigger = internalExecute(tx); + + // we trigger execution for the same activity if the controller says it is needed + if (trigger) { + logger.info("Triggering additional execution of controller " + this + " after execution."); + triggerExecute(tx); + } + + if (tx.needsCommit()) + tx.commitOnClose(); + + return trigger; + } + }); + } + + /** + * Executes the activity in the given TX. Keeps executing till no {@link Action} was set to {@link State#EXECUTED} + * + * @param tx + * the TX + */ + public void execute(StrolchTransaction tx) { + lockWithRetries(tx); + if (!refreshActivity(tx)) + return; + + boolean trigger = internalExecute(tx); + + // we trigger execution for the same activity if the controller says it is needed + if (trigger) { + logger.info("Triggering additional execution of controller " + this + " after execution."); + triggerExecute(tx); + } + } + + /** + * Executes the activity in the given TX by calling the {@link ExecuteActivityCommand} + * + * @param tx + * the TX + * + * @return true if execute should be called again, i.e. the {@link ExecuteActivityCommand#needsRetriggerOfExecution()} returns true and the activity isn't complete yet + */ + protected boolean internalExecute(StrolchTransaction tx) { if (this.activity.getState().isExecuted()) { this.executionHandler.removeFromExecution(this); logger.info("Archiving executed activity " + this.locator + " with state " + this.activity.getState()); @@ -166,78 +212,111 @@ public class Controller { updateObservers(); - return command.needsRetriggerOfExecution(); + return command.needsRetriggerOfExecution() && !this.activity.inClosedPhase(); } /** - * Completes the execution of the given {@link Action} with the given {@link Locator} + * Completes the execution of the given {@link Action} with the given {@link Locator}, executing next {@link Action Actions} if possible * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to executed */ public void toExecuted(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); - - // set this action to executed - toExecuted(tx, action); - - updateObservers(); - - // flush so we can see the changes performed - tx.flush(); + setToExecuted(tx, action); // now try and execute the next action(s) - execute(tx); + triggerExecute(tx); if (tx.needsCommit()) tx.commitOnClose(); } }); - - this.executionHandler.triggerExecution(this.realm); } /** - * Completes the execution of the given {@link Action} + * Completes the execution of the given {@link Action}. No further processing is done. + * + * @param tx + * the TX + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to executed + */ + public void toExecuted(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + setToExecuted(tx, action); + } + + private boolean invalidActionContext(StrolchTransaction tx, Locator actionLoc) { + lockWithRetries(tx); + if (!this.inExecution.containsKey(actionLoc)) + throw new IllegalStateException(actionLoc + " is not in execution!"); + return !refreshActivity(tx); + } + + /** + *

Simply calls the {@link SetActionToExecutedCommand} and then updates the observers

+ * + *

Note: Usually you will want to call {@link #toExecuted(Locator)} or + * {@link #toExecuted(StrolchTransaction, Locator)}. This method expects the associated {@link Activity} to + * already be locked, and validated that this action is in execution

* * @param tx * the TX * @param action - * the {@link Action} to set to executed + * the Action to set to executed */ - public void toExecuted(StrolchTransaction tx, Action action) throws Exception { + public void setToExecuted(StrolchTransaction tx, Action action) { + + // set this action to executed SetActionToExecutedCommand command = new SetActionToExecutedCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); command.validate(); command.doCommand(); + + updateObservers(); + } + + /** + *

Keeps triggering till {@link #internalExecute(StrolchTransaction)} returns false.

+ * + *

This occurs when the {@link Action} which is executed, has state set to {@link State#EXECUTED} instead of + * {@link State#EXECUTION}. Thus the execution thread stays with this activity, keeping resources bound to it, + * till we can wait and allow other activities to execute

+ * + * @param tx + * the TX + */ + protected void triggerExecute(StrolchTransaction tx) { + boolean trigger; + do { + trigger = internalExecute(tx); + } while (trigger); } /** * Sets the state of the {@link Action} with the given {@link Locator} to {@link State#STOPPED} * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to stopped */ public void toStopped(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); - - // set this action to executed - toStopped(tx, action); + internalToStopped(tx, action); tx.commitOnClose(); } @@ -251,10 +330,18 @@ public class Controller { * * @param tx * the TX - * @param action - * the {@link Action} to set to stopped + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to stopped */ - public void toStopped(StrolchTransaction tx, Action action) throws Exception { + public void toStopped(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + internalToStopped(tx, action); + } + + protected void internalToStopped(StrolchTransaction tx, Action action) { SetActionToStoppedCommand command = new SetActionToStoppedCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); @@ -266,20 +353,16 @@ public class Controller { * Sets the state of the {@link Action} with the given {@link Locator} to {@link State#ERROR} * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to executed */ public void toError(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); - - // set this action to error - toError(tx, action); + internalToError(tx, action); tx.commitOnClose(); } @@ -293,10 +376,18 @@ public class Controller { * * @param tx * the TX - * @param action - * the {@link Action} to set to error + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to error */ - public void toError(StrolchTransaction tx, Action action) throws Exception { + public void toError(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + internalToError(tx, action); + } + + protected void internalToError(StrolchTransaction tx, Action action) { SetActionToErrorCommand command = new SetActionToErrorCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); @@ -308,20 +399,18 @@ public class Controller { * Sets the state of the {@link Action} with the given {@link Locator} to {@link State#WARNING} * * @param actionLoc - * the {@link Locator} of the {@link Action} + * the {@link Locator} of the {@link Action} to set to warning */ public void toWarning(Locator actionLoc) throws Exception { - this.executionHandler.runAsAgent(ctx -> { + runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(ctx.getCertificate())) { - lockWithRetries(tx); - - if (!refreshActivity(tx)) + if (invalidActionContext(tx, actionLoc)) return; Action action = this.activity.getElementByLocator(actionLoc); // set this action to warning - toWarning(tx, action); + internalToWarning(tx, action); tx.commitOnClose(); } @@ -335,10 +424,18 @@ public class Controller { * * @param tx * the TX - * @param action - * the {@link Action} to set to error + * @param actionLoc + * the {@link Locator} of the {@link Action} to set to error */ - public void toWarning(StrolchTransaction tx, Action action) throws Exception { + public void toWarning(StrolchTransaction tx, Locator actionLoc) throws Exception { + if (invalidActionContext(tx, actionLoc)) + return; + + Action action = this.activity.getElementByLocator(actionLoc); + internalToWarning(tx, action); + } + + protected void internalToWarning(StrolchTransaction tx, Action action) { SetActionToWarningCommand command = new SetActionToWarningCommand(tx); command.setExecutionPolicy(refreshExecutionPolicy(tx, action)); command.setAction(action); @@ -359,8 +456,8 @@ public class Controller { } catch (Exception e) { logger.error("Failed to set " + locator + " to error due to " + e.getMessage(), e); - if (this.container.hasComponent(OperationsLog.class)) { - this.container.getComponent(OperationsLog.class).addMessage( + if (this.agent.hasComponent(OperationsLog.class)) { + this.agent.getComponent(OperationsLog.class).addMessage( new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception, LogMessageState.Information, ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.error").withException(e).value("reason", e)); @@ -382,8 +479,8 @@ public class Controller { } catch (Exception e) { logger.error("Failed to set " + locator + " to warning due to " + e.getMessage(), e); - if (this.container.hasComponent(OperationsLog.class)) { - this.container.getComponent(OperationsLog.class).addMessage( + if (this.agent.hasComponent(OperationsLog.class)) { + this.agent.getComponent(OperationsLog.class).addMessage( new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception, LogMessageState.Information, ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.warning").withException(e).value("reason", e)); @@ -392,7 +489,10 @@ public class Controller { }); } - private void lockWithRetries(StrolchTransaction tx) throws StrolchLockException { + protected void lockWithRetries(StrolchTransaction tx) throws StrolchLockException { + if (tx.hasLock(this.locator)) + return; + int tries = 0; while (true) { try { @@ -424,4 +524,9 @@ public class Controller { observerEvent.updated.addElement(Tags.CONTROLLER, this.activity); realm.getObserverHandler().notify(observerEvent); } + + @Override + public String toString() { + return "Controller{" + "realm='" + realm + '\'' + ", locator=" + locator + '}'; + } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java index c9fe16987..eb1661344 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java @@ -23,7 +23,6 @@ import li.strolch.model.parameter.StringParameter; import li.strolch.persistence.api.StrolchTransaction; import li.strolch.privilege.model.Certificate; import li.strolch.privilege.model.PrivilegeContext; -import li.strolch.runtime.configuration.ComponentConfiguration; import li.strolch.utils.collections.MapOfMaps; /** @@ -34,13 +33,14 @@ import li.strolch.utils.collections.MapOfMaps; */ public class EventBasedExecutionHandler extends ExecutionHandler { + private final MapOfMaps controllers; private Map statesByRealm; - private MapOfMaps controllers; private DelayedExecutionTimer delayedExecutionTimer; public EventBasedExecutionHandler(ComponentContainer container, String componentName) { super(container, componentName); + this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true)); } @Override @@ -71,17 +71,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler { return activities.keySet(); } - @Override - public void initialize(ComponentConfiguration configuration) throws Exception { - this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true)); - super.initialize(configuration); + protected Controller newController(String realm, Activity activity) { + return new Controller(realm, this, activity); } @Override public void start() throws Exception { - evaluateStateByRealm(); - this.delayedExecutionTimer = new SimpleDurationExecutionTimer(getContainer().getAgent()); // restart execution of activities already in execution @@ -98,6 +94,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void stop() throws Exception { + // first stop and clear all existing controllers + synchronized (this.controllers) { + this.controllers.keySet().forEach(this::stopControllers); + } + if (this.delayedExecutionTimer != null) { this.delayedExecutionTimer.destroy(); this.delayedExecutionTimer = null; @@ -106,6 +107,24 @@ public class EventBasedExecutionHandler extends ExecutionHandler { super.stop(); } + protected void stopControllers(String realm) { + logger.info("Stopping controllers for realm " + realm + "..."); + synchronized (this.controllers) { + Map map = this.controllers.getMap(realm); + if (map == null) { + logger.error("No controllers for realm " + realm); + return; + } + + map.values().forEach(controller -> { + logger.info("Stopping controller " + controller); + controller.stopExecutions(); + }); + + this.controllers.removeMap(realm); + } + } + @Override public void toExecution(String realm, Activity activity) { ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); @@ -115,7 +134,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { Controller controller = this.controllers.getElement(realm, activity.getLocator()); if (controller == null) { - controller = new Controller(realm, this, activity); + controller = newController(realm, activity); this.controllers.addElement(realm, activity.getLocator(), controller); notifyObserverAdd(controller); } @@ -162,17 +181,17 @@ public class EventBasedExecutionHandler extends ExecutionHandler { getExecutor().submit(() -> notifyObserverRemove(realm, removed)); } - private void restartActivityExecution(PrivilegeContext ctx) { - // iterate the realms - for (String realmName : getContainer().getRealmNames()) { - reloadActivitiesInExecution(ctx, realmName); - } + protected void restartActivityExecution(PrivilegeContext ctx) { + getContainer().getRealmNames().forEach(realmName -> reloadActivitiesInExecution(ctx, realmName)); } @Override public void reloadActivitiesInExecution(PrivilegeContext ctx, String realmName) { try (StrolchTransaction tx = openTx(realmName, ctx.getCertificate(), false)) { + // first stop and clear all existing controllers + stopControllers(realmName); + // iterate all activities tx.streamActivities().forEach(activity -> { @@ -196,7 +215,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { tx.update(activity); // register for execution - Controller controller = new Controller(realmName, this, activity); + Controller controller = newController(realmName, activity); this.controllers.addElement(realmName, activity.getLocator(), controller); }); @@ -205,19 +224,18 @@ public class EventBasedExecutionHandler extends ExecutionHandler { } // trigger execution of the registered activities + logger.info("Triggering execution for realm " + realmName + " after reloading activities..."); triggerExecution(realmName); } @Override public void triggerExecution(String realm) { - ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); if (state == ExecutionHandlerState.Paused) { logger.warn("Ignoring trigger for paused realm " + realm); return; } - //noinspection SynchronizeOnNonFinalField synchronized (this.controllers) { Map controllers = this.controllers.getMap(realm); if (controllers != null) @@ -225,78 +243,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { } } - @Override - public ExecutionHandlerState getState(String realm) { - return this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); - } - - private void evaluateStateByRealm() throws Exception { - - this.statesByRealm = Collections.synchronizedMap(new HashMap<>()); - - runAsAgent(ctx -> getContainer().getRealmNames().forEach(realm -> { - try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), false)) { - Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION, - ExecutionHandler.class.getSimpleName()); - if (executionHandlerConfig == null) { - this.statesByRealm.put(realm, ExecutionHandlerState.Running); - } else { - ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS); - if (parameters == null) { - this.statesByRealm.put(realm, ExecutionHandlerState.Running); - } else { - StringParameter stateP = parameters.getParameter(PARAM_STATE); - if (stateP == null) { - this.statesByRealm.put(realm, ExecutionHandlerState.Running); - } else { - ExecutionHandlerState state; - try { - state = ExecutionHandlerState.valueOf(stateP.getValue()); - } catch (Exception e) { - state = ExecutionHandlerState.Running; - stateP.setValue(ExecutionHandlerState.Running.name()); - tx.update(executionHandlerConfig); - tx.commitOnClose(); - logger.error("Failed to read unhandled state " + stateP.getValue(), e); - } - this.statesByRealm.put(realm, state); - } - } - } - } - })); - } - - @Override - public void setState(Certificate cert, String realm, ExecutionHandlerState state) { - try (StrolchTransaction tx = openTx(realm, cert, false)) { - Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION, - ExecutionHandler.class.getSimpleName()); - if (executionHandlerConfig == null) { - executionHandlerConfig = new Resource(ExecutionHandler.class.getSimpleName(), - "ExecutionHandler Configuration", TYPE_CONFIGURATION); - } - ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS); - if (parameters == null) { - parameters = new ParameterBag(BAG_PARAMETERS, "Parameters", TYPE_PARAMETERS); - executionHandlerConfig.addParameterBag(parameters); - } - StringParameter stateP = parameters.getParameter(PARAM_STATE); - if (stateP == null) { - stateP = new StringParameter(PARAM_STATE, "State", state); - parameters.addParameter(stateP); - } - - stateP.setValueE(state); - - tx.addOrUpdate(executionHandlerConfig); - tx.commitOnClose(); - - this.statesByRealm.put(realm, state); - } - } - - private void toExecution(Controller controller) { + protected void toExecution(Controller controller) { String realm = controller.getRealm(); ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); @@ -307,7 +254,15 @@ public class EventBasedExecutionHandler extends ExecutionHandler { getExecutor().execute(() -> { try { - controller.execute(); + + // execute the controller + boolean trigger = controller.execute(); + + if (trigger) { + logger.info("Triggering of controllers for realm " + realm + " after executing " + controller); + triggerExecution(realm); + } + } catch (Exception e) { logger.error("Failed to set " + controller.getLocator() + " to execution", e); @@ -330,6 +285,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { if (controller != null) controller.toExecuted(locator); + triggerExecution(realm); + } catch (Exception e) { logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e); @@ -471,4 +428,66 @@ public class EventBasedExecutionHandler extends ExecutionHandler { public DelayedExecutionTimer getDelayedExecutionTimer() { return this.delayedExecutionTimer; } + + @Override + public ExecutionHandlerState getState(String realm) { + return this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); + } + + @Override + public void setState(Certificate cert, String realm, ExecutionHandlerState state) { + try (StrolchTransaction tx = openTx(realm, cert, false)) { + Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION, + ExecutionHandler.class.getSimpleName()); + if (executionHandlerConfig == null) { + executionHandlerConfig = new Resource(ExecutionHandler.class.getSimpleName(), + "ExecutionHandler Configuration", TYPE_CONFIGURATION); + } + ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS); + if (parameters == null) { + parameters = new ParameterBag(BAG_PARAMETERS, "Parameters", TYPE_PARAMETERS); + executionHandlerConfig.addParameterBag(parameters); + } + StringParameter stateP = parameters.getParameter(PARAM_STATE); + if (stateP == null) { + stateP = new StringParameter(PARAM_STATE, "State", state); + parameters.addParameter(stateP); + } + + stateP.setValueE(state); + + tx.addOrUpdate(executionHandlerConfig); + tx.commitOnClose(); + + this.statesByRealm.put(realm, state); + } + } + + private void evaluateStateByRealm() throws Exception { + this.statesByRealm = Collections.synchronizedMap(new HashMap<>()); + + runAsAgent(ctx -> getContainer().getRealmNames().forEach(realm -> { + try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), false)) { + + Resource config = tx.getResourceBy(TYPE_CONFIGURATION, ExecutionHandler.class.getSimpleName()); + if (config == null) { + this.statesByRealm.put(realm, ExecutionHandlerState.Running); + } else { + String currentState = config.getString(PARAM_STATE); + ExecutionHandlerState state = ExecutionHandlerState.Running; + try { + if (!currentState.isEmpty()) + state = ExecutionHandlerState.valueOf(currentState); + } catch (Exception e) { + config.setString(PARAM_STATE, ExecutionHandlerState.Running); + tx.update(config); + tx.commitOnClose(); + logger.error("Failed to read unhandled state " + currentState, e); + } + + this.statesByRealm.put(realm, state); + } + } + })); + } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java index d48ce5ab0..4b483900e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java @@ -14,10 +14,10 @@ import li.strolch.model.activity.Action; import li.strolch.model.activity.Activity; import li.strolch.model.activity.TimeOrdering; import li.strolch.persistence.api.StrolchTransaction; -import li.strolch.privilege.base.PrivilegeException; import li.strolch.privilege.model.Certificate; import li.strolch.privilege.model.PrivilegeContext; import li.strolch.runtime.privilege.PrivilegedRunnable; +import li.strolch.runtime.privilege.PrivilegedRunnableWithResult; /** *

@@ -49,14 +49,21 @@ public abstract class ExecutionHandler extends StrolchComponent { public static final String PARAM_STATE = "state"; + @Override public StrolchTransaction openTx(String realm, Certificate cert, Class action, boolean readOnly) { return super.openTx(realm, cert, action.getName(), readOnly); } - public void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException, Exception { + @Override + public void runAsAgent(PrivilegedRunnable runnable) throws Exception { super.runAsAgent(runnable); } + @Override + public T runAsAgentWithResult(PrivilegedRunnableWithResult runnable) throws Exception { + return super.runAsAgentWithResult(runnable); + } + public ExecutorService getExecutor() { return getExecutorService("ExecutionHandler"); } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java b/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java index 217d07433..7b203cd0e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java @@ -88,6 +88,7 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer { try { controller.toExecuted(locator); + executionHandler.triggerExecution(realm); } catch (Exception e) { logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e); diff --git a/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java b/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java index e133c92ad..005a5a266 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/command/ExecuteActivityCommand.java @@ -50,6 +50,12 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand updateOrderState(tx(), activity, currentState, activity.getState()); } + @Override + public Void visitActivity(Activity activity) { + activity.getTimeOrdering().accept(this, activity); + return null; + } + @Override public Void visitAction(Action action) { execute(action); @@ -60,15 +66,22 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand // first plan if (action.getState().compareTo(State.PLANNED) < 0) { + State currentState = action.getState(); getPlanningPolicy(action).plan(action); + if (action.getState() != State.PLANNED) { + if (currentState != action.getState() && action.isResourceDefined()) + getConfirmationPolicy(action).doConfirmation(action); + logger.info("Action " + action.getLocator() + " was not planned, can thus not executed."); return; } + + // planning is complete, so we can now confirm it + getConfirmationPolicy(action).toPlanned(action); } tx().lock(action.getResourceLocator()); - tx().removeFromCache(action.getResourceLocator()); ConfirmationPolicy confirmationPolicy = getConfirmationPolicy(action); ExecutionPolicy executionPolicy = getExecutionPolicy(action); @@ -165,7 +178,6 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand @Override public void visitParallel(Activity activity) { - if (activity.getState().compareTo(State.EXECUTED) >= 0) return; @@ -177,16 +189,8 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand // in parallel we execute all the actions in the activity - boolean canExecute = isExecutable(element); - if (canExecute) { + if (isExecutable(element)) element.accept(this); - } } } - - @Override - public Void visitActivity(Activity activity) { - activity.getTimeOrdering().accept(this, activity); - return null; - } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java b/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java index 703c8c835..9607cf429 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/command/PlanActionCommand.java @@ -17,7 +17,6 @@ package li.strolch.execution.command; import static li.strolch.execution.policy.NoPlanning.DEFAULT_PLANNING; -import li.strolch.execution.policy.ConfirmationPolicy; import li.strolch.execution.policy.PlanningPolicy; import li.strolch.model.Resource; import li.strolch.model.State; @@ -70,17 +69,17 @@ public class PlanActionCommand extends PlanningCommand { public Void visitAction(Action action) { if (action.getState() != State.CREATED && action.getState() != State.PLANNING) throw new IllegalStateException("Can not plan action in state " + action.getState()); - ConfirmationPolicy confirmationPolicy = getConfirmationPolicy(action); action.setState(State.PLANNING); - confirmationPolicy.toPlanning(action); + if (action.isResourceDefined()) + getConfirmationPolicy(action).doConfirmation(action); PolicyDef planningPolicyDef = action.findPolicy(PlanningPolicy.class, DEFAULT_PLANNING); PlanningPolicy planningPolicy = tx().getPolicy(PlanningPolicy.class, planningPolicyDef); planningPolicy.plan(action); if (action.getState() == State.PLANNED) - confirmationPolicy.toPlanned(action); + getConfirmationPolicy(action).toPlanned(action); return null; } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java b/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java index 004ebe5dd..dcd7f6c3c 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/command/SetActionToPlannedCommand.java @@ -42,9 +42,10 @@ public class SetActionToPlannedCommand extends BasePlanningAndExecutionCommand { Activity rootElement = this.action.getRootElement(); State currentState = rootElement.getState(); - this.action.setState(State.PLANNED); - - getConfirmationPolicy(this.action).toPlanned(this.action); + State currentActionState = this.action.getState(); + getPlanningPolicy(this.action).plan(this.action); + if (currentActionState != this.action.getState() && this.action.isResourceDefined()) + getConfirmationPolicy(this.action).doConfirmation(this.action); updateOrderState(tx(), rootElement, currentState, rootElement.getState()); } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java index e40ba9ad5..dd54f9ec6 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/NoPlanning.java @@ -12,12 +12,6 @@ public class NoPlanning extends PlanningPolicy { super(tx); } - @Override - public Resource evaluateAndSetResource(Action action) { - tx().lock(Resource.locatorFor(action.getResourceType(), action.getResourceId())); - return tx().getResourceBy(action.getResourceType(), action.getResourceId()); - } - @Override public void plan(Action action) { DBC.PRE.assertEquals("Can not plan illegal state", State.CREATED, action.getState()); diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java index d9c673e67..fe04f519e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/PlanningPolicy.java @@ -8,21 +8,21 @@ import li.strolch.policy.StrolchPolicy; public abstract class PlanningPolicy extends StrolchPolicy { - public static PolicyDef DEFAULT_PLANNING = PolicyDef - .valueOf(PlanningPolicy.class.getSimpleName(), "key:DefaultPlanning"); + public static PolicyDef DEFAULT_PLANNING = PolicyDef.getKeyPolicy(PlanningPolicy.class, "DefaultPlanning"); public PlanningPolicy(StrolchTransaction tx) { super(tx); } - public abstract Resource evaluateAndSetResource(Action action); + public Resource evaluateAndSetResource(Action action) { + if (!action.isResourceDefined()) + return null; + + tx().lock(action.getResourceLocator()); + return tx().getResourceBy(action.getResourceType(), action.getResourceId()); + } public abstract void plan(Action action); public abstract void unplan(Action action); - - @Override - public void undo() { - // do nothing - } } diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java index 3acda5859..278831dab 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/ReservationExecution.java @@ -33,40 +33,26 @@ public class ReservationExecution extends DurationExecution { @Override public boolean isExecutable(Action action) { - switch (action.getType()) { - case TYPE_RESERVE: - return !isReserved(tx(), action); - case TYPE_RELEASE: - return true; - default: - return super.isExecutable(action); - } + return switch (action.getType()) { + case TYPE_RESERVE -> !isReserved(tx(), action); + case TYPE_RELEASE -> true; + default -> super.isExecutable(action); + }; } @Override public void toExecution(Action action) { switch (action.getType()) { - case TYPE_RESERVE: - case TYPE_RELEASE: - - toExecuted(action); - break; - - default: - super.toExecution(action); + case TYPE_RESERVE, TYPE_RELEASE -> toExecuted(action); + default -> super.toExecution(action); } } @Override public void toExecuted(Action action) { switch (action.getType()) { - - case TYPE_RESERVE: - setReservation(tx(), action, true); - break; - case TYPE_RELEASE: - setReservation(tx(), action, false); - break; + case TYPE_RESERVE -> setReservation(tx(), action, true); + case TYPE_RELEASE -> setReservation(tx(), action, false); } super.toExecuted(action); diff --git a/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java b/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java index c7cb6847e..bd8a5db0e 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/policy/SimplePlanning.java @@ -17,16 +17,6 @@ public class SimplePlanning extends PlanningPolicy { super(tx); } - @Override - public Resource evaluateAndSetResource(Action action) { - if (action.hasResourceDefined()) { - tx().lock(action.getResourceLocator()); - return tx().getResourceBy(action.getResourceType(), action.getResourceId()); - } - - return null; - } - /** * Command to plan an {@link Action} to a {@link Resource}. It is assumed that the {@link IValueChange} objects of * the action are already constructed. The resource is evaluated using {@link #evaluateAndSetResource(Action)} diff --git a/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java b/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java index 0642665bc..1ca89ffda 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/service/ExecuteActionService.java @@ -1,5 +1,6 @@ package li.strolch.execution.service; +import li.strolch.execution.Controller; import li.strolch.execution.ExecutionHandler; import li.strolch.model.State; import li.strolch.model.activity.Action; @@ -24,33 +25,20 @@ public class ExecuteActionService extends AbstractService { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.Running); executionHandler.triggerExecution(realm); - - break; } - case "HaltNew": { - + case "HaltNew" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.HaltNew); - - break; } - case "Paused": { - + case "Paused" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.Paused); - - break; } - case "Trigger": { - + case "Trigger" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.triggerExecution(realm); - - break; } - case "ReloadActivities": { - + case "ReloadActivities" -> { ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class); executionHandler.reloadActivitiesInExecution(getPrivilegeContext(), realm); - - break; } - - default: - throw new UnsupportedOperationException("Unhandled state " + state); + default -> throw new UnsupportedOperationException("Unhandled state " + state); } return ServiceResult.success();