[Major][Fix] Fixed seldom occurrences of deadlocks on execution

Don't call the controller directly, always call the execution handler.
This commit is contained in:
Robert von Burg 2023-02-20 15:38:07 +01:00
parent 687396f295
commit 9759b1e724
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
9 changed files with 105 additions and 145 deletions

View File

@ -2,11 +2,9 @@ package li.strolch.execution;
import static java.util.Collections.synchronizedMap;
import static li.strolch.execution.EventBasedExecutionHandler.PROP_LOCK_RETRIES;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.Set;
import li.strolch.agent.api.ObserverEvent;
@ -15,16 +13,12 @@ import li.strolch.agent.api.StrolchLockException;
import li.strolch.agent.api.StrolchRealm;
import li.strolch.execution.command.*;
import li.strolch.execution.policy.ExecutionPolicy;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.model.Resource;
import li.strolch.model.State;
import li.strolch.model.Tags;
import li.strolch.model.activity.Action;
import li.strolch.model.activity.Activity;
import li.strolch.model.log.LogMessage;
import li.strolch.model.log.LogMessageState;
import li.strolch.model.log.LogSeverity;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.privilege.model.Certificate;
import li.strolch.runtime.privilege.PrivilegedRunnable;
@ -457,52 +451,6 @@ public class Controller {
command.doCommand();
}
/**
* Sets the state of the {@link Action} with the given {@link Locator} to {@link State#ERROR}
*
* @param actionLoc
* the {@link Locator} of the {@link Action}
*/
public void asyncToError(Locator actionLoc) {
this.executionHandler.getExecutor().submit(() -> {
try {
toError(actionLoc);
} catch (Exception e) {
logger.error("Failed to set " + locator + " to error due to " + e.getMessage(), e);
if (this.agent.hasComponent(OperationsLog.class)) {
this.agent.getComponent(OperationsLog.class)
.addMessage(new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-service"),
"execution.handler.failed.error").withException(e).value("reason", e));
}
}
});
}
/**
* Sets the state of the {@link Action} with the given {@link Locator} to {@link State#WARNING}
*
* @param actionLoc
* the {@link Locator} of the {@link Action}
*/
public void asyncToWarning(Locator actionLoc) {
this.executionHandler.getExecutor().submit(() -> {
try {
toWarning(actionLoc);
} catch (Exception e) {
logger.error("Failed to set " + locator + " to warning due to " + e.getMessage(), e);
if (this.agent.hasComponent(OperationsLog.class)) {
this.agent.getComponent(OperationsLog.class)
.addMessage(new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-service"),
"execution.handler.failed.warning").withException(e).value("reason", e));
}
}
});
}
protected void lockWithRetries(StrolchTransaction tx) throws StrolchLockException {
if (tx.hasLock(this.locator))
return;

View File

@ -44,6 +44,10 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
this.controllers = synchronizedMapOfMaps(new MapOfMaps<>(true));
}
private static Locator trimLocator(Locator locator) {
return locator.trim(3);
}
@Override
public boolean isControlling(Activity activity) {
return this.controllers.containsElement(getDefaultRealm(), activity.getLocator());
@ -56,11 +60,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public boolean isControlling(Locator locator) {
return this.controllers.containsElement(getDefaultRealm(), locator);
return this.controllers.containsElement(getDefaultRealm(), trimLocator(locator));
}
public boolean isControlling(String realm, Locator locator) {
return this.controllers.containsElement(realm, locator);
return this.controllers.containsElement(realm, trimLocator(locator));
}
@Override
@ -93,7 +97,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public Controller getController(String realm, Locator locator) {
return this.controllers.getElement(realm, locator.trim(3));
return this.controllers.getElement(realm, trimLocator(locator));
}
@Override
@ -180,6 +184,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
if (this.controllers.containsElement(realm, activity.getLocator()))
throw new IllegalStateException(activity.getLocator() + " is already registered for execution!");
logger.info("Added " + activity.getLocator() + " @ " + realm);
Controller controller = newController(realm, activity);
this.controllers.addElement(realm, activity.getLocator(), controller);
notifyObserverAdd(controller);
@ -221,9 +226,10 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
throw new IllegalStateException(
"ExecutionHandler state is " + state + ", can not add activities for execution!");
Controller controller = this.controllers.getElement(realm, activityLoc);
Locator trimmedLocator = trimLocator(activityLoc);
Controller controller = this.controllers.getElement(realm, trimmedLocator);
if (controller == null)
throw new IllegalStateException("No controller registered for activity " + activityLoc);
throw new IllegalStateException("No controller registered for activity " + trimmedLocator);
toExecution(controller);
}
@ -246,8 +252,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void removeFromExecution(String realm, Locator activityLoc) {
Locator rootElemLoc = activityLoc.trim(3);
Controller controller = this.controllers.removeElement(realm, rootElemLoc);
Controller controller = this.controllers.removeElement(realm, trimLocator(activityLoc));
if (controller != null)
getExecutor().submit(() -> notifyObserverRemove(controller));
}
@ -332,10 +337,12 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
return;
}
logger.info("Triggering execution...");
synchronized (this.controllers) {
controllerStream(realm).forEach(this::toExecution);
}
logger.info("Triggering execution for all controllers on realm " + realm + "...");
getExecutor().execute(() -> {
synchronized (this.controllers) {
controllerStream(realm).forEach(this::toExecution);
}
});
}
protected void toExecution(Controller controller) {
@ -347,6 +354,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
return;
}
logger.info("Added toExecution task for " + controller.getLocator() + " @ " + realm);
getExecutor().execute(() -> {
try {
@ -378,10 +386,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toExecuted(String realm, Locator locator) {
logger.info("Added toExecuted task for " + locator + " @ " + realm);
getExecutor().execute(() -> {
try {
Controller controller = this.controllers.getElement(realm, locator.trim(3));
Controller controller = this.controllers.getElement(realm, trimLocator(locator));
if (controller != null)
controller.toExecuted(locator);
@ -407,10 +416,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toStopped(String realm, Locator locator) {
logger.warn("Added toStopped task for " + locator + " @ " + realm);
getExecutor().execute(() -> {
try {
Controller controller = this.controllers.getElement(realm, locator.trim(3));
Controller controller = this.controllers.getElement(realm, trimLocator(locator));
if (controller != null)
controller.toStopped(locator);
@ -429,15 +439,16 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toError(Locator actionLoc) {
toExecuted(getDefaultRealm(), actionLoc);
toError(getDefaultRealm(), actionLoc);
}
@Override
public void toError(String realm, Locator locator) {
logger.error("Added toError task for " + locator + " @ " + realm);
getExecutor().execute(() -> {
try {
Controller controller = this.controllers.getElement(realm, locator.trim(3));
Controller controller = this.controllers.getElement(realm, trimLocator(locator));
if (controller != null)
controller.toError(locator);
@ -461,10 +472,11 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toWarning(String realm, Locator locator) {
logger.warn("Added toWarning task for " + locator + " @ " + realm);
getExecutor().execute(() -> {
try {
Controller controller = this.controllers.getElement(realm, locator.trim(3));
Controller controller = this.controllers.getElement(realm, trimLocator(locator));
if (controller != null)
controller.toWarning(locator);
@ -488,23 +500,25 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void archiveActivity(String realm, Locator activityLoc) {
logger.info("Added archiveActivity task for " + activityLoc + " @ " + realm);
Locator trimmedLocator = trimLocator(activityLoc);
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), ArchiveActivityCommand.class,
false)) {
ArchiveActivityCommand command = new ArchiveActivityCommand(tx);
command.setActivityLoc(activityLoc);
command.setActivityLoc(trimmedLocator);
tx.addCommand(command);
tx.commitOnClose();
}
});
} catch (Exception e) {
logger.error("Failed to archive " + activityLoc + " due to " + e.getMessage(), e);
logger.error("Failed to archive " + trimmedLocator + " due to " + e.getMessage(), e);
if (getContainer().hasComponent(OperationsLog.class)) {
getComponent(OperationsLog.class).addMessage(
new LogMessage(realm, SYSTEM_USER_AGENT, activityLoc, LogSeverity.Exception,
new LogMessage(realm, SYSTEM_USER_AGENT, trimmedLocator, LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-service"),
"execution.handler.failed.archive").withException(e).value("reason", e));
}

View File

@ -1,23 +1,18 @@
package li.strolch.execution;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.helper.StringHelper.formatMillisecondsDuration;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchAgent;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.model.log.LogMessage;
import li.strolch.model.log.LogMessageState;
import li.strolch.model.log.LogSeverity;
import li.strolch.utils.time.PeriodDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,7 +69,7 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer {
SimulationTask task = new SimulationTask(realm, container, actionLocator);
ScheduledFuture<?> future = getExecutor().schedule(task, duration, TimeUnit.MILLISECONDS);
this.simulationTasks.put(actionLocator, future);
logger.info("Registered execution timer for " + actionLocator);
logger.info("Scheduled a delay of " + formatMillisecondsDuration(duration) + " for " + actionLocator);
}
}
}
@ -87,31 +82,9 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer {
this.simulationTasks.remove(locator);
ExecutionHandler executionHandler = container.getComponent(ExecutionHandler.class);
Controller controller = executionHandler.getController(realm, locator);
if (controller == null) {
logger.warn("Controller already remove for " + locator);
return;
}
if (controller.isStopped(locator)) {
logger.warn("Execution for " + locator + " is already stopped.");
return;
}
try {
controller.toExecuted(locator);
executionHandler.triggerExecution(realm);
} catch (Exception e) {
logger.error("Failed to set " + locator + " to executed due to " + e.getMessage(), e);
if (this.agent.getContainer().hasComponent(OperationsLog.class)) {
this.agent.getContainer()
.getComponent(OperationsLog.class)
.addMessage(new LogMessage(realm, SYSTEM_USER_AGENT, locator, LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-service"),
"execution.handler.failed.executed").withException(e).value("reason", e));
}
}
logger.info("Completing task " + locator);
executionHandler.toExecuted(realm, locator);
}
private class SimulationTask implements Runnable {

View File

@ -285,6 +285,51 @@ public abstract class ExecutionPolicy extends StrolchPolicy {
getDelayedExecutionTimer().delay(delayMs, runnable);
}
/**
* Delays the given {@link Runnable} by the given duration, but randomly changing the duration in milliseconds by
* the given min and max factors
*/
protected void delayRandom(Duration duration, double minFactor, double maxFactor, Runnable runnable) {
long durationMs = duration.toMillis();
delayRandom(durationMs, minFactor, maxFactor, TimeUnit.MILLISECONDS, runnable);
}
/**
* Delays the given {@link Runnable} by the given duration, but randomly changing the duration in milliseconds by
* the given min and max factors
*/
protected void delayRandom(long duration, double minFactor, double maxFactor, TimeUnit delayUnit,
Runnable runnable) {
delayRandom((long) (duration * minFactor), (long) (duration * maxFactor), delayUnit, runnable);
}
/**
* Delays the given {@link Runnable} by randomly choosing a value by calling
* {@link ThreadLocalRandom#nextLong(long, long)} passing min and max as origin and bound respectively
*/
protected void delayRandom(long min, long max, TimeUnit delayUnit, Runnable runnable) {
long delay = ThreadLocalRandom.current().nextLong(min, max + 1);
delayRandom(delay, delayUnit, runnable);
}
/**
* Delays the given {@link Runnable} by the given delay value
*
* @param delay
* the delay time
* @param delayUnit
* the UOM of the delay time
*/
protected void delayRandom(long delay, TimeUnit delayUnit, Runnable runnable) {
long delayMs = delayUnit.toMillis(delay);
if (delayMs < 20) {
logger.warn("Delay time for " + this.actionLoc + " is less than 20ms, overriding!");
delayMs = 20;
}
logger.info("Delaying runnable " + runnable + " by " + formatMillisecondsDuration(delayMs));
getDelayedExecutionTimer().delay(delayMs, runnable);
}
/**
* Async method to delay setting the given {@link Action} to executed by the duration defined by the
* {@link DurationParameter} found by calling {@link Action#findParameter(String, String, boolean)}

View File

@ -2,11 +2,9 @@ package li.strolch.execution.policy;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import li.strolch.exception.StrolchException;
import li.strolch.execution.ExecutionHandler;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.State;
import li.strolch.model.activity.Action;
@ -123,8 +121,7 @@ public class SimpleExecution extends ExecutionPolicy {
protected void toExecuted() throws Exception {
cancelWarningTask();
stop();
getController().toExecuted(this.actionLoc);
getComponent(ExecutionHandler.class).triggerExecution(this.realm);
getExecutionHandler().toExecuted(this.realm, this.actionLoc);
}
protected void toError(LogMessage message) {
@ -132,42 +129,31 @@ public class SimpleExecution extends ExecutionPolicy {
stop();
logger.error("Action " + message.getLocator() + " failed because of: " + message.formatMessage());
addMessage(message);
getController().asyncToError(message.getLocator());
getExecutionHandler().toError(this.realm, message.getLocator());
}
protected void toWarning(LogMessage message) {
cancelWarningTask();
addMessage(message);
getController().asyncToWarning(message.getLocator());
getExecutionHandler().toWarning(this.realm, message.getLocator());
}
protected StrolchTransaction openLocalTx(PrivilegeContext ctx, boolean readOnly) throws StrolchException {
return getContainer().getRealm(ctx.getCertificate()).openTx(ctx.getCertificate(), getClass(), readOnly);
}
protected void runWithFreshAction(Consumer<Action> consumer) {
runWithFreshAction(consumer, true);
protected void runWithFreshActionReadonly(BiConsumer<StrolchTransaction, Action> consumer,
Supplier<String> failMsgSupplier) {
runWithFreshAction(true, consumer, failMsgSupplier);
}
protected void runWithFreshAction(Consumer<Action> consumer, boolean readOnly) {
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openLocalTx(ctx, readOnly)) {
tx.lock(this.actionLoc.trim(3));
Action action = tx.findElement(this.actionLoc);
consumer.accept(action);
}
});
} catch (Exception e) {
logger.error("Failed to perform consumer " + consumer.toString(), e);
}
protected void runWithFreshActionWritable(BiConsumer<StrolchTransaction, Action> consumer,
Supplier<String> failMsgSupplier) {
runWithFreshAction(false, consumer, failMsgSupplier);
}
protected void runWithFreshAction(BiConsumer<StrolchTransaction, Action> consumer) {
runWithFreshAction(true, consumer);
}
protected void runWithFreshAction(boolean readOnly, BiConsumer<StrolchTransaction, Action> consumer) {
private void runWithFreshAction(boolean readOnly, BiConsumer<StrolchTransaction, Action> consumer,
Supplier<String> failMsgSupplier) {
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openLocalTx(ctx, readOnly)) {
@ -180,7 +166,7 @@ public class SimpleExecution extends ExecutionPolicy {
}
});
} catch (Exception e) {
logger.error("Failed to perform consumer " + consumer.toString(), e);
logger.error(failMsgSupplier.get(), e);
}
}
}

View File

@ -1,6 +1,5 @@
package li.strolch.execution.service;
import li.strolch.execution.Controller;
import li.strolch.execution.ExecutionHandler;
import li.strolch.execution.command.SetActionToErrorCommand;
import li.strolch.model.activity.Action;
@ -26,9 +25,9 @@ public class SetActionToErrorService extends AbstractService<LocatorArgument, Se
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
ExecutionHandler executionHandler = getComponent(ExecutionHandler.class);
Controller controller = executionHandler.getController(getArgOrUserRealm(arg), arg.locator.trim(3));
if (controller != null) {
controller.toError(arg.locator);
String realm = getArgOrUserRealm(arg);
if (executionHandler.isControlling(realm, arg.locator.trim(3))) {
executionHandler.toError(realm, arg.locator);
return ServiceResult.success();
}

View File

@ -1,6 +1,5 @@
package li.strolch.execution.service;
import li.strolch.execution.Controller;
import li.strolch.execution.ExecutionHandler;
import li.strolch.execution.command.SetActionToExecutedCommand;
import li.strolch.model.activity.Action;
@ -27,10 +26,8 @@ public class SetActionToExecutedService extends AbstractService<LocatorArgument,
ExecutionHandler executionHandler = getComponent(ExecutionHandler.class);
String realm = getArgOrUserRealm(arg);
Controller controller = executionHandler.getController(realm, arg.locator.trim(3));
if (controller != null) {
controller.toExecuted(arg.locator);
executionHandler.triggerExecution(realm);
if (executionHandler.isControlling(realm, arg.locator.trim(3))) {
executionHandler.toExecuted(realm, arg.locator);
return ServiceResult.success();
}

View File

@ -1,6 +1,5 @@
package li.strolch.execution.service;
import li.strolch.execution.Controller;
import li.strolch.execution.ExecutionHandler;
import li.strolch.execution.command.SetActionToStoppedCommand;
import li.strolch.model.activity.Action;
@ -26,9 +25,9 @@ public class SetActionToStoppedService extends AbstractService<LocatorArgument,
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
ExecutionHandler executionHandler = getComponent(ExecutionHandler.class);
Controller controller = executionHandler.getController(getArgOrUserRealm(arg), arg.locator.trim(3));
if (controller != null) {
controller.toStopped(arg.locator);
String realm = getArgOrUserRealm(arg);
if (executionHandler.isControlling(realm, arg.locator.trim(3))) {
executionHandler.toStopped(realm, arg.locator);
return ServiceResult.success();
}

View File

@ -1,6 +1,5 @@
package li.strolch.execution.service;
import li.strolch.execution.Controller;
import li.strolch.execution.ExecutionHandler;
import li.strolch.execution.command.SetActionToWarningCommand;
import li.strolch.model.activity.Action;
@ -26,9 +25,9 @@ public class SetActionToWarningService extends AbstractService<LocatorArgument,
protected ServiceResult internalDoService(LocatorArgument arg) throws Exception {
ExecutionHandler executionHandler = getComponent(ExecutionHandler.class);
Controller controller = executionHandler.getController(getArgOrUserRealm(arg), arg.locator.trim(3));
if (controller != null) {
controller.toWarning(arg.locator);
String realm = getArgOrUserRealm(arg);
if (executionHandler.isControlling(realm, arg.locator.trim(3))) {
executionHandler.toWarning(realm, arg.locator);
return ServiceResult.success();
}