[Major] Rewrote execution of actions. Added atomicParallelExecution

This commit is contained in:
Robert von Burg 2023-02-24 14:18:27 +01:00
parent 5f80c4a344
commit 378390d5a1
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
6 changed files with 97 additions and 65 deletions

View File

@ -120,5 +120,6 @@ public class StrolchModelConstants {
public static final String PARAM_ACTIVITY = "activity";
public static final String PARAM_JOB_COUNT_SEMAPHORE = "jobCountSemaphore";
public static final String PARAM_JOB_COUNT_SEMAPHORE_TYPES = "jobCountSemaphoreTypes";
public static final String PARAM_ATOMIC_PARALLEL_EXECUTION = "atomicParallelExecution";
}
}

View File

@ -183,13 +183,14 @@ public class Controller {
}
/**
* Executes the activity in the given TX by calling the {@link ExecuteActivityCommand}
* Executes the activity in the given TX by calling the {@link PlanAndExecuteActivityCommand}
*
* @param tx
* the TX
*
* @return true if execute should be called again, i.e. the
* {@link ExecuteActivityCommand#needsRetriggerOfExecution()} returns true and the activity isn't complete yet
* {@link PlanAndExecuteActivityCommand#needsRetriggerOfExecution()} returns true and the activity isn't complete
* yet
*/
protected boolean internalExecute(StrolchTransaction tx) {
if (this.activity.getState().isExecuted()) {
@ -205,7 +206,7 @@ public class Controller {
return false;
}
ExecuteActivityCommand command = new ExecuteActivityCommand(tx);
PlanAndExecuteActivityCommand command = new PlanAndExecuteActivityCommand(tx);
command.setController(this);
command.validate();
command.doCommand();

View File

@ -34,7 +34,7 @@ import li.strolch.utils.dbc.DBC;
* objects of the action are already constructed and {@link Action#getResourceId()} is set.
*
* <br>
*
* <p>
* It iterates the {@link IValueChange} operators and registers the resulting changes on the {@link StrolchTimedState}
* objects assigned to the {@link Resource}.
*
@ -67,11 +67,15 @@ public class PlanActivityCommand extends PlanningCommand {
@Override
public Void visitAction(Action action) {
State currentState = action.getState();
if (currentState.compareTo(State.PLANNED) >= 0)
return null;
PolicyDef planningPolicyDef = action.findPolicy(PlanningPolicy.class, DEFAULT_PLANNING);
PlanningPolicy planningPolicy = tx().getPolicy(PlanningPolicy.class, planningPolicyDef);
planningPolicy.plan(action);
if (action.getState() == State.PLANNED)
getConfirmationPolicy(action).toPlanned(action);
if (action.getState() != currentState)
getConfirmationPolicy(action).doConfirmation(action);
return null;
}
}

View File

@ -1,5 +1,7 @@
package li.strolch.execution.command;
import static li.strolch.model.StrolchModelConstants.PolicyConstants.PARAM_ATOMIC_PARALLEL_EXECUTION;
import java.util.Iterator;
import java.util.Map.Entry;
@ -11,17 +13,18 @@ import li.strolch.model.activity.Action;
import li.strolch.model.activity.Activity;
import li.strolch.model.activity.IActivityElement;
import li.strolch.model.activity.TimeOrderingVisitor;
import li.strolch.model.parameter.BooleanParameter;
import li.strolch.model.visitor.IActivityElementVisitor;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.utils.dbc.DBC;
public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
public class PlanAndExecuteActivityCommand extends BasePlanningAndExecutionCommand
implements TimeOrderingVisitor, IActivityElementVisitor<Void> {
private Controller controller;
private boolean needsRetriggerOfExecution;
public ExecuteActivityCommand(StrolchTransaction tx) {
public PlanAndExecuteActivityCommand(StrolchTransaction tx) {
super(tx);
}
@ -58,28 +61,25 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
@Override
public Void visitAction(Action action) {
execute(action);
planAndExecute(action);
return null;
}
public void execute(Action action) {
private void planAndExecute(Action action) {
if (planAction(action))
executeAction(action);
}
// first plan
if (action.getState().compareTo(State.PLANNED) < 0) {
State currentState = action.getState();
getPlanningPolicy(action).plan(action);
private void executeAction(Action action) {
State currentState = action.getState();
if (currentState.compareTo(State.EXECUTED) >= 0)
return;
if (currentState == State.CREATED || currentState == State.PLANNING)
throw new IllegalStateException("Action " + action.getLocator() + " is in illegal state " + currentState);
if (action.getState() != State.PLANNED) {
if (currentState != action.getState() && action.isResourceDefined()) {
getConfirmationPolicy(action).doConfirmation(action);
}
logger.info("Action " + action.getLocator() + " was not planned, can thus not executed.");
return;
}
// planning is complete, so we can now confirm it
getConfirmationPolicy(action).toPlanned(action);
if (!currentState.canSetToExecution()) {
logger.warn("Action " + action.getLocator() + " can not be executed with state " + currentState);
return;
}
tx().lock(action.getResourceLocator());
@ -88,7 +88,7 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
ExecutionPolicy executionPolicy = getExecutionPolicy(action);
// we catch all exceptions because we can't undo, thus need to set the state to ERROR in this case
// this is only required because we execute actions in same TX as we set to executed any previous actions
// this is only required because we execute actions in same TX as we set to-executed any previous actions
try {
if (executionPolicy.isStopped()) {
@ -126,72 +126,94 @@ public class ExecuteActivityCommand extends BasePlanningAndExecutionCommand
}
}
protected boolean isExecutable(IActivityElement element) {
State state = element.getState();
if (state.compareTo(State.EXECUTED) >= 0)
return false;
private boolean planActivity(Activity activity) {
PlanActivityCommand planning = new PlanActivityCommand(tx());
planning.setActivity(activity);
planning.validateAndDoCommand();
return activity.getState().compareTo(State.PLANNED) >= 0;
}
if (element instanceof Activity)
private void unplanActivity(Activity activity) {
UnplanActivityCommand planning = new UnplanActivityCommand(tx());
planning.setActivity(activity);
planning.validateAndDoCommand();
}
private boolean planAction(Action action) {
State currentState = action.getState();
if (currentState.compareTo(State.PLANNED) >= 0)
return true;
// not yet in execution
if (state.compareTo(State.EXECUTION) < 0)
getPlanningPolicy(action).plan(action);
if (action.getState().compareTo(State.PLANNED) >= 0) {
// planning is complete, so we can now confirm it
getConfirmationPolicy(action).toPlanned(action);
return true;
}
// in stopped, means we can re-execute
if (state == State.STOPPED)
return true;
// if in ERROR, then must first be handled
if (currentState != action.getState() && action.isResourceDefined())
getConfirmationPolicy(action).doConfirmation(action);
logger.info("Failed to plan action " + action.getLocator() + ", thus no execution possible");
return false;
}
@Override
public void visitSeries(Activity activity) {
if (activity.getState().compareTo(State.EXECUTED) >= 0)
if (activity.inClosedPhase())
return;
Iterator<Entry<String, IActivityElement>> iter = activity.elementIterator();
while (iter.hasNext()) {
IActivityElement element = iter.next().getValue();
State state = element.getState();
if (element.getState().compareTo(State.EXECUTED) >= 0)
if (element.inClosedPhase())
continue;
// in series we can never have two Actions in execution, so if we found the action in execution, we stop
if (element instanceof Action //
&& (state == State.EXECUTION //
|| state == State.WARNING //
|| state == State.ERROR)) {
// in series, we can never have two Actions in execution, so if we found the action in execution, we stop
if (element instanceof Action && state.inExecutionPhase() && state != State.STOPPED)
break;
}
boolean canExecute = isExecutable(element);
if (canExecute) {
element.accept(this);
// in series we stop when the first action is set to execution
break;
}
// in series, we stop when the first action is set to execution
element.accept(this);
break;
}
}
@Override
public void visitParallel(Activity activity) {
if (activity.getState().compareTo(State.EXECUTED) >= 0)
if (activity.inClosedPhase())
return;
BooleanParameter atomicExecutionP = activity.findParameter(PARAM_ATOMIC_PARALLEL_EXECUTION, false);
boolean atomicExecution = atomicExecutionP != null && atomicExecutionP.getValue();
if (atomicExecution) {
if (!planActivity(activity)) {
unplanActivity(activity);
return;
}
// stop execution if at least one action is not executable from this entire tree
boolean anyActionNotExecutable = activity.streamActionsDeep().anyMatch(a -> {
if (!a.getState().canSetToExecution())
return false;
boolean executable = getExecutionPolicy(a).isExecutable(a);
if (!executable)
logger.info("Action " + a.getLocator() + " is not executable yet!");
return !executable;
});
if (anyActionNotExecutable)
return;
}
Iterator<Entry<String, IActivityElement>> iter = activity.elementIterator();
while (iter.hasNext()) {
IActivityElement element = iter.next().getValue();
if (element.getState().isExecuted())
if (element.inClosedPhase())
continue;
// in parallel we execute all the actions in the activity
if (isExecutable(element))
element.accept(this);
// in parallel, we execute all the actions in the activity
element.accept(this);
}
}
}

View File

@ -30,11 +30,11 @@ import li.strolch.service.api.Command;
import li.strolch.utils.dbc.DBC;
/**
* Command to unplan an {@link Activity} from a {@link Resource}. This {@link Command} assumes that the {@link
* IValueChange} objects of the action are already constructed and {@link Action#getResourceId()} is set.
* Command to unplan an {@link Activity} from a {@link Resource}. This {@link Command} assumes that the
* {@link IValueChange} objects of the action are already constructed and {@link Action#getResourceId()} is set.
*
* <br>
*
* <p>
* It iterates the {@link IValueChange} operators and unregisters the changes from the {@link StrolchTimedState} objects
* on the {@link Resource}.
*
@ -67,6 +67,10 @@ public class UnplanActivityCommand extends PlanningCommand {
@Override
public Void visitAction(Action action) {
State currentState = action.getState();
if (currentState == State.CREATED)
return null;
PolicyDef planningPolicyDef = action.findPolicy(PlanningPolicy.class, DEFAULT_PLANNING);
PlanningPolicy planningPolicy = tx().getPolicy(PlanningPolicy.class, planningPolicyDef);
planningPolicy.unplan(action);

View File

@ -1,6 +1,5 @@
package li.strolch.execution.policy;
import static li.strolch.model.StrolchModelConstants.PolicyConstants.BAG_OBJECTIVES;
import static li.strolch.model.StrolchModelConstants.PolicyConstants.PARAM_DURATION;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.helper.StringHelper.formatMillisecondsDuration;
@ -159,7 +158,8 @@ public abstract class ExecutionPolicy extends StrolchPolicy {
* executed
*/
public boolean isExecutable(Action action) {
return true;
State state = action.getState();
return state.isPlanned() || state.isStopped();
}
/**