diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverEvent.java b/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverEvent.java new file mode 100644 index 000000000..41cdbbe8b --- /dev/null +++ b/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverEvent.java @@ -0,0 +1,10 @@ +package li.strolch.agent.api; + +import li.strolch.model.StrolchRootElement; +import li.strolch.utils.collections.MapOfLists; + +public class ObserverEvent { + public MapOfLists added = new MapOfLists<>(); + public MapOfLists updated = new MapOfLists<>(); + public MapOfLists removed = new MapOfLists<>(); +} diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java index f76dc9d66..ce906410a 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java @@ -15,8 +15,6 @@ */ package li.strolch.agent.api; -import java.util.List; - import li.strolch.model.StrolchRootElement; import li.strolch.model.Tags; @@ -45,36 +43,12 @@ import li.strolch.model.Tags; public interface ObserverHandler { /** - * Notifies any registered {@link Observer} that the given elements under the given key have been added i.e. created + * Update observers with the given event * - * @param key - * the key for which to notify observers - * @param elements - * the elements to notify observers with + * @param observerEvent + * containing the updates */ - public void add(String key, List elements); - - /** - * Notifies any registered {@link Observer} that the given elements under the given key have been updated i.e. - * modified - * - * @param key - * the key for which to notify observers - * @param elements - * the elements to notify observers with - */ - public void update(String key, List elements); - - /** - * Notifies any registered {@link Observer} that the given elements under the given key have been removed i.e. - * deleted - * - * @param key - * the key for which to notify observers - * @param elements - * the elements to notify observers with - */ - public void remove(String key, List elements); + public void notify(ObserverEvent observerEvent); /** * Registers the {@link Observer} for notification of objects under the given key @@ -95,4 +69,14 @@ public interface ObserverHandler { * the observer unregister */ public void unregisterObserver(String key, Observer observer); + + /** + * Start the update thread + */ + public void start(); + + /** + * Stop the update thread + */ + public void stop(); } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/CachedRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/CachedRealm.java index dfa7c1625..50d355e40 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/CachedRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/CachedRealm.java @@ -107,7 +107,8 @@ public class CachedRealm extends InternalStrolchRealm { @Override public void start(PrivilegeContext privilegeContext) { - + super.start(privilegeContext); + long start = System.nanoTime(); int nrOfOrders = 0; int nrOfResources = 0; @@ -163,11 +164,6 @@ public class CachedRealm extends InternalStrolchRealm { logger.info(MessageFormat.format("Loaded {0} Activities", nrOfActivities)); //$NON-NLS-1$ } - @Override - public void stop() { - // - } - @Override public void destroy() { // diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java index 7c00c67fc..9cc10386a 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java @@ -16,21 +16,23 @@ package li.strolch.agent.impl; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import li.strolch.agent.api.Observer; +import li.strolch.agent.api.ObserverEvent; import li.strolch.agent.api.ObserverHandler; import li.strolch.model.StrolchRootElement; +import li.strolch.runtime.ThreadPoolFactory; +import li.strolch.utils.collections.MapOfLists; /** - * A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer} and notifies them when - * one of the notify methods are called + * A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies + * them when one of the notify methods are called * * @author Robert von Burg */ @@ -38,18 +40,62 @@ public class DefaultObserverHandler implements ObserverHandler { private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class); - private Map> observerMap; + private MapOfLists observerMap; + + private ExecutorService executorService; public DefaultObserverHandler() { - this.observerMap = new HashMap<>(); + this.observerMap = new MapOfLists<>(); } @Override - public void add(String key, List elements) { + public void start() { + this.executorService = Executors.newSingleThreadExecutor(new ThreadPoolFactory("ObserverHandler")); + } + + @Override + public void stop() { + if (this.executorService != null) { + this.executorService.shutdown(); + while (!this.executorService.isTerminated()) { + logger.info("Waiting for last update to complete..."); + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + logger.error("Interrupted!"); + } + } + + this.executorService = null; + } + } + + @Override + public void notify(ObserverEvent event) { + + if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty()) + return; + + this.executorService.execute(() -> { + synchronized (this.observerMap) { + for (String key : event.added.keySet()) { + add(key, event.added.getList(key)); + } + for (String key : event.updated.keySet()) { + update(key, event.updated.getList(key)); + } + for (String key : event.removed.keySet()) { + remove(key, event.removed.getList(key)); + } + } + }); + } + + private void add(String key, List elements) { if (elements == null || elements.isEmpty()) return; - List observerList = this.observerMap.get(key); + List observerList = this.observerMap.getList(key); if (observerList != null && !observerList.isEmpty()) { for (Observer observer : observerList) { try { @@ -63,12 +109,11 @@ public class DefaultObserverHandler implements ObserverHandler { } } - @Override - public void update(String key, List elements) { + private void update(String key, List elements) { if (elements == null || elements.isEmpty()) return; - List observerList = this.observerMap.get(key); + List observerList = this.observerMap.getList(key); if (observerList != null && !observerList.isEmpty()) { for (Observer observer : observerList) { try { @@ -82,12 +127,11 @@ public class DefaultObserverHandler implements ObserverHandler { } } - @Override - public void remove(String key, List elements) { + private void remove(String key, List elements) { if (elements == null || elements.isEmpty()) return; - List observerList = this.observerMap.get(key); + List observerList = this.observerMap.getList(key); if (observerList != null && !observerList.isEmpty()) { for (Observer observer : observerList) { try { @@ -103,22 +147,20 @@ public class DefaultObserverHandler implements ObserverHandler { @Override public void registerObserver(String key, Observer observer) { - List observerList = this.observerMap.get(key); - if (observerList == null) { - observerList = new ArrayList<>(); - this.observerMap.put(key, observerList); + synchronized (this.observerMap) { + this.observerMap.addElement(key, observer); + String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$ + logger.info(msg); } - observerList.add(observer); - String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$ - logger.info(msg); } @Override public void unregisterObserver(String key, Observer observer) { - List observerList = this.observerMap.get(key); - if (observerList != null && observerList.remove(observer)) { - String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$ - logger.info(msg); + synchronized (this.observerMap) { + if (this.observerMap.removeElement(key, observer)) { + String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$ + logger.info(msg); + } } } } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EmptyRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EmptyRealm.java index 854cc9f3d..7ae03770d 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EmptyRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EmptyRealm.java @@ -98,14 +98,10 @@ public class EmptyRealm extends InternalStrolchRealm { @Override public void start(PrivilegeContext privilegeContext) { + super.start(privilegeContext); logger.info(MessageFormat.format("Initialized EMPTY Realm {0}", getRealm())); //$NON-NLS-1$ } - @Override - public void stop() { - // - } - @Override public void destroy() { // diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java index 54fcb1a79..6a764fecd 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java @@ -158,9 +158,19 @@ public abstract class InternalStrolchRealm implements StrolchRealm { return this.observerHandler; } - public abstract void start(PrivilegeContext privilegeContext); + public void start(PrivilegeContext privilegeContext) { - public abstract void stop(); + if (this.observerHandler != null) { + this.observerHandler.start(); + } + } + + public void stop() { + + if (this.observerHandler != null) { + this.observerHandler.stop(); + } + } public abstract void destroy(); @@ -171,5 +181,4 @@ public abstract class InternalStrolchRealm implements StrolchRealm { public abstract ActivityMap getActivityMap(); public abstract AuditTrail getAuditTrail(); - } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransactionalRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransactionalRealm.java index 323306285..27c669871 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransactionalRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransactionalRealm.java @@ -100,6 +100,7 @@ public class TransactionalRealm extends InternalStrolchRealm { @Override public void start(PrivilegeContext privilegeContext) { + super.start(privilegeContext); long start = System.nanoTime(); int nrOfOrders = 0; @@ -127,11 +128,6 @@ public class TransactionalRealm extends InternalStrolchRealm { logger.info(MessageFormat.format("There are {0} Activities", nrOfActivities)); //$NON-NLS-1$ } - @Override - public void stop() { - // - } - @Override public void destroy() { // diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransientRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransientRealm.java index 53bd07c23..60f63836f 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransientRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/TransientRealm.java @@ -116,6 +116,7 @@ public class TransientRealm extends InternalStrolchRealm { @Override public void start(PrivilegeContext privilegeContext) { + super.start(privilegeContext); ModelStatistics statistics; try (StrolchTransaction tx = openTx(privilegeContext.getCertificate(), "strolch_boot")) { @@ -141,11 +142,6 @@ public class TransientRealm extends InternalStrolchRealm { logger.info(MessageFormat.format("Loaded {0} Activities", statistics.nrOfActivities)); //$NON-NLS-1$ } - @Override - public void stop() { - // - } - @Override public void destroy() { // diff --git a/li.strolch.agent/src/main/java/li/strolch/persistence/api/AbstractTransaction.java b/li.strolch.agent/src/main/java/li/strolch/persistence/api/AbstractTransaction.java index de069d2fe..eaf67bef3 100644 --- a/li.strolch.agent/src/main/java/li/strolch/persistence/api/AbstractTransaction.java +++ b/li.strolch.agent/src/main/java/li/strolch/persistence/api/AbstractTransaction.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import li.strolch.agent.api.ActivityMap; import li.strolch.agent.api.AuditTrail; +import li.strolch.agent.api.ObserverEvent; import li.strolch.agent.api.ObserverHandler; import li.strolch.agent.api.OrderMap; import li.strolch.agent.api.ResourceMap; @@ -886,35 +887,38 @@ public abstract class AbstractTransaction implements StrolchTransaction { long observerUpdateStart = System.nanoTime(); - ObserverHandler observerHandler = this.realm.getObserverHandler(); - - if (this.orderMap != null) { - if (!this.orderMap.getCreated().isEmpty()) - observerHandler.add(Tags.ORDER, new ArrayList(this.orderMap.getCreated())); - if (!this.orderMap.getUpdated().isEmpty()) - observerHandler.update(Tags.ORDER, new ArrayList(this.orderMap.getUpdated())); - if (!this.orderMap.getDeleted().isEmpty()) - observerHandler.remove(Tags.ORDER, new ArrayList(this.orderMap.getDeleted())); - } + ObserverEvent event = new ObserverEvent(); if (this.resourceMap != null) { if (!this.resourceMap.getCreated().isEmpty()) - observerHandler.add(Tags.RESOURCE, new ArrayList(this.resourceMap.getCreated())); + event.added.addList(Tags.RESOURCE, new ArrayList(this.resourceMap.getCreated())); if (!this.resourceMap.getUpdated().isEmpty()) - observerHandler.update(Tags.RESOURCE, new ArrayList(this.resourceMap.getUpdated())); + event.updated.addList(Tags.RESOURCE, new ArrayList(this.resourceMap.getUpdated())); if (!this.resourceMap.getDeleted().isEmpty()) - observerHandler.remove(Tags.RESOURCE, new ArrayList(this.resourceMap.getDeleted())); + event.removed.addList(Tags.RESOURCE, new ArrayList(this.resourceMap.getDeleted())); + } + + if (this.orderMap != null) { + if (!this.orderMap.getCreated().isEmpty()) + event.added.addList(Tags.ORDER, new ArrayList(this.orderMap.getCreated())); + if (!this.orderMap.getUpdated().isEmpty()) + event.updated.addList(Tags.ORDER, new ArrayList(this.orderMap.getUpdated())); + if (!this.orderMap.getDeleted().isEmpty()) + event.removed.addList(Tags.ORDER, new ArrayList(this.orderMap.getDeleted())); } if (this.activityMap != null) { if (!this.activityMap.getCreated().isEmpty()) - observerHandler.add(Tags.ACTIVITY, new ArrayList(this.activityMap.getCreated())); + event.added.addList(Tags.ACTIVITY, new ArrayList(this.activityMap.getCreated())); if (!this.activityMap.getUpdated().isEmpty()) - observerHandler.update(Tags.ACTIVITY, new ArrayList(this.activityMap.getUpdated())); + event.updated.addList(Tags.ACTIVITY, new ArrayList(this.activityMap.getUpdated())); if (!this.activityMap.getDeleted().isEmpty()) - observerHandler.remove(Tags.ACTIVITY, new ArrayList(this.activityMap.getDeleted())); + event.removed.addList(Tags.ACTIVITY, new ArrayList(this.activityMap.getDeleted())); } + ObserverHandler observerHandler = this.realm.getObserverHandler(); + observerHandler.notify(event); + long observerUpdateDuration = System.nanoTime() - observerUpdateStart; return observerUpdateDuration; }