diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java index 9cc10386a..c5dc8d679 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java @@ -77,20 +77,24 @@ public class DefaultObserverHandler implements ObserverHandler { return; this.executorService.execute(() -> { - synchronized (this.observerMap) { - for (String key : event.added.keySet()) { - add(key, event.added.getList(key)); - } - for (String key : event.updated.keySet()) { - update(key, event.updated.getList(key)); - } - for (String key : event.removed.keySet()) { - remove(key, event.removed.getList(key)); - } - } + doUpdates(event); }); } + protected void doUpdates(ObserverEvent event) { + synchronized (this.observerMap) { + for (String key : event.added.keySet()) { + add(key, event.added.getList(key)); + } + for (String key : event.updated.keySet()) { + update(key, event.updated.getList(key)); + } + for (String key : event.removed.keySet()) { + remove(key, event.removed.getList(key)); + } + } + } + private void add(String key, List elements) { if (elements == null || elements.isEmpty()) return; diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java index 5f2032e2e..f6abb4b9c 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultRealmHandler.java @@ -39,6 +39,7 @@ public class DefaultRealmHandler extends StrolchComponent implements RealmHandle public static final String PROP_ENABLE_AUDIT_TRAIL = "enableAuditTrail"; //$NON-NLS-1$ public static final String PROP_ENABLE_AUDIT_TRAIL_FOR_READ = "enableAuditTrailForRead"; //$NON-NLS-1$ public static final String PROP_ENABLE_OBSERVER_UPDATES = "enableObserverUpdates"; //$NON-NLS-1$ + public static final String PROP_ENABLED_DELAYED_OBSERVER_UPDATES = "enableDelayedObserverUpdates"; //$NON-NLS-1$ public static final String PROP_ENABLE_VERSIONING = "enableVersioning"; //$NON-NLS-1$ public static final String PREFIX_DATA_STORE_MODE = "dataStoreMode"; //$NON-NLS-1$ public static final String PREFIX_DATA_STORE_FILE = "dataStoreFile"; //$NON-NLS-1$ diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java new file mode 100644 index 000000000..a23955ae6 --- /dev/null +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java @@ -0,0 +1,207 @@ +/* + * Copyright 2013 Robert von Burg + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package li.strolch.agent.impl; + +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import li.strolch.agent.api.Observer; +import li.strolch.agent.api.ObserverEvent; +import li.strolch.agent.api.ObserverHandler; +import li.strolch.model.StrolchRootElement; +import li.strolch.runtime.ThreadPoolFactory; +import li.strolch.utils.collections.MapOfLists; + +/** + * A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies + * them when one of the notify methods are called + * + * @author Robert von Burg + */ +public class EventCollectingObserverHandler implements ObserverHandler { + + private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class); + + private MapOfLists observerMap; + + private ScheduledExecutorService executorService; + private ObserverEvent observerEvent; + + private ScheduledFuture future; + + public EventCollectingObserverHandler() { + this.observerMap = new MapOfLists<>(); + } + + @Override + public void start() { + this.executorService = Executors.newScheduledThreadPool(1, new ThreadPoolFactory("ObserverHandler")); + } + + @Override + public void stop() { + + if (this.future != null) { + this.future.cancel(false); + this.future = null; + } + + if (this.executorService != null) { + this.executorService.shutdown(); + while (!this.executorService.isTerminated()) { + logger.info("Waiting for last update to complete..."); + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + logger.error("Interrupted!"); + } + } + + this.executorService = null; + } + } + + @Override + public synchronized void notify(ObserverEvent event) { + if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty()) + return; + + synchronized (this) { + if (this.observerEvent == null) { + this.observerEvent = event; + } else { + this.observerEvent.added.addAll(event.added); + this.observerEvent.updated.addAll(event.updated); + this.observerEvent.removed.addAll(event.removed); + } + } + + if (this.future == null || this.future.isDone()) { + this.future = this.executorService.scheduleAtFixedRate(() -> doUpdates(), 100, 100, TimeUnit.MILLISECONDS); + } + } + + protected void doUpdates() { + + ObserverEvent event; + synchronized (this) { + if (this.observerEvent == null) + return; + + event = this.observerEvent; + this.observerEvent = null; + } + + synchronized (this.observerMap) { + for (String key : event.added.keySet()) { + add(key, event.added.getList(key)); + } + for (String key : event.updated.keySet()) { + update(key, event.updated.getList(key)); + } + for (String key : event.removed.keySet()) { + remove(key, event.removed.getList(key)); + } + } + + synchronized (this) { + if (this.observerEvent != null) { + this.future = this.executorService.scheduleAtFixedRate(() -> doUpdates(), 100, 100, + TimeUnit.MILLISECONDS); + } + } + } + + private void add(String key, List elements) { + if (elements == null || elements.isEmpty()) + return; + + List observerList = this.observerMap.getList(key); + if (observerList != null && !observerList.isEmpty()) { + for (Observer observer : observerList) { + try { + observer.add(key, elements); + } catch (Exception e) { + String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ + msg = MessageFormat.format(msg, key, observer, e.getMessage()); + logger.error(msg, e); + } + } + } + } + + private void update(String key, List elements) { + if (elements == null || elements.isEmpty()) + return; + + List observerList = this.observerMap.getList(key); + if (observerList != null && !observerList.isEmpty()) { + for (Observer observer : observerList) { + try { + observer.update(key, elements); + } catch (Exception e) { + String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ + msg = MessageFormat.format(msg, key, observer, e.getMessage()); + logger.error(msg, e); + } + } + } + } + + private void remove(String key, List elements) { + if (elements == null || elements.isEmpty()) + return; + + List observerList = this.observerMap.getList(key); + if (observerList != null && !observerList.isEmpty()) { + for (Observer observer : observerList) { + try { + observer.remove(key, elements); + } catch (Exception e) { + String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$ + msg = MessageFormat.format(msg, key, observer, e.getMessage()); + logger.error(msg, e); + } + } + } + } + + @Override + public void registerObserver(String key, Observer observer) { + synchronized (this.observerMap) { + this.observerMap.addElement(key, observer); + String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$ + logger.info(msg); + } + } + + @Override + public void unregisterObserver(String key, Observer observer) { + synchronized (this.observerMap) { + if (this.observerMap.removeElement(key, observer)) { + String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$ + logger.info(msg); + } + } + } +} diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java index 6a764fecd..d3e981ef2 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java @@ -15,10 +15,12 @@ */ package li.strolch.agent.impl; +import static li.strolch.agent.impl.DefaultRealmHandler.PROP_ENABLED_DELAYED_OBSERVER_UPDATES; import static li.strolch.agent.impl.DefaultRealmHandler.PROP_ENABLE_AUDIT_TRAIL; import static li.strolch.agent.impl.DefaultRealmHandler.PROP_ENABLE_AUDIT_TRAIL_FOR_READ; import static li.strolch.agent.impl.DefaultRealmHandler.PROP_ENABLE_OBSERVER_UPDATES; import static li.strolch.agent.impl.DefaultRealmHandler.PROP_ENABLE_VERSIONING; +import static li.strolch.runtime.StrolchConstants.makeRealmKey; import java.text.MessageFormat; import java.util.concurrent.TimeUnit; @@ -36,7 +38,6 @@ import li.strolch.agent.api.ResourceMap; import li.strolch.agent.api.StrolchRealm; import li.strolch.model.Locator; import li.strolch.privilege.model.PrivilegeContext; -import li.strolch.runtime.StrolchConstants; import li.strolch.runtime.configuration.ComponentConfiguration; import li.strolch.utils.dbc.DBC; @@ -87,28 +88,35 @@ public abstract class InternalStrolchRealm implements StrolchRealm { logger.info("Initializing Realm " + getRealm() + "..."); // audits - String enableAuditKey = StrolchConstants.makeRealmKey(getRealm(), PROP_ENABLE_AUDIT_TRAIL); + String enableAuditKey = makeRealmKey(getRealm(), PROP_ENABLE_AUDIT_TRAIL); this.auditTrailEnabled = configuration.getBoolean(enableAuditKey, Boolean.FALSE); // audits for read - String enableAuditForReadKey = StrolchConstants.makeRealmKey(getRealm(), PROP_ENABLE_AUDIT_TRAIL_FOR_READ); + String enableAuditForReadKey = makeRealmKey(getRealm(), PROP_ENABLE_AUDIT_TRAIL_FOR_READ); this.auditTrailEnabledForRead = configuration.getBoolean(enableAuditForReadKey, Boolean.FALSE); // observer updates - String updateObserversKey = StrolchConstants.makeRealmKey(getRealm(), PROP_ENABLE_OBSERVER_UPDATES); + String updateObserversKey = makeRealmKey(getRealm(), PROP_ENABLE_OBSERVER_UPDATES); this.updateObservers = configuration.getBoolean(updateObserversKey, Boolean.FALSE); - if (this.updateObservers) - this.observerHandler = new DefaultObserverHandler(); + if (this.updateObservers) { + String delayedObserversKey = makeRealmKey(getRealm(), PROP_ENABLED_DELAYED_OBSERVER_UPDATES); + if (configuration.getBoolean(delayedObserversKey, Boolean.FALSE)) { + this.observerHandler = new DefaultObserverHandler(); + } else { + this.observerHandler = new EventCollectingObserverHandler(); + logger.info("Enabled Delayed Observer Updates."); + } + } // lock timeout - String propTryLockTimeUnit = StrolchConstants.makeRealmKey(this.realm, PROP_TRY_LOCK_TIME_UNIT); - String propTryLockTime = StrolchConstants.makeRealmKey(this.realm, PROP_TRY_LOCK_TIME); + String propTryLockTimeUnit = makeRealmKey(this.realm, PROP_TRY_LOCK_TIME_UNIT); + String propTryLockTime = makeRealmKey(this.realm, PROP_TRY_LOCK_TIME); TimeUnit timeUnit = TimeUnit.valueOf(configuration.getString(propTryLockTimeUnit, TimeUnit.SECONDS.name())); long time = configuration.getLong(propTryLockTime, 10L); this.lockHandler = new DefaultLockHandler(this.realm, timeUnit, time); // versioning - String enableVersioningKey = StrolchConstants.makeRealmKey(getRealm(), PROP_ENABLE_VERSIONING); + String enableVersioningKey = makeRealmKey(getRealm(), PROP_ENABLE_VERSIONING); this.versioningEnabled = configuration.getBoolean(enableVersioningKey, Boolean.FALSE); if (this.auditTrailEnabled)