[Fix] Fixed reservation problem in concurrent execution

This commit is contained in:
Robert von Burg 2017-10-09 12:31:02 +02:00
parent efb15f305c
commit c7fc11f563
6 changed files with 50 additions and 53 deletions

View File

@ -31,7 +31,7 @@ public interface DelayedExecutionTimer {
/**
* Cancels any delayed execution for the given {@link Locator}
*
* @param actionLocator
* @param locator
* the {@link Action}'s {@link Locator}
*/
void cancel(Locator locator);

View File

@ -33,7 +33,7 @@ import li.strolch.utils.dbc.DBC;
/**
* The event based execution handler waits for events in that the {@link ExecutionPolicy} implementations must call the
* relevant methods when the work is complete. Afterwards the next {@link Action} in the procedure is executed
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class EventBasedExecutionHandler extends ExecutionHandler {
@ -70,9 +70,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
logger.info("Not restarting execution of activities.");
} else {
logger.info("Restarting execution of activities.");
runAsAgent(ctx -> {
restartActivityExecution(ctx);
});
runAsAgent(this::restartActivityExecution);
}
super.start();
@ -182,7 +180,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class).addMessage(new LogMessage(realm, locator, LogSeverity.EXCEPTION,
ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.execution")
.value("reason", e));
.value("reason", e));
}
}
});
@ -201,7 +199,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class).addMessage(new LogMessage(realm, locator, LogSeverity.EXCEPTION,
ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.executed")
.value("reason", e));
.value("reason", e));
}
}
});
@ -220,7 +218,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class).addMessage(new LogMessage(realm, locator, LogSeverity.EXCEPTION,
ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.stopped")
.value("reason", e));
.value("reason", e));
}
}
});
@ -239,7 +237,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class).addMessage(new LogMessage(realm, locator, LogSeverity.EXCEPTION,
ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.error")
.value("reason", e));
.value("reason", e));
}
}
});
@ -258,7 +256,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class).addMessage(new LogMessage(realm, locator, LogSeverity.EXCEPTION,
ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.warning")
.value("reason", e));
.value("reason", e));
}
}
});
@ -294,10 +292,10 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
logger.error("Failed to archive " + activityLoc + " due to " + e.getMessage(), e);
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class)
.addMessage(new LogMessage(realm, activityLoc, LogSeverity.EXCEPTION,
getComponent(OperationsLog.class).addMessage(
new LogMessage(realm, activityLoc, LogSeverity.EXCEPTION,
ResourceBundle.getBundle("strolch-service"), "execution.handler.failed.archive")
.value("reason", e));
.value("reason", e));
}
}
});
@ -320,7 +318,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
ExecuteActivityCommand command = new ExecuteActivityCommand(getContainer(), tx);
command.setActivity(activity);
tx.addCommand(command);
command.doCommand();
tx.commitOnClose();
}
@ -339,7 +337,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
// set this action to executed
SetActionToExecutedCommand command = new SetActionToExecutedCommand(getContainer(), tx);
command.setAction(action);
tx.addCommand(command);
command.doCommand();
// flush so we can see that changes performed
tx.flush();
@ -361,9 +359,9 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
ExecuteActivityCommand execCommand = new ExecuteActivityCommand(getContainer(), tx);
execCommand.setActivity(activity);
tx.addCommand(execCommand);
execCommand.doCommand();
// flush so we can see that changes performed
// flush so we can see the changes performed
tx.flush();
}
@ -384,7 +382,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
SetActionToWarningCommand command = new SetActionToWarningCommand(getContainer(), tx);
command.setAction((Action) elem);
tx.addCommand(command);
command.doCommand();
tx.commitOnClose();
}
@ -400,7 +398,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
SetActionToErrorCommand command = new SetActionToErrorCommand(getContainer(), tx);
command.setAction((Action) elem);
tx.addCommand(command);
command.doCommand();
tx.commitOnClose();
}
@ -416,7 +414,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
SetActionToStoppedCommand command = new SetActionToStoppedCommand(getContainer(), tx);
command.setAction((Action) elem);
tx.addCommand(command);
command.doCommand();
tx.commitOnClose();
}

View File

@ -19,8 +19,6 @@ import li.strolch.model.activity.TimeOrderingVisitor;
import li.strolch.model.policy.PolicyDef;
import li.strolch.model.visitor.IActivityElementVisitor;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.persistence.api.UpdateActivityCommand;
import li.strolch.persistence.api.UpdateOrderCommand;
import li.strolch.policy.PolicyHandler;
import li.strolch.service.api.Command;
import li.strolch.utils.helper.StringHelper;
@ -39,8 +37,7 @@ public abstract class ExecutionCommand extends Command implements TimeOrderingVi
if (StringHelper.isEmpty(resourceType) || resourceType.equals(DASH))
throw new StrolchException("No resourceType defined on action " + action.getLocator());
Resource resource = tx.getResourceBy(resourceType, resourceId, true);
return resource;
return tx.getResourceBy(resourceType, resourceId, true);
}
protected void updateOrderState(Activity rootElement, State currentState, State newState) {
@ -56,9 +53,7 @@ public abstract class ExecutionCommand extends Command implements TimeOrderingVi
order.setState(rootElement.getState());
UpdateOrderCommand cmd = new UpdateOrderCommand(getContainer(), tx());
cmd.setOrder(order);
cmd.doCommand();
tx().update(order);
}
protected ExecutionPolicy getExecutionPolicy(Action action) {
@ -89,8 +84,8 @@ public abstract class ExecutionCommand extends Command implements TimeOrderingVi
// in series we can never have two Actions in execution, so if we found the action in execution, we stop
if (element instanceof Action //
&& (state == State.EXECUTION //
|| state == State.WARNING //
|| state == State.ERROR)) {
|| state == State.WARNING //
|| state == State.ERROR)) {
break;
}
@ -155,7 +150,11 @@ public abstract class ExecutionCommand extends Command implements TimeOrderingVi
public Void visitAction(Action action) {
ExecutionPolicy executionPolicy = getExecutionPolicy(action);
if (executionPolicy.isExecutable(action)) {
if (!executionPolicy.isExecutable(action)) {
logger.info("Action " + action.getLocator() + " is not yet executable.");
} else {
logger.info("Action " + action.getLocator() + " is now being executed...");
// we catch all exceptions because we can't undo, thus need to set the state to ERROR in this case
// this is only required because we execute actions in same TX as we set to executed any previous actions
@ -166,9 +165,7 @@ public abstract class ExecutionCommand extends Command implements TimeOrderingVi
logger.error("Failed to set " + action.getLocator() + " to execution due to " + e.getMessage(), e);
action.setState(State.ERROR);
UpdateActivityCommand command = new UpdateActivityCommand(getContainer(), tx());
command.setActivity(action.getRootElement());
command.doCommand();
tx().update(action.getRootElement());
getConfirmationPolicy(action).toError(action);
}

View File

@ -9,7 +9,6 @@ import li.strolch.model.State;
import li.strolch.model.activity.Action;
import li.strolch.model.activity.Activity;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.persistence.api.UpdateActivityCommand;
import li.strolch.policy.StrolchPolicy;
import li.strolch.privilege.base.PrivilegeException;
import li.strolch.privilege.model.Certificate;
@ -123,9 +122,7 @@ public abstract class ExecutionPolicy extends StrolchPolicy {
action.setState(state);
UpdateActivityCommand command = new UpdateActivityCommand(getContainer(), tx());
command.setActivity(action.getRootElement());
command.doCommand();
tx().update(action.getRootElement());
String msg = "Action " + action.getLocator() + " is now in state " + state;
if (state == State.ERROR)
@ -158,9 +155,6 @@ public abstract class ExecutionPolicy extends StrolchPolicy {
* {@link ComponentContainer#getRealm(Certificate)}. This transaction should be used in a try-with-resource clause
* so it is properly closed.
*
* @param action
* the action to use for the opened TX
*
* @return the open {@link StrolchTransaction}
*
* @throws StrolchException
@ -175,12 +169,12 @@ public abstract class ExecutionPolicy extends StrolchPolicy {
}
/**
* Performs the given {@link PrivilegedRunnable} as the system user {@link StrolchConstants#SYSTEM_USER_EXECUTION}
* Performs the given {@link PrivilegedRunnable} as the system user {@link StrolchConstants#SYSTEM_USER_AGENT}
*
* @param runnable
* the runnable to perform
*
* @throws PrivilegeException
* @throws PrivilegeException if the agent is missing the privilege
*/
protected void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException {
getContainer().getPrivilegeHandler().runAs(StrolchConstants.SYSTEM_USER_AGENT, runnable);

View File

@ -8,7 +8,6 @@ import li.strolch.model.State;
import li.strolch.model.activity.Action;
import li.strolch.model.parameter.BooleanParameter;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.persistence.api.UpdateResourceCommand;
/**
* <p>
@ -17,12 +16,12 @@ import li.strolch.persistence.api.UpdateResourceCommand;
* and only allows execution if the value is false, in which case the {@link #toExecution(Action)} method sets the value
* to true, and the {@link #toExecuted(Action)} method returns the value to false.
* </p>
*
*
* <p>
* <b>Note:</b> the reservation is done for {@link Action} of type {@link #TYPE_RESERVE} and releasing is done for
* {@link Action} of type {@link #TYPE_RELEASE}
* </p>
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class ReservationExection extends DurationExecution {
@ -39,6 +38,8 @@ public class ReservationExection extends DurationExecution {
@Override
public boolean isExecutable(Action action) {
tx().lock(getResource(action));
// only check if reserve
if (!action.getType().equals(TYPE_RESERVE) && !action.getType().equals(TYPE_RELEASE)) {
// otherwise delegate to super class
@ -56,16 +57,19 @@ public class ReservationExection extends DurationExecution {
Resource resource = getResource(action);
if (!resource.hasParameter(BAG_PARAMETERS, PARAM_RESERVED))
throw new StrolchModelException("Parameter " + PARAM_RESERVED + " on bag " + BAG_PARAMETERS + " missing on "
+ resource.getLocator());
throw new StrolchModelException(
"Parameter " + PARAM_RESERVED + " on bag " + BAG_PARAMETERS + " missing on " + resource
.getLocator());
BooleanParameter reservedP = resource.getParameter(BAG_PARAMETERS, PARAM_RESERVED);
return reservedP.getValue().booleanValue();
return reservedP.getValue();
}
@Override
public void toExecution(Action action) {
tx().lock(getResource(action));
// only do if reserve
if (!action.getType().equals(TYPE_RESERVE) && !action.getType().equals(TYPE_RELEASE)) {
// otherwise delegate to super class
@ -86,6 +90,8 @@ public class ReservationExection extends DurationExecution {
@Override
public void toExecuted(Action action) {
tx().lock(getResource(action));
// only do if release
if (!action.getType().equals(TYPE_RESERVE) && !action.getType().equals(TYPE_RELEASE)) {
// otherwise delegate to super class
@ -109,8 +115,6 @@ public class ReservationExection extends DurationExecution {
reservedP.setValue(reserved);
// save changes
UpdateResourceCommand command = new UpdateResourceCommand(getContainer(), tx());
command.setResource(resource);
command.doCommand();
tx().update(resource);
}
}

View File

@ -13,7 +13,7 @@ import li.strolch.persistence.api.StrolchTransaction;
* The {@link ToErrorReservationExecution} executes same as {@link ReservationExection} with the difference that
* {@link #isExecutable(Action)} always returns true, and if the action's resource is currently reserved, the execution
* fails and the state is set to ERROR
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class ToErrorReservationExecution extends ReservationExection {
@ -28,6 +28,8 @@ public class ToErrorReservationExecution extends ReservationExection {
@Override
public boolean isExecutable(Action action) {
tx().lock(getResource(action));
if (action.getType().equals(TYPE_RESERVE)) {
return true;
}
@ -38,11 +40,13 @@ public class ToErrorReservationExecution extends ReservationExection {
@Override
public void toExecution(Action action) {
tx().lock(getResource(action));
if (action.getType().equals(TYPE_RESERVE) && isReserved(action)) {
setActionState(action, State.EXECUTION);
toError(new LogMessage(tx().getRealmName(), action.getLocator(), LogSeverity.ERROR,
ResourceBundle.getBundle("strolch-service"), "execution.policy.reservation.alreadyReserved")
.value("resourceLoc", getResource(action).getLocator().toString()));
.value("resourceLoc", getResource(action).getLocator().toString()));
} else {
super.toExecution(action);
}