[New] Added EventBasedExecutionHandler.addForExecution(String, Activity)

This commit is contained in:
Robert von Burg 2020-02-12 15:00:39 +01:00
parent fba2d3890d
commit fbb05a2b01
2 changed files with 138 additions and 7 deletions

View File

@ -7,16 +7,14 @@ import java.util.*;
import java.util.concurrent.ExecutorService;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.ObserverEvent;
import li.strolch.execution.command.*;
import li.strolch.execution.policy.ActivityArchivalPolicy;
import li.strolch.execution.policy.ExecutionPolicy;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.handler.operationslog.LogSeverity;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.model.ParameterBag;
import li.strolch.model.Resource;
import li.strolch.model.State;
import li.strolch.model.*;
import li.strolch.model.activity.Action;
import li.strolch.model.activity.Activity;
import li.strolch.model.activity.IActivityElement;
@ -97,6 +95,23 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
}
}
@Override
public void addForExecution(String realm, Activity activity) {
ExecutionHandlerState state = this.statesByRealm.getOrDefault(realm, ExecutionHandlerState.Running);
if (state == ExecutionHandlerState.HaltNew)
throw new IllegalStateException(
"ExecutionHandler state is " + state + ", can not add activities for execution!");
Locator rootElemLoc = activity.getLocator();
synchronized (this.registeredActivities) {
this.registeredActivities.addElement(realm, rootElemLoc);
}
notifyObserverAdd(realm, activity);
toExecution(realm, rootElemLoc);
}
@Override
public void addForExecution(String realm, Locator activityLoc) {
@ -109,6 +124,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
synchronized (this.registeredActivities) {
this.registeredActivities.addElement(realm, rootElemLoc);
}
getExecutor().submit(() -> notifyObserverAdd(realm, activityLoc));
toExecution(realm, activityLoc);
}
@ -118,11 +135,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
synchronized (this.registeredActivities) {
this.registeredActivities.removeElement(realm, rootElemLoc);
}
getExecutor().submit(() -> notifyObserverRemove(realm, activityLoc));
}
@Override
public void clearAllCurrentExecutions(String realm) {
this.registeredActivities.removeSet(realm);
Set<Locator> removed = this.registeredActivities.removeSet(realm);
getExecutor().submit(() -> notifyObserverRemove(realm, removed));
}
private void restartActivityExecution(PrivilegeContext ctx) {
@ -430,6 +449,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
synchronized (this.registeredActivities) {
this.registeredActivities.removeElement(realm, activityLoc);
}
notifyObserverRemove(realm, activityLoc);
return;
}
@ -440,6 +460,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
logger.warn("Activity " + activityLoc + " already removed from registered activities!");
}
notifyObserverRemove(tx, activity);
logger.info("Archiving activity " + activityLoc + " with state " + activity.getState());
archiveActivity(realm, activity.getLocator());
@ -450,6 +472,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
command.validate();
command.doCommand();
notifyObserverUpdate(tx, activity);
tx.commitOnClose();
}
}
@ -471,6 +494,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
command.validate();
command.doCommand();
notifyObserverUpdate(tx, action.getRootElement());
// flush so we can see the changes performed
tx.flush();
@ -483,6 +508,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
logger.warn("Activity " + activityLoc + " already removed from registered activities!");
}
notifyObserverRemove(tx, action.getRootElement());
logger.info("Archiving activity " + activityLoc + " with state " + activity.getState());
archiveActivity(realm, activity.getLocator());
@ -500,6 +527,8 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
execCommand.validate();
execCommand.doCommand();
notifyObserverUpdate(tx, action.getRootElement());
// flush so we can see the changes performed
tx.flush();
}
@ -525,6 +554,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
command.validate();
command.doCommand();
notifyObserverUpdate(tx, elem.getRootElement());
tx.commitOnClose();
}
}
@ -542,6 +572,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
command.validate();
command.doCommand();
notifyObserverUpdate(tx, elem.getRootElement());
tx.commitOnClose();
}
}
@ -559,6 +590,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
command.validate();
command.doCommand();
notifyObserverUpdate(tx, elem.getRootElement());
tx.commitOnClose();
}
@ -566,6 +598,93 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
triggerExecution(realm);
}
private void notifyObserverAdd(String realm, Locator activityLoc) {
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), true)) {
Activity activity = tx.findElement(activityLoc, true);
if (activity != null) {
ObserverEvent observerEvent = new ObserverEvent();
observerEvent.added.addElement(Tags.CONTROLLER, activity);
getContainer().getRealm(realm).getObserverHandler().notify(observerEvent);
}
}
});
} catch (Exception e) {
logger.error("Failed to notify observers of new controller " + activityLoc);
}
}
private void notifyObserverAdd(String realm, Activity rootElement) {
if (!getContainer().getRealm(realm).isUpdateObservers())
return;
ObserverEvent observerEvent = new ObserverEvent();
observerEvent.added.addElement(Tags.CONTROLLER, rootElement);
getContainer().getRealm(realm).getObserverHandler().notify(observerEvent);
}
private void notifyObserverUpdate(StrolchTransaction tx, Activity rootElement) {
if (!getContainer().getRealm(tx.getRealmName()).isUpdateObservers())
return;
ObserverEvent observerEvent = new ObserverEvent();
observerEvent.updated.addElement(Tags.CONTROLLER, rootElement);
tx.getContainer().getRealm(tx.getRealmName()).getObserverHandler().notify(observerEvent);
}
private void notifyObserverRemove(StrolchTransaction tx, Activity rootElement) {
if (!getContainer().getRealm(tx.getRealmName()).isUpdateObservers())
return;
ObserverEvent observerEvent = new ObserverEvent();
observerEvent.removed.addElement(Tags.CONTROLLER, rootElement);
tx.getContainer().getRealm(tx.getRealmName()).getObserverHandler().notify(observerEvent);
}
private void notifyObserverRemove(String realm, Locator activityLoc) {
if (!getContainer().getRealm(realm).isUpdateObservers())
return;
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), true)) {
Activity activity = tx.findElement(activityLoc, true);
if (activity != null) {
ObserverEvent observerEvent = new ObserverEvent();
observerEvent.removed.addElement(Tags.CONTROLLER, activity);
getContainer().getRealm(realm).getObserverHandler().notify(observerEvent);
}
}
});
} catch (Exception e) {
logger.error("Failed to notify observers of removed controller " + activityLoc);
}
}
private void notifyObserverRemove(String realm, Set<Locator> activityLocs) {
if (!getContainer().getRealm(realm).isUpdateObservers())
return;
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), true)) {
ObserverEvent observerEvent = new ObserverEvent();
for (Locator activityLoc : activityLocs) {
Activity activity = tx.findElement(activityLoc, true);
if (activity != null)
observerEvent.removed.addElement(Tags.CONTROLLER, activity);
}
getContainer().getRealm(realm).getObserverHandler().notify(observerEvent);
}
});
} catch (Exception e) {
logger.error("Failed to notify observers of removed controllers " + activityLocs);
}
}
@Override
public DelayedExecutionTimer getDelayedExecutionTimer() {
return this.delayedExecutionTimer;

View File

@ -21,8 +21,9 @@ import li.strolch.privilege.model.PrivilegeContext;
*
* <p>
* To start the execution of an {@link Activity} add it to the {@link ExecutionHandler} by calling {@link
* #addForExecution(String, Locator)}. Actual execution is asynchronously performed and the {@link ExecutionPolicy} of
* the resources of the {@link Action Actions} will perform the actual execution.
* #addForExecution(String, Activity)} or {@link #addForExecution(String, Locator)}. Actual execution is asynchronously
* performed and the {@link ExecutionPolicy} of the resources of the {@link Action Actions} will perform the actual
* execution.
* </p>
*
* <p>
@ -51,6 +52,17 @@ public abstract class ExecutionHandler extends StrolchComponent {
*/
public abstract void addForExecution(String realm, Locator activityLoc);
/**
* Registers the given {@link Activity} for execution, and submits it for execution immediately in an asynchronous
* manner
*
* @param realm
* the realm where the {@link Activity} resides
* @param activity
* the the {@link Activity}
*/
public abstract void addForExecution(String realm, Activity activity);
/**
* Removes the given {@link Locator} for an {@link Activity} from execution, so it is not executed further
*