[Major] Removed EventCollectingObserverHandler and using single thread for observer updates

This commit is contained in:
Robert von Burg 2022-07-01 14:04:54 +02:00
parent 1cdaaec6e3
commit d2c740e282
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
6 changed files with 193 additions and 297 deletions

View File

@ -17,12 +17,15 @@ package li.strolch.agent.impl;
import static li.strolch.model.Tags.AGENT;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.collections.SynchronizedCollections.synchronizedMapOfLists;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import li.strolch.agent.api.*;
import li.strolch.handler.operationslog.OperationsLog;
@ -47,104 +50,129 @@ public class DefaultObserverHandler implements ObserverHandler {
private final StrolchAgent agent;
private final StrolchRealm realm;
private volatile boolean run;
private final BlockingDeque<ObserverEvent> eventQueue;
private final MapOfLists<String, Observer> observerMap;
private Future<?> updateTask;
public DefaultObserverHandler(StrolchAgent agent, StrolchRealm realm) {
this.agent = agent;
this.realm = realm;
this.observerMap = new MapOfLists<>();
this.observerMap = synchronizedMapOfLists(new MapOfLists<>());
this.eventQueue = new LinkedBlockingDeque<>();
}
@Override
public void start() {
// nothing to do
this.run = true;
this.updateTask = this.agent.getSingleThreadExecutor("Observer").submit(this::doUpdates);
}
@Override
public void stop() {
// nothing to do
this.run = false;
this.updateTask.cancel(true);
}
@Override
public void notify(ObserverEvent event) {
if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty())
return;
ExecutorService service = this.agent.getExecutor("Observer");
if (!service.isShutdown())
service.submit(() -> doUpdates(event));
if (!(event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty()))
this.eventQueue.addLast(event);
}
protected void doUpdates(ObserverEvent event) {
synchronized (this.observerMap) {
for (String key : event.added.keySet()) {
add(key, event.added.getList(key));
}
for (String key : event.updated.keySet()) {
update(key, event.updated.getList(key));
}
for (String key : event.removed.keySet()) {
remove(key, event.removed.getList(key));
protected void doUpdates() {
while (this.run) {
try {
ObserverEvent event = this.eventQueue.takeFirst();
for (String key : event.added.keySet()) {
List<StrolchRootElement> list = event.added.getList(key);
if (list != null)
notifyAdd(key, list);
}
for (String key : event.updated.keySet()) {
List<StrolchRootElement> list = event.updated.getList(key);
if (list != null)
notifyUpdate(key, list);
}
for (String key : event.removed.keySet()) {
List<StrolchRootElement> list = event.removed.getList(key);
if (list != null)
notifyRemove(key, list);
}
} catch (InterruptedException e) {
if (this.run)
logger.error("Failed to do updates!", e);
else
logger.warn("Interrupted!");
}
}
}
private void add(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
private List<Observer> getObservers(String key) {
List<Observer> observerList = this.observerMap.getList(key);
if (observerList == null)
return null;
observerList = new ArrayList<>(observerList);
if (observerList.isEmpty())
return null;
return observerList;
}
private void notifyAdd(String key, List<StrolchRootElement> elements) {
List<Observer> observerList = getObservers(key);
if (observerList == null)
return;
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
observer.add(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
for (Observer observer : observerList) {
try {
observer.add(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
addLogMessage("add", e);
}
addLogMessage("add", e);
}
}
}
private void update(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
private void notifyUpdate(String key, List<StrolchRootElement> elements) {
List<Observer> observerList = getObservers(key);
if (observerList == null)
return;
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
observer.update(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
for (Observer observer : observerList) {
try {
observer.update(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
addLogMessage("update", e);
}
addLogMessage("update", e);
}
}
}
private void remove(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
private void notifyRemove(String key, List<StrolchRootElement> elements) {
List<Observer> observerList = getObservers(key);
if (observerList == null)
return;
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
observer.remove(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
for (Observer observer : observerList) {
try {
observer.remove(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
addLogMessage("remove", e);
}
addLogMessage("remove", e);
}
}
}
@ -162,20 +190,16 @@ public class DefaultObserverHandler implements ObserverHandler {
@Override
public void registerObserver(String key, Observer observer) {
synchronized (this.observerMap) {
this.observerMap.addElement(key, observer);
String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
this.observerMap.addElement(key, observer);
String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
@Override
public void unregisterObserver(String key, Observer observer) {
synchronized (this.observerMap) {
if (this.observerMap.removeElement(key, observer)) {
String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
if (this.observerMap.removeElement(key, observer)) {
String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
}
}

View File

@ -39,7 +39,6 @@ public class DefaultRealmHandler extends StrolchComponent implements RealmHandle
public static final String PROP_ENABLE_AUDIT_TRAIL = "enableAuditTrail"; //$NON-NLS-1$
public static final String PROP_ENABLE_AUDIT_TRAIL_FOR_READ = "enableAuditTrailForRead"; //$NON-NLS-1$
public static final String PROP_ENABLE_OBSERVER_UPDATES = "enableObserverUpdates"; //$NON-NLS-1$
public static final String PROP_ENABLED_DELAYED_OBSERVER_UPDATES = "enableDelayedObserverUpdates"; //$NON-NLS-1$
public static final String PROP_ENABLE_VERSIONING = "enableVersioning"; //$NON-NLS-1$
public static final String PREFIX_DATA_STORE_MODE = "dataStoreMode"; //$NON-NLS-1$
public static final String PREFIX_DATA_STORE_FILE = "dataStoreFile"; //$NON-NLS-1$

View File

@ -1,219 +0,0 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.agent.impl;
import static li.strolch.model.Tags.AGENT;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import java.text.MessageFormat;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import li.strolch.agent.api.*;
import li.strolch.model.log.LogMessage;
import li.strolch.model.log.LogMessageState;
import li.strolch.model.log.LogSeverity;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.model.StrolchRootElement;
import li.strolch.utils.collections.MapOfLists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies
* them when one of the notify methods are called
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class EventCollectingObserverHandler implements ObserverHandler {
private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class);
private final MapOfLists<String, Observer> observerMap;
private ObserverEvent observerEvent;
private ScheduledFuture<?> future;
private final StrolchAgent agent;
private final StrolchRealm realm;
public EventCollectingObserverHandler(StrolchAgent agent, StrolchRealm realm) {
this.agent = agent;
this.realm = realm;
this.observerMap = new MapOfLists<>();
}
@Override
public void start() {
// nothing to do
}
@Override
public void stop() {
if (this.future != null) {
this.future.cancel(false);
this.future = null;
}
}
private ScheduledExecutorService getExecutor() {
return this.agent.getScheduledExecutor("Observer");
}
@Override
public synchronized void notify(ObserverEvent event) {
if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty())
return;
synchronized (this) {
if (this.observerEvent == null) {
this.observerEvent = event;
} else {
this.observerEvent.added.addAll(event.added);
this.observerEvent.updated.addAll(event.updated);
this.observerEvent.removed.addAll(event.removed);
}
}
if (this.future == null || this.future.isDone()) {
this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS);
}
}
protected void doUpdates() {
ObserverEvent event;
synchronized (this) {
if (this.observerEvent == null)
return;
event = this.observerEvent;
this.observerEvent = null;
}
synchronized (this.observerMap) {
for (String key : event.added.keySet()) {
add(key, event.added.getList(key));
}
for (String key : event.updated.keySet()) {
update(key, event.updated.getList(key));
}
for (String key : event.removed.keySet()) {
remove(key, event.removed.getList(key));
}
}
synchronized (this) {
if (this.observerEvent != null) {
this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS);
}
}
}
private void add(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
return;
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
observer.add(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
addLogMessage("add", e);
}
}
}
}
private void update(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
return;
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
observer.update(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
addLogMessage("update", e);
}
}
}
}
private void remove(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
return;
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
observer.remove(key, elements);
} catch (Exception e) {
String msg = "Failed to update observer {0} with {1} due to {2}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, key, observer, e.getMessage());
logger.error(msg, e);
addLogMessage("remove", e);
}
}
}
}
private void addLogMessage(String type, Exception e) {
ComponentContainer container = this.agent.getContainer();
if (container.hasComponent(OperationsLog.class)) {
OperationsLog operationsLog = container.getComponent(OperationsLog.class);
operationsLog.addMessage(new LogMessage(this.realm.getRealm(), SYSTEM_USER_AGENT,
Locator.valueOf(AGENT, ObserverHandler.class.getName(), type, StrolchAgent.getUniqueId()),
LogSeverity.Exception, LogMessageState.Information, ResourceBundle.getBundle("strolch-agent"), "agent.observers.update.failed")
.withException(e).value("type", type).value("reason", e));
}
}
@Override
public void registerObserver(String key, Observer observer) {
synchronized (this.observerMap) {
this.observerMap.addElement(key, observer);
String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
}
@Override
public void unregisterObserver(String key, Observer observer) {
synchronized (this.observerMap) {
if (this.observerMap.removeElement(key, observer)) {
String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
}
}
}

View File

@ -37,7 +37,7 @@ public abstract class InternalStrolchRealm implements StrolchRealm {
public static final String PROP_TRY_LOCK_TIME_UNIT = "tryLockTimeUnit"; //$NON-NLS-1$
public static final String PROP_TRY_LOCK_TIME = "tryLockTime"; //$NON-NLS-1$
protected static final Logger logger = LoggerFactory.getLogger(StrolchRealm.class);
private String realm;
private final String realm;
private LockHandler lockHandler;
private boolean auditTrailEnabled;
private boolean auditTrailEnabledForRead;
@ -89,13 +89,7 @@ public abstract class InternalStrolchRealm implements StrolchRealm {
String updateObserversKey = makeRealmKey(getRealm(), PROP_ENABLE_OBSERVER_UPDATES);
this.updateObservers = configuration.getBoolean(updateObserversKey, Boolean.FALSE);
if (this.updateObservers) {
String delayedObserversKey = makeRealmKey(getRealm(), PROP_ENABLED_DELAYED_OBSERVER_UPDATES);
if (configuration.getBoolean(delayedObserversKey, Boolean.FALSE)) {
this.observerHandler = new EventCollectingObserverHandler(container.getAgent(), this);
logger.info("Enabled Delayed Observer Updates.");
} else {
this.observerHandler = new DefaultObserverHandler(container.getAgent(), this);
}
this.observerHandler = new DefaultObserverHandler(container.getAgent(), this);
}
// lock timeout

View File

@ -0,0 +1,97 @@
package li.strolch.agent;
import static org.junit.Assert.assertEquals;
import java.security.SecureRandom;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import li.strolch.RuntimeMock;
import li.strolch.agent.api.Observer;
import li.strolch.agent.api.StrolchRealm;
import li.strolch.model.Resource;
import li.strolch.model.StrolchRootElement;
import li.strolch.model.Tags;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.privilege.model.Certificate;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ObserverHandlerTests {
public static final Logger logger = LoggerFactory.getLogger(ObserverHandlerTests.class);
private static final String TARGET_PATH = "target/" + ObserverHandlerTests.class.getSimpleName();
private static final String SOURCE_PATH = "src/test/resources/transienttest";
public static final int MAX_WAIT_PER_UPDATE = 100;
public static final int UPDATE_SIZE = 50;
private static RuntimeMock runtimeMock;
private static Certificate cert;
private static final AtomicInteger updateCount = new AtomicInteger(0);
@BeforeClass
public static void beforeClass() {
runtimeMock = new RuntimeMock(TARGET_PATH, SOURCE_PATH).mockRuntime();
runtimeMock.startContainer();
cert = runtimeMock.getPrivilegeHandler().authenticate("test", "test".toCharArray());
// register an observer
StrolchRealm realm = runtimeMock.getAgent().getContainer().getRealm(cert);
realm.getObserverHandler().registerObserver(Tags.RESOURCE, new TestObserver());
}
@AfterClass
public static void afterClass() {
if (cert != null)
runtimeMock.getPrivilegeHandler().invalidate(cert);
if (runtimeMock != null)
runtimeMock.destroyRuntime();
}
@Test
public void shouldNotifyObservers() throws InterruptedException {
updateCount.set(0);
long expectedCount = 0;
for (int i = 0; i < UPDATE_SIZE; i++) {
StrolchRealm realm = runtimeMock.getAgent().getContainer().getRealm(cert);
try (StrolchTransaction tx = realm.openTx(cert, ParallelTests.class, false)) {
tx.add(new Resource("res_" + i, "Resource " + i, "MyType"));
tx.commitOnClose();
}
expectedCount++;
}
long start = System.currentTimeMillis();
long maxWaitTime = MAX_WAIT_PER_UPDATE * expectedCount;
while (updateCount.get() != expectedCount) {
Thread.sleep(50L);
if ((System.currentTimeMillis() - start) > maxWaitTime)
throw new IllegalStateException("Updates didn't complete in " + maxWaitTime + "ms");
}
assertEquals("Expected " + UPDATE_SIZE + " updates!", UPDATE_SIZE, updateCount.get());
}
static class TestObserver implements Observer {
private final SecureRandom random = new SecureRandom();
@Override
public void add(String key, List<StrolchRootElement> elements) throws Exception {
int wait = random.nextInt(MAX_WAIT_PER_UPDATE);
Thread.sleep(wait);
for (StrolchRootElement element : elements) {
logger.info("Added " + key + " " + element.getLocator() + " took " + wait + "ms");
}
updateCount.incrementAndGet();
}
}
}

View File

@ -23,6 +23,7 @@
<Properties>
<dataStoreMode>TRANSIENT</dataStoreMode>
<dataStoreFile>StrolchModel.xml</dataStoreFile>
<enableObserverUpdates>true</enableObserverUpdates>
</Properties>
</Component>
<Component>