diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java index 2d6eed840..81baebbd9 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java @@ -17,12 +17,15 @@ package li.strolch.agent.impl; import static li.strolch.model.Tags.AGENT; import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT; +import static li.strolch.utils.collections.SynchronizedCollections.synchronizedMapOfLists; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.List; import java.util.ResourceBundle; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; import li.strolch.agent.api.*; import li.strolch.handler.operationslog.OperationsLog; @@ -47,104 +50,129 @@ public class DefaultObserverHandler implements ObserverHandler { private final StrolchAgent agent; private final StrolchRealm realm; + private volatile boolean run; + + private final BlockingDeque eventQueue; + private final MapOfLists observerMap; + private Future updateTask; public DefaultObserverHandler(StrolchAgent agent, StrolchRealm realm) { this.agent = agent; this.realm = realm; - this.observerMap = new MapOfLists<>(); + this.observerMap = synchronizedMapOfLists(new MapOfLists<>()); + this.eventQueue = new LinkedBlockingDeque<>(); } @Override public void start() { - // nothing to do + this.run = true; + this.updateTask = this.agent.getSingleThreadExecutor("Observer").submit(this::doUpdates); } @Override public void stop() { - // nothing to do + this.run = false; + this.updateTask.cancel(true); } @Override public void notify(ObserverEvent event) { - if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty()) - return; - - ExecutorService service = this.agent.getExecutor("Observer"); - if (!service.isShutdown()) - service.submit(() -> doUpdates(event)); + if (!(event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty())) + this.eventQueue.addLast(event); } - protected void doUpdates(ObserverEvent event) { - synchronized (this.observerMap) { - for (String key : event.added.keySet()) { - add(key, event.added.getList(key)); - } - for (String key : event.updated.keySet()) { - update(key, event.updated.getList(key)); - } - for (String key : event.removed.keySet()) { - remove(key, event.removed.getList(key)); + protected void doUpdates() { + while (this.run) { + try { + ObserverEvent event = this.eventQueue.takeFirst(); + + for (String key : event.added.keySet()) { + List list = event.added.getList(key); + if (list != null) + notifyAdd(key, list); + } + for (String key : event.updated.keySet()) { + List list = event.updated.getList(key); + if (list != null) + notifyUpdate(key, list); + } + for (String key : event.removed.keySet()) { + List list = event.removed.getList(key); + if (list != null) + notifyRemove(key, list); + } + + } catch (InterruptedException e) { + if (this.run) + logger.error("Failed to do updates!", e); + else + logger.warn("Interrupted!"); } } } - private void add(String key, List elements) { - if (elements == null || elements.isEmpty()) + private List getObservers(String key) { + List observerList = this.observerMap.getList(key); + if (observerList == null) + return null; + + observerList = new ArrayList<>(observerList); + if (observerList.isEmpty()) + return null; + return observerList; + } + + private void notifyAdd(String key, List elements) { + List observerList = getObservers(key); + if (observerList == null) return; - List observerList = this.observerMap.getList(key); - if (observerList != null && !observerList.isEmpty()) { - for (Observer observer : observerList) { - try { - observer.add(key, elements); - } catch (Exception e) { - String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, key, observer, e.getMessage()); - logger.error(msg, e); + for (Observer observer : observerList) { + try { + observer.add(key, elements); + } catch (Exception e) { + String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ + msg = MessageFormat.format(msg, key, observer, e.getMessage()); + logger.error(msg, e); - addLogMessage("add", e); - } + addLogMessage("add", e); } } } - private void update(String key, List elements) { - if (elements == null || elements.isEmpty()) + private void notifyUpdate(String key, List elements) { + List observerList = getObservers(key); + if (observerList == null) return; - List observerList = this.observerMap.getList(key); - if (observerList != null && !observerList.isEmpty()) { - for (Observer observer : observerList) { - try { - observer.update(key, elements); - } catch (Exception e) { - String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, key, observer, e.getMessage()); - logger.error(msg, e); + for (Observer observer : observerList) { + try { + observer.update(key, elements); + } catch (Exception e) { + String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ + msg = MessageFormat.format(msg, key, observer, e.getMessage()); + logger.error(msg, e); - addLogMessage("update", e); - } + addLogMessage("update", e); } } } - private void remove(String key, List elements) { - if (elements == null || elements.isEmpty()) + private void notifyRemove(String key, List elements) { + List observerList = getObservers(key); + if (observerList == null) return; - List observerList = this.observerMap.getList(key); - if (observerList != null && !observerList.isEmpty()) { - for (Observer observer : observerList) { - try { - observer.remove(key, elements); - } catch (Exception e) { - String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, key, observer, e.getMessage()); - logger.error(msg, e); + for (Observer observer : observerList) { + try { + observer.remove(key, elements); + } catch (Exception e) { + String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ + msg = MessageFormat.format(msg, key, observer, e.getMessage()); + logger.error(msg, e); - addLogMessage("remove", e); - } + addLogMessage("remove", e); } } } @@ -162,20 +190,16 @@ public class DefaultObserverHandler implements ObserverHandler { @Override public void registerObserver(String key, Observer observer) { - synchronized (this.observerMap) { - this.observerMap.addElement(key, observer); - String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$ - logger.info(msg); - } + this.observerMap.addElement(key, observer); + String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$ + logger.info(msg); } @Override public void unregisterObserver(String key, Observer observer) { - synchronized (this.observerMap) { - if (this.observerMap.removeElement(key, observer)) { - String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$ - logger.info(msg); - } + if (this.observerMap.removeElement(key, observer)) { + String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$ + logger.info(msg); } } } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java index 4d892e233..eaf099b4c 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java @@ -39,7 +39,6 @@ public class DefaultRealmHandler extends StrolchComponent implements RealmHandle public static final String PROP_ENABLE_AUDIT_TRAIL = "enableAuditTrail"; //$NON-NLS-1$ public static final String PROP_ENABLE_AUDIT_TRAIL_FOR_READ = "enableAuditTrailForRead"; //$NON-NLS-1$ public static final String PROP_ENABLE_OBSERVER_UPDATES = "enableObserverUpdates"; //$NON-NLS-1$ - public static final String PROP_ENABLED_DELAYED_OBSERVER_UPDATES = "enableDelayedObserverUpdates"; //$NON-NLS-1$ public static final String PROP_ENABLE_VERSIONING = "enableVersioning"; //$NON-NLS-1$ public static final String PREFIX_DATA_STORE_MODE = "dataStoreMode"; //$NON-NLS-1$ public static final String PREFIX_DATA_STORE_FILE = "dataStoreFile"; //$NON-NLS-1$ diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java deleted file mode 100644 index 075b3b5ac..000000000 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright 2013 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.agent.impl; - -import static li.strolch.model.Tags.AGENT; -import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT; - -import java.text.MessageFormat; -import java.util.List; -import java.util.ResourceBundle; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import li.strolch.agent.api.*; -import li.strolch.model.log.LogMessage; -import li.strolch.model.log.LogMessageState; -import li.strolch.model.log.LogSeverity; -import li.strolch.handler.operationslog.OperationsLog; -import li.strolch.model.Locator; -import li.strolch.model.StrolchRootElement; -import li.strolch.utils.collections.MapOfLists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies - * them when one of the notify methods are called - * - * @author Robert von Burg - */ -public class EventCollectingObserverHandler implements ObserverHandler { - - private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class); - - private final MapOfLists observerMap; - - private ObserverEvent observerEvent; - - private ScheduledFuture future; - private final StrolchAgent agent; - private final StrolchRealm realm; - - public EventCollectingObserverHandler(StrolchAgent agent, StrolchRealm realm) { - this.agent = agent; - this.realm = realm; - this.observerMap = new MapOfLists<>(); - } - - @Override - public void start() { - // nothing to do - } - - @Override - public void stop() { - if (this.future != null) { - this.future.cancel(false); - this.future = null; - } - } - - private ScheduledExecutorService getExecutor() { - return this.agent.getScheduledExecutor("Observer"); - } - - @Override - public synchronized void notify(ObserverEvent event) { - if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty()) - return; - - synchronized (this) { - if (this.observerEvent == null) { - this.observerEvent = event; - } else { - this.observerEvent.added.addAll(event.added); - this.observerEvent.updated.addAll(event.updated); - this.observerEvent.removed.addAll(event.removed); - } - } - - if (this.future == null || this.future.isDone()) { - this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS); - } - } - - protected void doUpdates() { - - ObserverEvent event; - synchronized (this) { - if (this.observerEvent == null) - return; - - event = this.observerEvent; - this.observerEvent = null; - } - - synchronized (this.observerMap) { - for (String key : event.added.keySet()) { - add(key, event.added.getList(key)); - } - for (String key : event.updated.keySet()) { - update(key, event.updated.getList(key)); - } - for (String key : event.removed.keySet()) { - remove(key, event.removed.getList(key)); - } - } - - synchronized (this) { - if (this.observerEvent != null) { - this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS); - } - } - } - - private void add(String key, List elements) { - if (elements == null || elements.isEmpty()) - return; - - List observerList = this.observerMap.getList(key); - if (observerList != null && !observerList.isEmpty()) { - for (Observer observer : observerList) { - try { - observer.add(key, elements); - } catch (Exception e) { - String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, key, observer, e.getMessage()); - logger.error(msg, e); - - addLogMessage("add", e); - } - } - } - } - - private void update(String key, List elements) { - if (elements == null || elements.isEmpty()) - return; - - List observerList = this.observerMap.getList(key); - if (observerList != null && !observerList.isEmpty()) { - for (Observer observer : observerList) { - try { - observer.update(key, elements); - } catch (Exception e) { - String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, key, observer, e.getMessage()); - logger.error(msg, e); - - addLogMessage("update", e); - } - } - } - } - - private void remove(String key, List elements) { - if (elements == null || elements.isEmpty()) - return; - - List observerList = this.observerMap.getList(key); - if (observerList != null && !observerList.isEmpty()) { - for (Observer observer : observerList) { - try { - observer.remove(key, elements); - } catch (Exception e) { - String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, key, observer, e.getMessage()); - logger.error(msg, e); - - addLogMessage("remove", e); - } - } - } - } - - private void addLogMessage(String type, Exception e) { - ComponentContainer container = this.agent.getContainer(); - if (container.hasComponent(OperationsLog.class)) { - OperationsLog operationsLog = container.getComponent(OperationsLog.class); - operationsLog.addMessage(new LogMessage(this.realm.getRealm(), SYSTEM_USER_AGENT, - Locator.valueOf(AGENT, ObserverHandler.class.getName(), type, StrolchAgent.getUniqueId()), - LogSeverity.Exception, LogMessageState.Information, ResourceBundle.getBundle("strolch-agent"), "agent.observers.update.failed") - .withException(e).value("type", type).value("reason", e)); - } - } - - @Override - public void registerObserver(String key, Observer observer) { - synchronized (this.observerMap) { - this.observerMap.addElement(key, observer); - String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$ - logger.info(msg); - } - } - - @Override - public void unregisterObserver(String key, Observer observer) { - synchronized (this.observerMap) { - if (this.observerMap.removeElement(key, observer)) { - String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$ - logger.info(msg); - } - } - } -} diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java index 779c045cf..8eb756fe9 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java @@ -37,7 +37,7 @@ public abstract class InternalStrolchRealm implements StrolchRealm { public static final String PROP_TRY_LOCK_TIME_UNIT = "tryLockTimeUnit"; //$NON-NLS-1$ public static final String PROP_TRY_LOCK_TIME = "tryLockTime"; //$NON-NLS-1$ protected static final Logger logger = LoggerFactory.getLogger(StrolchRealm.class); - private String realm; + private final String realm; private LockHandler lockHandler; private boolean auditTrailEnabled; private boolean auditTrailEnabledForRead; @@ -89,13 +89,7 @@ public abstract class InternalStrolchRealm implements StrolchRealm { String updateObserversKey = makeRealmKey(getRealm(), PROP_ENABLE_OBSERVER_UPDATES); this.updateObservers = configuration.getBoolean(updateObserversKey, Boolean.FALSE); if (this.updateObservers) { - String delayedObserversKey = makeRealmKey(getRealm(), PROP_ENABLED_DELAYED_OBSERVER_UPDATES); - if (configuration.getBoolean(delayedObserversKey, Boolean.FALSE)) { - this.observerHandler = new EventCollectingObserverHandler(container.getAgent(), this); - logger.info("Enabled Delayed Observer Updates."); - } else { - this.observerHandler = new DefaultObserverHandler(container.getAgent(), this); - } + this.observerHandler = new DefaultObserverHandler(container.getAgent(), this); } // lock timeout diff --git a/li.strolch.agent/src/test/java/li/strolch/agent/ObserverHandlerTests.java b/li.strolch.agent/src/test/java/li/strolch/agent/ObserverHandlerTests.java new file mode 100644 index 000000000..f6152a683 --- /dev/null +++ b/li.strolch.agent/src/test/java/li/strolch/agent/ObserverHandlerTests.java @@ -0,0 +1,97 @@ +package li.strolch.agent; + +import static org.junit.Assert.assertEquals; + +import java.security.SecureRandom; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import li.strolch.RuntimeMock; +import li.strolch.agent.api.Observer; +import li.strolch.agent.api.StrolchRealm; +import li.strolch.model.Resource; +import li.strolch.model.StrolchRootElement; +import li.strolch.model.Tags; +import li.strolch.persistence.api.StrolchTransaction; +import li.strolch.privilege.model.Certificate; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ObserverHandlerTests { + + public static final Logger logger = LoggerFactory.getLogger(ObserverHandlerTests.class); + + private static final String TARGET_PATH = "target/" + ObserverHandlerTests.class.getSimpleName(); + private static final String SOURCE_PATH = "src/test/resources/transienttest"; + public static final int MAX_WAIT_PER_UPDATE = 100; + public static final int UPDATE_SIZE = 50; + + private static RuntimeMock runtimeMock; + private static Certificate cert; + + private static final AtomicInteger updateCount = new AtomicInteger(0); + + @BeforeClass + public static void beforeClass() { + runtimeMock = new RuntimeMock(TARGET_PATH, SOURCE_PATH).mockRuntime(); + runtimeMock.startContainer(); + cert = runtimeMock.getPrivilegeHandler().authenticate("test", "test".toCharArray()); + + // register an observer + StrolchRealm realm = runtimeMock.getAgent().getContainer().getRealm(cert); + realm.getObserverHandler().registerObserver(Tags.RESOURCE, new TestObserver()); + } + + @AfterClass + public static void afterClass() { + if (cert != null) + runtimeMock.getPrivilegeHandler().invalidate(cert); + if (runtimeMock != null) + runtimeMock.destroyRuntime(); + } + + @Test + public void shouldNotifyObservers() throws InterruptedException { + + updateCount.set(0); + long expectedCount = 0; + + for (int i = 0; i < UPDATE_SIZE; i++) { + StrolchRealm realm = runtimeMock.getAgent().getContainer().getRealm(cert); + try (StrolchTransaction tx = realm.openTx(cert, ParallelTests.class, false)) { + tx.add(new Resource("res_" + i, "Resource " + i, "MyType")); + tx.commitOnClose(); + } + + expectedCount++; + } + + long start = System.currentTimeMillis(); + long maxWaitTime = MAX_WAIT_PER_UPDATE * expectedCount; + while (updateCount.get() != expectedCount) { + Thread.sleep(50L); + if ((System.currentTimeMillis() - start) > maxWaitTime) + throw new IllegalStateException("Updates didn't complete in " + maxWaitTime + "ms"); + } + + assertEquals("Expected " + UPDATE_SIZE + " updates!", UPDATE_SIZE, updateCount.get()); + } + + static class TestObserver implements Observer { + private final SecureRandom random = new SecureRandom(); + + @Override + public void add(String key, List elements) throws Exception { + int wait = random.nextInt(MAX_WAIT_PER_UPDATE); + Thread.sleep(wait); + for (StrolchRootElement element : elements) { + logger.info("Added " + key + " " + element.getLocator() + " took " + wait + "ms"); + } + + updateCount.incrementAndGet(); + } + } +} diff --git a/li.strolch.agent/src/test/resources/transienttest/config/StrolchConfiguration.xml b/li.strolch.agent/src/test/resources/transienttest/config/StrolchConfiguration.xml index f71e13e9f..a51580697 100644 --- a/li.strolch.agent/src/test/resources/transienttest/config/StrolchConfiguration.xml +++ b/li.strolch.agent/src/test/resources/transienttest/config/StrolchConfiguration.xml @@ -23,6 +23,7 @@ TRANSIENT StrolchModel.xml + true