diff --git a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java index 4b579cc4c..197a457d2 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java @@ -7,16 +7,14 @@ import java.util.*; import java.util.concurrent.ExecutorService; import li.strolch.agent.api.ComponentContainer; +import li.strolch.agent.api.ObserverEvent; import li.strolch.execution.command.*; import li.strolch.execution.policy.ActivityArchivalPolicy; import li.strolch.execution.policy.ExecutionPolicy; import li.strolch.handler.operationslog.LogMessage; import li.strolch.handler.operationslog.LogSeverity; import li.strolch.handler.operationslog.OperationsLog; -import li.strolch.model.Locator; -import li.strolch.model.ParameterBag; -import li.strolch.model.Resource; -import li.strolch.model.State; +import li.strolch.model.*; import li.strolch.model.activity.Action; import li.strolch.model.activity.Activity; import li.strolch.model.activity.IActivityElement; @@ -97,6 +95,23 @@ public class EventBasedExecutionHandler extends ExecutionHandler { } } + @Override + public void addForExecution(String realm, Activity activity) { + + ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running); + if (state == ExecutionHandlerState.HaltNew) + throw new IllegalStateException( + "ExecutionHandler state is " + state + ", can not add activities for execution!"); + + Locator rootElemLoc = activity.getLocator(); + synchronized (this.registeredActivities) { + this.registeredActivities.addElement(realm, rootElemLoc); + } + + notifyObserverAdd(realm, activity); + toExecution(realm, rootElemLoc); + } + @Override public void addForExecution(String realm, Locator activityLoc) { @@ -109,6 +124,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { synchronized (this.registeredActivities) { this.registeredActivities.addElement(realm, rootElemLoc); } + + getExecutor().submit(() -> notifyObserverAdd(realm, activityLoc)); toExecution(realm, activityLoc); } @@ -118,11 +135,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler { synchronized (this.registeredActivities) { this.registeredActivities.removeElement(realm, rootElemLoc); } + getExecutor().submit(() -> notifyObserverRemove(realm, activityLoc)); } @Override public void clearAllCurrentExecutions(String realm) { - this.registeredActivities.removeSet(realm); + Set removed = this.registeredActivities.removeSet(realm); + getExecutor().submit(() -> notifyObserverRemove(realm, removed)); } private void restartActivityExecution(PrivilegeContext ctx) { @@ -430,6 +449,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { synchronized (this.registeredActivities) { this.registeredActivities.removeElement(realm, activityLoc); } + notifyObserverRemove(realm, activityLoc); return; } @@ -440,6 +460,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { logger.warn("Activity " + activityLoc + " already removed from registered activities!"); } + notifyObserverRemove(tx, activity); + logger.info("Archiving activity " + activityLoc + " with state " + activity.getState()); archiveActivity(realm, activity.getLocator()); @@ -450,6 +472,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { command.validate(); command.doCommand(); + notifyObserverUpdate(tx, activity); tx.commitOnClose(); } } @@ -471,6 +494,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { command.validate(); command.doCommand(); + notifyObserverUpdate(tx, action.getRootElement()); + // flush so we can see the changes performed tx.flush(); @@ -483,6 +508,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { logger.warn("Activity " + activityLoc + " already removed from registered activities!"); } + notifyObserverRemove(tx, action.getRootElement()); + logger.info("Archiving activity " + activityLoc + " with state " + activity.getState()); archiveActivity(realm, activity.getLocator()); @@ -500,6 +527,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler { execCommand.validate(); execCommand.doCommand(); + notifyObserverUpdate(tx, action.getRootElement()); + // flush so we can see the changes performed tx.flush(); } @@ -525,6 +554,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { command.validate(); command.doCommand(); + notifyObserverUpdate(tx, elem.getRootElement()); tx.commitOnClose(); } } @@ -542,6 +572,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { command.validate(); command.doCommand(); + notifyObserverUpdate(tx, elem.getRootElement()); tx.commitOnClose(); } } @@ -559,6 +590,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { command.validate(); command.doCommand(); + notifyObserverUpdate(tx, elem.getRootElement()); tx.commitOnClose(); } @@ -566,6 +598,93 @@ public class EventBasedExecutionHandler extends ExecutionHandler { triggerExecution(realm); } + private void notifyObserverAdd(String realm, Locator activityLoc) { + try { + runAsAgent(ctx -> { + try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), true)) { + Activity activity = tx.findElement(activityLoc, true); + if (activity != null) { + ObserverEvent observerEvent = new ObserverEvent(); + observerEvent.added.addElement(Tags.CONTROLLER, activity); + getContainer().getRealm(realm).getObserverHandler().notify(observerEvent); + } + } + }); + } catch (Exception e) { + logger.error("Failed to notify observers of new controller " + activityLoc); + } + } + + private void notifyObserverAdd(String realm, Activity rootElement) { + if (!getContainer().getRealm(realm).isUpdateObservers()) + return; + + ObserverEvent observerEvent = new ObserverEvent(); + observerEvent.added.addElement(Tags.CONTROLLER, rootElement); + getContainer().getRealm(realm).getObserverHandler().notify(observerEvent); + } + + private void notifyObserverUpdate(StrolchTransaction tx, Activity rootElement) { + if (!getContainer().getRealm(tx.getRealmName()).isUpdateObservers()) + return; + + ObserverEvent observerEvent = new ObserverEvent(); + observerEvent.updated.addElement(Tags.CONTROLLER, rootElement); + tx.getContainer().getRealm(tx.getRealmName()).getObserverHandler().notify(observerEvent); + } + + private void notifyObserverRemove(StrolchTransaction tx, Activity rootElement) { + if (!getContainer().getRealm(tx.getRealmName()).isUpdateObservers()) + return; + + ObserverEvent observerEvent = new ObserverEvent(); + observerEvent.removed.addElement(Tags.CONTROLLER, rootElement); + tx.getContainer().getRealm(tx.getRealmName()).getObserverHandler().notify(observerEvent); + } + + private void notifyObserverRemove(String realm, Locator activityLoc) { + if (!getContainer().getRealm(realm).isUpdateObservers()) + return; + + try { + runAsAgent(ctx -> { + try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), true)) { + Activity activity = tx.findElement(activityLoc, true); + if (activity != null) { + ObserverEvent observerEvent = new ObserverEvent(); + observerEvent.removed.addElement(Tags.CONTROLLER, activity); + getContainer().getRealm(realm).getObserverHandler().notify(observerEvent); + } + } + }); + } catch (Exception e) { + logger.error("Failed to notify observers of removed controller " + activityLoc); + } + } + + private void notifyObserverRemove(String realm, Set activityLocs) { + if (!getContainer().getRealm(realm).isUpdateObservers()) + return; + + try { + runAsAgent(ctx -> { + try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), true)) { + ObserverEvent observerEvent = new ObserverEvent(); + + for (Locator activityLoc : activityLocs) { + Activity activity = tx.findElement(activityLoc, true); + if (activity != null) + observerEvent.removed.addElement(Tags.CONTROLLER, activity); + } + + getContainer().getRealm(realm).getObserverHandler().notify(observerEvent); + } + }); + } catch (Exception e) { + logger.error("Failed to notify observers of removed controllers " + activityLocs); + } + } + @Override public DelayedExecutionTimer getDelayedExecutionTimer() { return this.delayedExecutionTimer; diff --git a/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java index 128c59533..e33faca4c 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/ExecutionHandler.java @@ -21,8 +21,9 @@ import li.strolch.privilege.model.PrivilegeContext; * *

* To start the execution of an {@link Activity} add it to the {@link ExecutionHandler} by calling {@link - * #addForExecution(String, Locator)}. Actual execution is asynchronously performed and the {@link ExecutionPolicy} of - * the resources of the {@link Action Actions} will perform the actual execution. + * #addForExecution(String, Activity)} or {@link #addForExecution(String, Locator)}. Actual execution is asynchronously + * performed and the {@link ExecutionPolicy} of the resources of the {@link Action Actions} will perform the actual + * execution. *

* *

@@ -51,6 +52,17 @@ public abstract class ExecutionHandler extends StrolchComponent { */ public abstract void addForExecution(String realm, Locator activityLoc); + /** + * Registers the given {@link Activity} for execution, and submits it for execution immediately in an asynchronous + * manner + * + * @param realm + * the realm where the {@link Activity} resides + * @param activity + * the the {@link Activity} + */ + public abstract void addForExecution(String realm, Activity activity); + /** * Removes the given {@link Locator} for an {@link Activity} from execution, so it is not executed further *