[Major] Major rewrite of Controller/Activity execution

Controller now never triggers execution on the ExecutionHandler, but calls itself as often as necessary to keep execution of the same Activity in the same thread run. This leads to a more predictable sequence of executing of actions.

Further it is now possible to override the Controller's instantiated, to hook into the trigger of further execution of an Activity.
This commit is contained in:
Robert von Burg 2022-01-19 19:48:43 +01:00
parent a2afd18834
commit b29d4703b3
20 changed files with 385 additions and 308 deletions

View File

@ -7,9 +7,10 @@ import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.Set;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.ObserverEvent;
import li.strolch.agent.api.StrolchAgent;
import li.strolch.agent.api.StrolchLockException;
import li.strolch.agent.api.StrolchRealm;
import li.strolch.execution.command.*;
@ -27,29 +28,30 @@ import li.strolch.model.log.LogSeverity;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.privilege.model.Certificate;
import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Controller {
private static final Logger logger = LoggerFactory.getLogger(Controller.class);
protected static final Logger logger = LoggerFactory.getLogger(Controller.class);
private final int lockRetries;
private final String realm;
private final ComponentContainer container;
private final ExecutionHandler executionHandler;
protected final int lockRetries;
protected final String realm;
protected final StrolchAgent agent;
protected final ExecutionHandler executionHandler;
private final String activityType;
private final String activityId;
private final Locator locator;
private Activity activity;
protected final String activityType;
protected final String activityId;
protected final Locator locator;
private final Map<Locator, ExecutionPolicy> inExecution;
protected Activity activity;
public Controller(String realm, ExecutionHandler executionHandler, Activity activity) {
this.realm = realm;
this.container = executionHandler.getContainer();
this.agent = executionHandler.getAgent();
this.executionHandler = executionHandler;
this.locator = activity.getLocator();
this.activityType = activity.getType();
@ -76,6 +78,14 @@ public class Controller {
return this.activity;
}
public Set<Locator> getInExecutionActionLocators() {
return this.inExecution.keySet();
}
public ExecutionPolicy getExecutionPolicy(Locator actionLoc) {
return this.inExecution.get(actionLoc);
}
public ExecutionPolicy refreshExecutionPolicy(StrolchTransaction tx, Action action) {
ExecutionPolicy executionPolicy = this.inExecution.computeIfAbsent(action.getLocator(), e -> {
Resource resource = tx.getResourceFor(action, true);
@ -97,7 +107,11 @@ public class Controller {
}
protected void runAsAgent(PrivilegedRunnable runnable) throws Exception {
this.executionHandler.runAsAgent(runnable);
this.agent.runAsAgent(runnable);
}
protected <T> T runAsAgentWithResult(PrivilegedRunnableWithResult<T> runnable) throws Exception {
return this.agent.runAsAgentWithResult(runnable);
}
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
@ -114,37 +128,69 @@ public class Controller {
}
/**
* Starts the execution of this {@link Activity}
* Stops the execution of all actions of this controller
*/
public void execute() throws Exception {
boolean[] trigger = new boolean[1];
this.executionHandler.runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(ctx.getCertificate())) {
lockWithRetries(tx);
trigger[0] = execute(tx);
if (tx.needsCommit()) {
tx.commitOnClose();
}
}
});
if (trigger[0])
this.executionHandler.triggerExecution(this.realm);
}
/**
* Stops the execution of all actions
*/
public void stop() {
public void stopExecutions() {
synchronized (this.inExecution) {
this.inExecution.values().forEach(ExecutionPolicy::stop);
}
}
private boolean execute(StrolchTransaction tx) {
if (!refreshActivity(tx))
return false;
/**
* Executes {@link Action Actions} for {@link Activity} of {@link Controller#getLocator()}. Keeps executing till no {@link Action} was set to {@link State#EXECUTED}
*/
public boolean execute() throws Exception {
return runAsAgentWithResult(ctx -> {
try (StrolchTransaction tx = openTx(ctx.getCertificate())) {
lockWithRetries(tx);
if (!refreshActivity(tx))
return false;
boolean trigger = internalExecute(tx);
// we trigger execution for the same activity if the controller says it is needed
if (trigger) {
logger.info("Triggering additional execution of controller " + this + " after execution.");
triggerExecute(tx);
}
if (tx.needsCommit())
tx.commitOnClose();
return trigger;
}
});
}
/**
* Executes the activity in the given TX. Keeps executing till no {@link Action} was set to {@link State#EXECUTED}
*
* @param tx
* the TX
*/
public void execute(StrolchTransaction tx) {
lockWithRetries(tx);
if (!refreshActivity(tx))
return;
boolean trigger = internalExecute(tx);
// we trigger execution for the same activity if the controller says it is needed
if (trigger) {
logger.info("Triggering additional execution of controller " + this + " after execution.");
triggerExecute(tx);
}
}
/**
* Executes the activity in the given TX by calling the {@link ExecuteActivityCommand}
*
* @param tx
* the TX
*
* @return true if execute should be called again, i.e. the {@link ExecuteActivityCommand#needsRetriggerOfExecution()} returns true and the activity isn't complete yet
*/
protected boolean internalExecute(StrolchTransaction tx) {
if (this.activity.getState().isExecuted()) {
this.executionHandler.removeFromExecution(this);
logger.info("Archiving executed activity " + this.locator + " with state " + this.activity.getState());
@ -166,78 +212,111 @@ public class Controller {
updateObservers();
return command.needsRetriggerOfExecution();
return command.needsRetriggerOfExecution() && !this.activity.inClosedPhase();
}
/**
* Completes the execution of the given {@link Action} with the given {@link Locator}
* Completes the execution of the given {@link Action} with the given {@link Locator}, executing next {@link Action Actions} if possible
*
* @param actionLoc
* the {@link Locator} of the {@link Action}
* the {@link Locator} of the {@link Action} to set to executed
*/
public void toExecuted(Locator actionLoc) throws Exception {
this.executionHandler.runAsAgent(ctx -> {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(ctx.getCertificate())) {
lockWithRetries(tx);
if (!refreshActivity(tx))
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
// set this action to executed
toExecuted(tx, action);
updateObservers();
// flush so we can see the changes performed
tx.flush();
setToExecuted(tx, action);
// now try and execute the next action(s)
execute(tx);
triggerExecute(tx);
if (tx.needsCommit())
tx.commitOnClose();
}
});
this.executionHandler.triggerExecution(this.realm);
}
/**
* Completes the execution of the given {@link Action}
* Completes the execution of the given {@link Action}. No further processing is done.
*
* @param tx
* the TX
* @param actionLoc
* the {@link Locator} of the {@link Action} to set to executed
*/
public void toExecuted(StrolchTransaction tx, Locator actionLoc) throws Exception {
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
setToExecuted(tx, action);
}
private boolean invalidActionContext(StrolchTransaction tx, Locator actionLoc) {
lockWithRetries(tx);
if (!this.inExecution.containsKey(actionLoc))
throw new IllegalStateException(actionLoc + " is not in execution!");
return !refreshActivity(tx);
}
/**
* <p>Simply calls the {@link SetActionToExecutedCommand} and then updates the observers</p>
*
* <p><b>Note:</b> Usually you will want to call {@link #toExecuted(Locator)} or
* {@link #toExecuted(StrolchTransaction, Locator)}. This method expects the associated {@link Activity} to
* already be locked, and validated that this action is in execution</p>
*
* @param tx
* the TX
* @param action
* the {@link Action} to set to executed
* the Action to set to executed
*/
public void toExecuted(StrolchTransaction tx, Action action) throws Exception {
public void setToExecuted(StrolchTransaction tx, Action action) {
// set this action to executed
SetActionToExecutedCommand command = new SetActionToExecutedCommand(tx);
command.setExecutionPolicy(refreshExecutionPolicy(tx, action));
command.setAction(action);
command.validate();
command.doCommand();
updateObservers();
}
/**
* <p>Keeps triggering till {@link #internalExecute(StrolchTransaction)} returns false.</p>
*
* <p>This occurs when the {@link Action} which is executed, has state set to {@link State#EXECUTED} instead of
* {@link State#EXECUTION}. Thus the execution thread stays with this activity, keeping resources bound to it,
* till we can wait and allow other activities to execute</p>
*
* @param tx
* the TX
*/
protected void triggerExecute(StrolchTransaction tx) {
boolean trigger;
do {
trigger = internalExecute(tx);
} while (trigger);
}
/**
* Sets the state of the {@link Action} with the given {@link Locator} to {@link State#STOPPED}
*
* @param actionLoc
* the {@link Locator} of the {@link Action}
* the {@link Locator} of the {@link Action} to set to stopped
*/
public void toStopped(Locator actionLoc) throws Exception {
this.executionHandler.runAsAgent(ctx -> {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(ctx.getCertificate())) {
lockWithRetries(tx);
if (!refreshActivity(tx))
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
// set this action to executed
toStopped(tx, action);
internalToStopped(tx, action);
tx.commitOnClose();
}
@ -251,10 +330,18 @@ public class Controller {
*
* @param tx
* the TX
* @param action
* the {@link Action} to set to stopped
* @param actionLoc
* the {@link Locator} of the {@link Action} to set to stopped
*/
public void toStopped(StrolchTransaction tx, Action action) throws Exception {
public void toStopped(StrolchTransaction tx, Locator actionLoc) throws Exception {
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
internalToStopped(tx, action);
}
protected void internalToStopped(StrolchTransaction tx, Action action) {
SetActionToStoppedCommand command = new SetActionToStoppedCommand(tx);
command.setExecutionPolicy(refreshExecutionPolicy(tx, action));
command.setAction(action);
@ -266,20 +353,16 @@ public class Controller {
* Sets the state of the {@link Action} with the given {@link Locator} to {@link State#ERROR}
*
* @param actionLoc
* the {@link Locator} of the {@link Action}
* the {@link Locator} of the {@link Action} to set to executed
*/
public void toError(Locator actionLoc) throws Exception {
this.executionHandler.runAsAgent(ctx -> {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(ctx.getCertificate())) {
lockWithRetries(tx);
if (!refreshActivity(tx))
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
// set this action to error
toError(tx, action);
internalToError(tx, action);
tx.commitOnClose();
}
@ -293,10 +376,18 @@ public class Controller {
*
* @param tx
* the TX
* @param action
* the {@link Action} to set to error
* @param actionLoc
* the {@link Locator} of the {@link Action} to set to error
*/
public void toError(StrolchTransaction tx, Action action) throws Exception {
public void toError(StrolchTransaction tx, Locator actionLoc) throws Exception {
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
internalToError(tx, action);
}
protected void internalToError(StrolchTransaction tx, Action action) {
SetActionToErrorCommand command = new SetActionToErrorCommand(tx);
command.setExecutionPolicy(refreshExecutionPolicy(tx, action));
command.setAction(action);
@ -308,20 +399,18 @@ public class Controller {
* Sets the state of the {@link Action} with the given {@link Locator} to {@link State#WARNING}
*
* @param actionLoc
* the {@link Locator} of the {@link Action}
* the {@link Locator} of the {@link Action} to set to warning
*/
public void toWarning(Locator actionLoc) throws Exception {
this.executionHandler.runAsAgent(ctx -> {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(ctx.getCertificate())) {
lockWithRetries(tx);
if (!refreshActivity(tx))
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
// set this action to warning
toWarning(tx, action);
internalToWarning(tx, action);
tx.commitOnClose();
}
@ -335,10 +424,18 @@ public class Controller {
*
* @param tx
* the TX
* @param action
* the {@link Action} to set to error
* @param actionLoc
* the {@link Locator} of the {@link Action} to set to error
*/
public void toWarning(StrolchTransaction tx, Action action) throws Exception {
public void toWarning(StrolchTransaction tx, Locator actionLoc) throws Exception {
if (invalidActionContext(tx, actionLoc))
return;
Action action = this.activity.getElementByLocator(actionLoc);
internalToWarning(tx, action);
}
protected void internalToWarning(StrolchTransaction tx, Action action) {
SetActionToWarningCommand command = new SetActionToWarningCommand(tx);
command.setExecutionPolicy(refreshExecutionPolicy(tx, action));
command.setAction(action);
@ -359,8 +456,8 @@ public class Controller {
} catch (Exception e) {
logger.error("Failed to set " + locator + " to error due to " + e.getMessage(), e);
if (this.container.hasComponent(OperationsLog.class)) {
this.container.getComponent(OperationsLog.class).addMessage(
if (this.agent.hasComponent(OperationsLog.class)) {
this.agent.getComponent(OperationsLog.class).addMessage(
new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-service"),
"execution.handler.failed.error").withException(e).value("reason", e));
@ -382,8 +479,8 @@ public class Controller {
} catch (Exception e) {
logger.error("Failed to set " + locator + " to warning due to " + e.getMessage(), e);
if (this.container.hasComponent(OperationsLog.class)) {
this.container.getComponent(OperationsLog.class).addMessage(
if (this.agent.hasComponent(OperationsLog.class)) {
this.agent.getComponent(OperationsLog.class).addMessage(
new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-service"),
"execution.handler.failed.warning").withException(e).value("reason", e));
@ -392,7 +489,10 @@ public class Controller {
});
}
private void lockWithRetries(StrolchTransaction tx) throws StrolchLockException {
protected void lockWithRetries(StrolchTransaction tx) throws StrolchLockException {
if (tx.hasLock(this.locator))
return;
int tries = 0;
while (true) {
try {
@ -424,4 +524,9 @@ public class Controller {
observerEvent.updated.addElement(Tags.CONTROLLER, this.activity);
realm.getObserverHandler().notify(observerEvent);
}
@Override
public String toString() {
return "Controller{" + "realm='" + realm + '\'' + ", locator=" + locator + '}';
}
}

View File

@ -23,7 +23,6 @@ import li.strolch.model.parameter.StringParameter;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.privilege.model.Certificate;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.utils.collections.MapOfMaps;
/**
@ -34,13 +33,14 @@ import li.strolch.utils.collections.MapOfMaps;
*/
public class EventBasedExecutionHandler extends ExecutionHandler {
private final MapOfMaps<String, Locator, Controller> controllers;
private Map<String, ExecutionHandlerState> statesByRealm;
private MapOfMaps<String, Locator, Controller> controllers;
private DelayedExecutionTimer delayedExecutionTimer;
public EventBasedExecutionHandler(ComponentContainer container, String componentName) {
super(container, componentName);
this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true));
}
@Override
@ -71,17 +71,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
return activities.keySet();
}
@Override
public void initialize(ComponentConfiguration configuration) throws Exception {
this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true));
super.initialize(configuration);
protected Controller newController(String realm, Activity activity) {
return new Controller(realm, this, activity);
}
@Override
public void start() throws Exception {
evaluateStateByRealm();
this.delayedExecutionTimer = new SimpleDurationExecutionTimer(getContainer().getAgent());
// restart execution of activities already in execution
@ -98,6 +94,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void stop() throws Exception {
// first stop and clear all existing controllers
synchronized (this.controllers) {
this.controllers.keySet().forEach(this::stopControllers);
}
if (this.delayedExecutionTimer != null) {
this.delayedExecutionTimer.destroy();
this.delayedExecutionTimer = null;
@ -106,6 +107,24 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
super.stop();
}
protected void stopControllers(String realm) {
logger.info("Stopping controllers for realm " + realm + "...");
synchronized (this.controllers) {
Map<Locator, Controller> map = this.controllers.getMap(realm);
if (map == null) {
logger.error("No controllers for realm " + realm);
return;
}
map.values().forEach(controller -> {
logger.info("Stopping controller " + controller);
controller.stopExecutions();
});
this.controllers.removeMap(realm);
}
}
@Override
public void toExecution(String realm, Activity activity) {
ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running);
@ -115,7 +134,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
Controller controller = this.controllers.getElement(realm, activity.getLocator());
if (controller == null) {
controller = new Controller(realm, this, activity);
controller = newController(realm, activity);
this.controllers.addElement(realm, activity.getLocator(), controller);
notifyObserverAdd(controller);
}
@ -162,17 +181,17 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
getExecutor().submit(() -> notifyObserverRemove(realm, removed));
}
private void restartActivityExecution(PrivilegeContext ctx) {
// iterate the realms
for (String realmName : getContainer().getRealmNames()) {
reloadActivitiesInExecution(ctx, realmName);
}
protected void restartActivityExecution(PrivilegeContext ctx) {
getContainer().getRealmNames().forEach(realmName -> reloadActivitiesInExecution(ctx, realmName));
}
@Override
public void reloadActivitiesInExecution(PrivilegeContext ctx, String realmName) {
try (StrolchTransaction tx = openTx(realmName, ctx.getCertificate(), false)) {
// first stop and clear all existing controllers
stopControllers(realmName);
// iterate all activities
tx.streamActivities().forEach(activity -> {
@ -196,7 +215,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
tx.update(activity);
// register for execution
Controller controller = new Controller(realmName, this, activity);
Controller controller = newController(realmName, activity);
this.controllers.addElement(realmName, activity.getLocator(), controller);
});
@ -205,19 +224,18 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
}
// trigger execution of the registered activities
logger.info("Triggering execution for realm " + realmName + " after reloading activities...");
triggerExecution(realmName);
}
@Override
public void triggerExecution(String realm) {
ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running);
if (state == ExecutionHandlerState.Paused) {
logger.warn("Ignoring trigger for paused realm " + realm);
return;
}
//noinspection SynchronizeOnNonFinalField
synchronized (this.controllers) {
Map<Locator, Controller> controllers = this.controllers.getMap(realm);
if (controllers != null)
@ -225,78 +243,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
}
}
@Override
public ExecutionHandlerState getState(String realm) {
return this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running);
}
private void evaluateStateByRealm() throws Exception {
this.statesByRealm = Collections.synchronizedMap(new HashMap<>());
runAsAgent(ctx -> getContainer().getRealmNames().forEach(realm -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), false)) {
Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION,
ExecutionHandler.class.getSimpleName());
if (executionHandlerConfig == null) {
this.statesByRealm.put(realm, ExecutionHandlerState.Running);
} else {
ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS);
if (parameters == null) {
this.statesByRealm.put(realm, ExecutionHandlerState.Running);
} else {
StringParameter stateP = parameters.getParameter(PARAM_STATE);
if (stateP == null) {
this.statesByRealm.put(realm, ExecutionHandlerState.Running);
} else {
ExecutionHandlerState state;
try {
state = ExecutionHandlerState.valueOf(stateP.getValue());
} catch (Exception e) {
state = ExecutionHandlerState.Running;
stateP.setValue(ExecutionHandlerState.Running.name());
tx.update(executionHandlerConfig);
tx.commitOnClose();
logger.error("Failed to read unhandled state " + stateP.getValue(), e);
}
this.statesByRealm.put(realm, state);
}
}
}
}
}));
}
@Override
public void setState(Certificate cert, String realm, ExecutionHandlerState state) {
try (StrolchTransaction tx = openTx(realm, cert, false)) {
Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION,
ExecutionHandler.class.getSimpleName());
if (executionHandlerConfig == null) {
executionHandlerConfig = new Resource(ExecutionHandler.class.getSimpleName(),
"ExecutionHandler Configuration", TYPE_CONFIGURATION);
}
ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS);
if (parameters == null) {
parameters = new ParameterBag(BAG_PARAMETERS, "Parameters", TYPE_PARAMETERS);
executionHandlerConfig.addParameterBag(parameters);
}
StringParameter stateP = parameters.getParameter(PARAM_STATE);
if (stateP == null) {
stateP = new StringParameter(PARAM_STATE, "State", state);
parameters.addParameter(stateP);
}
stateP.setValueE(state);
tx.addOrUpdate(executionHandlerConfig);
tx.commitOnClose();
this.statesByRealm.put(realm, state);
}
}
private void toExecution(Controller controller) {
protected void toExecution(Controller controller) {
String realm = controller.getRealm();
ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running);
@ -307,7 +254,15 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
getExecutor().execute(() -> {
try {
controller.execute();
// execute the controller
boolean trigger = controller.execute();
if (trigger) {
logger.info("Triggering of controllers for realm " + realm + " after executing " + controller);
triggerExecution(realm);
}
} catch (Exception e) {
logger.error("Failed to set " + controller.getLocator() + " to execution", e);
@ -330,6 +285,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (controller != null)
controller.toExecuted(locator);
triggerExecution(realm);
} catch (Exception e) {
logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e);
@ -471,4 +428,66 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
public DelayedExecutionTimer getDelayedExecutionTimer() {
return this.delayedExecutionTimer;
}
@Override
public ExecutionHandlerState getState(String realm) {
return this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running);
}
@Override
public void setState(Certificate cert, String realm, ExecutionHandlerState state) {
try (StrolchTransaction tx = openTx(realm, cert, false)) {
Resource executionHandlerConfig = tx.getResourceBy(TYPE_CONFIGURATION,
ExecutionHandler.class.getSimpleName());
if (executionHandlerConfig == null) {
executionHandlerConfig = new Resource(ExecutionHandler.class.getSimpleName(),
"ExecutionHandler Configuration", TYPE_CONFIGURATION);
}
ParameterBag parameters = executionHandlerConfig.getParameterBag(BAG_PARAMETERS);
if (parameters == null) {
parameters = new ParameterBag(BAG_PARAMETERS, "Parameters", TYPE_PARAMETERS);
executionHandlerConfig.addParameterBag(parameters);
}
StringParameter stateP = parameters.getParameter(PARAM_STATE);
if (stateP == null) {
stateP = new StringParameter(PARAM_STATE, "State", state);
parameters.addParameter(stateP);
}
stateP.setValueE(state);
tx.addOrUpdate(executionHandlerConfig);
tx.commitOnClose();
this.statesByRealm.put(realm, state);
}
}
private void evaluateStateByRealm() throws Exception {
this.statesByRealm = Collections.synchronizedMap(new HashMap<>());
runAsAgent(ctx -> getContainer().getRealmNames().forEach(realm -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), false)) {
Resource config = tx.getResourceBy(TYPE_CONFIGURATION, ExecutionHandler.class.getSimpleName());
if (config == null) {
this.statesByRealm.put(realm, ExecutionHandlerState.Running);
} else {
String currentState = config.getString(PARAM_STATE);
ExecutionHandlerState state = ExecutionHandlerState.Running;
try {
if (!currentState.isEmpty())
state = ExecutionHandlerState.valueOf(currentState);
} catch (Exception e) {
config.setString(PARAM_STATE, ExecutionHandlerState.Running);
tx.update(config);
tx.commitOnClose();
logger.error("Failed to read unhandled state " + currentState, e);
}
this.statesByRealm.put(realm, state);
}
}
}));
}
}

View File

@ -14,10 +14,10 @@ import li.strolch.model.activity.Action;
import li.strolch.model.activity.Activity;
import li.strolch.model.activity.TimeOrdering;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.privilege.base.PrivilegeException;
import li.strolch.privilege.model.Certificate;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
/**
* <p>
@ -49,14 +49,21 @@ public abstract class ExecutionHandler extends StrolchComponent {
public static final String PARAM_STATE = "state";
@Override
public StrolchTransaction openTx(String realm, Certificate cert, Class<?> action, boolean readOnly) {
return super.openTx(realm, cert, action.getName(), readOnly);
}
public void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException, Exception {
@Override
public void runAsAgent(PrivilegedRunnable runnable) throws Exception {
super.runAsAgent(runnable);
}
@Override
public <T> T runAsAgentWithResult(PrivilegedRunnableWithResult<T> runnable) throws Exception {
return super.runAsAgentWithResult(runnable);
}
public ExecutorService getExecutor() {
return getExecutorService("ExecutionHandler");
}

View File

@ -88,6 +88,7 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer {
try {
controller.toExecuted(locator);
executionHandler.triggerExecution(realm);
} catch (Exception e) {
logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e);

View File

@ -50,6 +50,12 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
updateOrderState(tx(), activity, currentState, activity.getState());
}
@Override
public Void visitActivity(Activity activity) {
activity.getTimeOrdering().accept(this, activity);
return null;
}
@Override
public Void visitAction(Action action) {
execute(action);
@ -60,15 +66,22 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
// first plan
if (action.getState().compareTo(State.PLANNED) < 0) {
State currentState = action.getState();
getPlanningPolicy(action).plan(action);
if (action.getState() != State.PLANNED) {
if (currentState != action.getState() && action.isResourceDefined())
getConfirmationPolicy(action).doConfirmation(action);
logger.info("Action " + action.getLocator() + " was not planned, can thus not executed.");
return;
}
// planning is complete, so we can now confirm it
getConfirmationPolicy(action).toPlanned(action);
}
tx().lock(action.getResourceLocator());
tx().removeFromCache(action.getResourceLocator());
ConfirmationPolicy confirmationPolicy = getConfirmationPolicy(action);
ExecutionPolicy executionPolicy = getExecutionPolicy(action);
@ -165,7 +178,6 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
@Override
public void visitParallel(Activity activity) {
if (activity.getState().compareTo(State.EXECUTED) >= 0)
return;
@ -177,16 +189,8 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
// in parallel we execute all the actions in the activity
boolean canExecute = isExecutable(element);
if (canExecute) {
if (isExecutable(element))
element.accept(this);
}
}
}
@Override
public Void visitActivity(Activity activity) {
activity.getTimeOrdering().accept(this, activity);
return null;
}
}

View File

@ -17,7 +17,6 @@ package li.strolch.execution.command;
import static li.strolch.execution.policy.NoPlanning.DEFAULT_PLANNING;
import li.strolch.execution.policy.ConfirmationPolicy;
import li.strolch.execution.policy.PlanningPolicy;
import li.strolch.model.Resource;
import li.strolch.model.State;
@ -70,17 +69,17 @@ public class PlanActionCommand extends PlanningCommand {
public Void visitAction(Action action) {
if (action.getState() != State.CREATED && action.getState() != State.PLANNING)
throw new IllegalStateException("Can not plan action in state " + action.getState());
ConfirmationPolicy confirmationPolicy = getConfirmationPolicy(action);
action.setState(State.PLANNING);
confirmationPolicy.toPlanning(action);
if (action.isResourceDefined())
getConfirmationPolicy(action).doConfirmation(action);
PolicyDef planningPolicyDef = action.findPolicy(PlanningPolicy.class, DEFAULT_PLANNING);
PlanningPolicy planningPolicy = tx().getPolicy(PlanningPolicy.class, planningPolicyDef);
planningPolicy.plan(action);
if (action.getState() == State.PLANNED)
confirmationPolicy.toPlanned(action);
getConfirmationPolicy(action).toPlanned(action);
return null;
}

View File

@ -42,9 +42,10 @@ public class SetActionToPlannedCommand extends BasePlanningAndExecutionCommand {
Activity rootElement = this.action.getRootElement();
State currentState = rootElement.getState();
this.action.setState(State.PLANNED);
getConfirmationPolicy(this.action).toPlanned(this.action);
State currentActionState = this.action.getState();
getPlanningPolicy(this.action).plan(this.action);
if (currentActionState != this.action.getState() && this.action.isResourceDefined())
getConfirmationPolicy(this.action).doConfirmation(this.action);
updateOrderState(tx(), rootElement, currentState, rootElement.getState());
}

View File

@ -12,12 +12,6 @@ public class NoPlanning extends PlanningPolicy {
super(tx);
}
@Override
public Resource evaluateAndSetResource(Action action) {
tx().lock(Resource.locatorFor(action.getResourceType(), action.getResourceId()));
return tx().getResourceBy(action.getResourceType(), action.getResourceId());
}
@Override
public void plan(Action action) {
DBC.PRE.assertEquals("Can not plan illegal state", State.CREATED, action.getState());

View File

@ -8,21 +8,21 @@ import li.strolch.policy.StrolchPolicy;
public abstract class PlanningPolicy extends StrolchPolicy {
public static PolicyDef DEFAULT_PLANNING = PolicyDef
.valueOf(PlanningPolicy.class.getSimpleName(), "key:DefaultPlanning");
public static PolicyDef DEFAULT_PLANNING = PolicyDef.getKeyPolicy(PlanningPolicy.class, "DefaultPlanning");
public PlanningPolicy(StrolchTransaction tx) {
super(tx);
}
public abstract Resource evaluateAndSetResource(Action action);
public Resource evaluateAndSetResource(Action action) {
if (!action.isResourceDefined())
return null;
tx().lock(action.getResourceLocator());
return tx().getResourceBy(action.getResourceType(), action.getResourceId());
}
public abstract void plan(Action action);
public abstract void unplan(Action action);
@Override
public void undo() {
// do nothing
}
}

View File

@ -33,40 +33,26 @@ public class ReservationExecution extends DurationExecution {
@Override
public boolean isExecutable(Action action) {
switch (action.getType()) {
case TYPE_RESERVE:
return !isReserved(tx(), action);
case TYPE_RELEASE:
return true;
default:
return super.isExecutable(action);
}
return switch (action.getType()) {
case TYPE_RESERVE -> !isReserved(tx(), action);
case TYPE_RELEASE -> true;
default -> super.isExecutable(action);
};
}
@Override
public void toExecution(Action action) {
switch (action.getType()) {
case TYPE_RESERVE:
case TYPE_RELEASE:
toExecuted(action);
break;
default:
super.toExecution(action);
case TYPE_RESERVE, TYPE_RELEASE -> toExecuted(action);
default -> super.toExecution(action);
}
}
@Override
public void toExecuted(Action action) {
switch (action.getType()) {
case TYPE_RESERVE:
setReservation(tx(), action, true);
break;
case TYPE_RELEASE:
setReservation(tx(), action, false);
break;
case TYPE_RESERVE -> setReservation(tx(), action, true);
case TYPE_RELEASE -> setReservation(tx(), action, false);
}
super.toExecuted(action);

View File

@ -17,16 +17,6 @@ public class SimplePlanning extends PlanningPolicy {
super(tx);
}
@Override
public Resource evaluateAndSetResource(Action action) {
if (action.hasResourceDefined()) {
tx().lock(action.getResourceLocator());
return tx().getResourceBy(action.getResourceType(), action.getResourceId());
}
return null;
}
/**
* Command to plan an {@link Action} to a {@link Resource}. It is assumed that the {@link IValueChange} objects of
* the action are already constructed. The resource is evaluated using {@link #evaluateAndSetResource(Action)}

View File

@ -1,5 +1,6 @@
package li.strolch.execution.service;
import li.strolch.execution.Controller;
import li.strolch.execution.ExecutionHandler;
import li.strolch.model.State;
import li.strolch.model.activity.Action;
@ -24,33 +25,20 @@ public class ExecuteActionService extends AbstractService<LocatorArgument, Servi
@Override
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
String realm = getArgOrUserRealm(arg);
ExecutionHandler executionHandler = getComponent(ExecutionHandler.class);
String realm;
Activity activity;
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
realm = tx.getRealmName();
tx.lock(arg.locator.trim(3));
if (arg.locator.getSize() == 3) {
activity = tx.findElement(arg.locator);
} else {
Action action = tx.findElement(arg.locator);
// this is so we can re-execute stopped actions
if (action.getState() == State.STOPPED) {
action.setState(State.EXECUTABLE);
tx.update(action.getRootElement());
tx.commitOnClose();
}
activity = action.getRootElement();
}
Controller controller = executionHandler.getController(realm, arg.locator.trim(3));
if (controller != null) {
controller.execute();
return ServiceResult.success();
}
getComponent(ExecutionHandler.class).toExecution(realm, activity);
try (StrolchTransaction tx = openTx(realm, true)) {
tx.lock(arg.locator);
Activity activity = tx.getActivityBy(arg.locator.get(1), arg.locator.get(2), true);
executionHandler.toExecution(realm, activity);
}
return ServiceResult.success();
}

View File

@ -24,8 +24,8 @@ public class SetActionToClosedService extends AbstractService<LocatorArgument, S
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
tx.lock(action.getResourceLocator());

View File

@ -24,10 +24,10 @@ public class SetActionToCreatedService extends AbstractService<LocatorArgument,
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
if (action.hasResourceDefined())
if (action.isResourceDefined())
tx.lock(action.getResourceLocator());
SetActionToCreatedCommand command = new SetActionToCreatedCommand(tx);

View File

@ -33,8 +33,8 @@ public class SetActionToErrorService extends AbstractService<LocatorArgument, Se
}
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
tx.lock(action.getResourceLocator());

View File

@ -33,8 +33,8 @@ public class SetActionToExecutedService extends AbstractService<LocatorArgument,
}
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
tx.lock(action.getResourceLocator());

View File

@ -24,11 +24,11 @@ public class SetActionToPlannedService extends AbstractService<LocatorArgument,
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
if (!action.hasResourceDefined())
return ServiceResult.error("Resource not defined for " + action.getLocator());
if (action.isResourceDefined())
tx.lock(action.getResourceLocator());
SetActionToPlannedCommand command = new SetActionToPlannedCommand(tx);
command.setAction(action);

View File

@ -33,8 +33,8 @@ public class SetActionToStoppedService extends AbstractService<LocatorArgument,
}
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
tx.lock(action.getResourceLocator());

View File

@ -33,8 +33,8 @@ public class SetActionToWarningService extends AbstractService<LocatorArgument,
}
try (StrolchTransaction tx = openArgOrUserTx(arg)) {
tx.lock(arg.locator.trim(3));
Action action = tx.findElement(arg.locator);
tx.lock(action.getResourceLocator());

View File

@ -32,45 +32,28 @@ public class SetExecutionHandlerStateService extends AbstractService<StringMapAr
getPrivilegeContext().validateAction(new SimpleRestrictable(getPrivilegeValue(), state));
switch (state) {
case "Running": {
case "Running" -> {
ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class);
executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.Running);
executionHandler.triggerExecution(realm);
break;
}
case "HaltNew": {
case "HaltNew" -> {
ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class);
executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.HaltNew);
break;
}
case "Paused": {
case "Paused" -> {
ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class);
executionHandler.setState(getCertificate(), realm, ExecutionHandlerState.Paused);
break;
}
case "Trigger": {
case "Trigger" -> {
ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class);
executionHandler.triggerExecution(realm);
break;
}
case "ReloadActivities": {
case "ReloadActivities" -> {
ExecutionHandler executionHandler = getContainer().getComponent(ExecutionHandler.class);
executionHandler.reloadActivitiesInExecution(getPrivilegeContext(), realm);
break;
}
default:
throw new UnsupportedOperationException("Unhandled state " + state);
default -> throw new UnsupportedOperationException("Unhandled state " + state);
}
return ServiceResult.success();