[Major] New implementation of ObserverHandler for async updates

This commit is contained in:
Robert von Burg 2017-05-02 16:07:55 +02:00
parent b2bbfad26e
commit 22538dc7fa
9 changed files with 129 additions and 96 deletions

View File

@ -0,0 +1,10 @@
package li.strolch.agent.api;
import li.strolch.model.StrolchRootElement;
import li.strolch.utils.collections.MapOfLists;
public class ObserverEvent {
public MapOfLists<String, StrolchRootElement> added = new MapOfLists<>();
public MapOfLists<String, StrolchRootElement> updated = new MapOfLists<>();
public MapOfLists<String, StrolchRootElement> removed = new MapOfLists<>();
}

View File

@ -15,8 +15,6 @@
*/
package li.strolch.agent.api;
import java.util.List;
import li.strolch.model.StrolchRootElement;
import li.strolch.model.Tags;
@ -45,36 +43,12 @@ import li.strolch.model.Tags;
public interface ObserverHandler {
/**
* Notifies any registered {@link Observer} that the given elements under the given key have been added i.e. created
* Update observers with the given event
*
* @param key
* the key for which to notify observers
* @param elements
* the elements to notify observers with
* @param observerEvent
* containing the updates
*/
public void add(String key, List<StrolchRootElement> elements);
/**
* Notifies any registered {@link Observer} that the given elements under the given key have been updated i.e.
* modified
*
* @param key
* the key for which to notify observers
* @param elements
* the elements to notify observers with
*/
public void update(String key, List<StrolchRootElement> elements);
/**
* Notifies any registered {@link Observer} that the given elements under the given key have been removed i.e.
* deleted
*
* @param key
* the key for which to notify observers
* @param elements
* the elements to notify observers with
*/
public void remove(String key, List<StrolchRootElement> elements);
public void notify(ObserverEvent observerEvent);
/**
* Registers the {@link Observer} for notification of objects under the given key
@ -95,4 +69,14 @@ public interface ObserverHandler {
* the observer unregister
*/
public void unregisterObserver(String key, Observer observer);
/**
* Start the update thread
*/
public void start();
/**
* Stop the update thread
*/
public void stop();
}

View File

@ -107,7 +107,8 @@ public class CachedRealm extends InternalStrolchRealm {
@Override
public void start(PrivilegeContext privilegeContext) {
super.start(privilegeContext);
long start = System.nanoTime();
int nrOfOrders = 0;
int nrOfResources = 0;
@ -163,11 +164,6 @@ public class CachedRealm extends InternalStrolchRealm {
logger.info(MessageFormat.format("Loaded {0} Activities", nrOfActivities)); //$NON-NLS-1$
}
@Override
public void stop() {
//
}
@Override
public void destroy() {
//

View File

@ -16,21 +16,23 @@
package li.strolch.agent.impl;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import li.strolch.agent.api.Observer;
import li.strolch.agent.api.ObserverEvent;
import li.strolch.agent.api.ObserverHandler;
import li.strolch.model.StrolchRootElement;
import li.strolch.runtime.ThreadPoolFactory;
import li.strolch.utils.collections.MapOfLists;
/**
* A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer} and notifies them when
* one of the notify methods are called
* A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies
* them when one of the notify methods are called
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
@ -38,18 +40,62 @@ public class DefaultObserverHandler implements ObserverHandler {
private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class);
private Map<String, List<Observer>> observerMap;
private MapOfLists<String, Observer> observerMap;
private ExecutorService executorService;
public DefaultObserverHandler() {
this.observerMap = new HashMap<>();
this.observerMap = new MapOfLists<>();
}
@Override
public void add(String key, List<StrolchRootElement> elements) {
public void start() {
this.executorService = Executors.newSingleThreadExecutor(new ThreadPoolFactory("ObserverHandler"));
}
@Override
public void stop() {
if (this.executorService != null) {
this.executorService.shutdown();
while (!this.executorService.isTerminated()) {
logger.info("Waiting for last update to complete...");
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
logger.error("Interrupted!");
}
}
this.executorService = null;
}
}
@Override
public void notify(ObserverEvent event) {
if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty())
return;
this.executorService.execute(() -> {
synchronized (this.observerMap) {
for (String key : event.added.keySet()) {
add(key, event.added.getList(key));
}
for (String key : event.updated.keySet()) {
update(key, event.updated.getList(key));
}
for (String key : event.removed.keySet()) {
remove(key, event.removed.getList(key));
}
}
});
}
private void add(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
return;
List<Observer> observerList = this.observerMap.get(key);
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
@ -63,12 +109,11 @@ public class DefaultObserverHandler implements ObserverHandler {
}
}
@Override
public void update(String key, List<StrolchRootElement> elements) {
private void update(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
return;
List<Observer> observerList = this.observerMap.get(key);
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
@ -82,12 +127,11 @@ public class DefaultObserverHandler implements ObserverHandler {
}
}
@Override
public void remove(String key, List<StrolchRootElement> elements) {
private void remove(String key, List<StrolchRootElement> elements) {
if (elements == null || elements.isEmpty())
return;
List<Observer> observerList = this.observerMap.get(key);
List<Observer> observerList = this.observerMap.getList(key);
if (observerList != null && !observerList.isEmpty()) {
for (Observer observer : observerList) {
try {
@ -103,22 +147,20 @@ public class DefaultObserverHandler implements ObserverHandler {
@Override
public void registerObserver(String key, Observer observer) {
List<Observer> observerList = this.observerMap.get(key);
if (observerList == null) {
observerList = new ArrayList<>();
this.observerMap.put(key, observerList);
synchronized (this.observerMap) {
this.observerMap.addElement(key, observer);
String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
observerList.add(observer);
String msg = MessageFormat.format("Registered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
@Override
public void unregisterObserver(String key, Observer observer) {
List<Observer> observerList = this.observerMap.get(key);
if (observerList != null && observerList.remove(observer)) {
String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
synchronized (this.observerMap) {
if (this.observerMap.removeElement(key, observer)) {
String msg = MessageFormat.format("Unregistered observer {0} with {1}", key, observer); //$NON-NLS-1$
logger.info(msg);
}
}
}
}

View File

@ -98,14 +98,10 @@ public class EmptyRealm extends InternalStrolchRealm {
@Override
public void start(PrivilegeContext privilegeContext) {
super.start(privilegeContext);
logger.info(MessageFormat.format("Initialized EMPTY Realm {0}", getRealm())); //$NON-NLS-1$
}
@Override
public void stop() {
//
}
@Override
public void destroy() {
//

View File

@ -158,9 +158,19 @@ public abstract class InternalStrolchRealm implements StrolchRealm {
return this.observerHandler;
}
public abstract void start(PrivilegeContext privilegeContext);
public void start(PrivilegeContext privilegeContext) {
public abstract void stop();
if (this.observerHandler != null) {
this.observerHandler.start();
}
}
public void stop() {
if (this.observerHandler != null) {
this.observerHandler.stop();
}
}
public abstract void destroy();
@ -171,5 +181,4 @@ public abstract class InternalStrolchRealm implements StrolchRealm {
public abstract ActivityMap getActivityMap();
public abstract AuditTrail getAuditTrail();
}

View File

@ -100,6 +100,7 @@ public class TransactionalRealm extends InternalStrolchRealm {
@Override
public void start(PrivilegeContext privilegeContext) {
super.start(privilegeContext);
long start = System.nanoTime();
int nrOfOrders = 0;
@ -127,11 +128,6 @@ public class TransactionalRealm extends InternalStrolchRealm {
logger.info(MessageFormat.format("There are {0} Activities", nrOfActivities)); //$NON-NLS-1$
}
@Override
public void stop() {
//
}
@Override
public void destroy() {
//

View File

@ -116,6 +116,7 @@ public class TransientRealm extends InternalStrolchRealm {
@Override
public void start(PrivilegeContext privilegeContext) {
super.start(privilegeContext);
ModelStatistics statistics;
try (StrolchTransaction tx = openTx(privilegeContext.getCertificate(), "strolch_boot")) {
@ -141,11 +142,6 @@ public class TransientRealm extends InternalStrolchRealm {
logger.info(MessageFormat.format("Loaded {0} Activities", statistics.nrOfActivities)); //$NON-NLS-1$
}
@Override
public void stop() {
//
}
@Override
public void destroy() {
//

View File

@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import li.strolch.agent.api.ActivityMap;
import li.strolch.agent.api.AuditTrail;
import li.strolch.agent.api.ObserverEvent;
import li.strolch.agent.api.ObserverHandler;
import li.strolch.agent.api.OrderMap;
import li.strolch.agent.api.ResourceMap;
@ -886,35 +887,38 @@ public abstract class AbstractTransaction implements StrolchTransaction {
long observerUpdateStart = System.nanoTime();
ObserverHandler observerHandler = this.realm.getObserverHandler();
if (this.orderMap != null) {
if (!this.orderMap.getCreated().isEmpty())
observerHandler.add(Tags.ORDER, new ArrayList<StrolchRootElement>(this.orderMap.getCreated()));
if (!this.orderMap.getUpdated().isEmpty())
observerHandler.update(Tags.ORDER, new ArrayList<StrolchRootElement>(this.orderMap.getUpdated()));
if (!this.orderMap.getDeleted().isEmpty())
observerHandler.remove(Tags.ORDER, new ArrayList<StrolchRootElement>(this.orderMap.getDeleted()));
}
ObserverEvent event = new ObserverEvent();
if (this.resourceMap != null) {
if (!this.resourceMap.getCreated().isEmpty())
observerHandler.add(Tags.RESOURCE, new ArrayList<StrolchRootElement>(this.resourceMap.getCreated()));
event.added.addList(Tags.RESOURCE, new ArrayList<StrolchRootElement>(this.resourceMap.getCreated()));
if (!this.resourceMap.getUpdated().isEmpty())
observerHandler.update(Tags.RESOURCE, new ArrayList<StrolchRootElement>(this.resourceMap.getUpdated()));
event.updated.addList(Tags.RESOURCE, new ArrayList<StrolchRootElement>(this.resourceMap.getUpdated()));
if (!this.resourceMap.getDeleted().isEmpty())
observerHandler.remove(Tags.RESOURCE, new ArrayList<StrolchRootElement>(this.resourceMap.getDeleted()));
event.removed.addList(Tags.RESOURCE, new ArrayList<StrolchRootElement>(this.resourceMap.getDeleted()));
}
if (this.orderMap != null) {
if (!this.orderMap.getCreated().isEmpty())
event.added.addList(Tags.ORDER, new ArrayList<StrolchRootElement>(this.orderMap.getCreated()));
if (!this.orderMap.getUpdated().isEmpty())
event.updated.addList(Tags.ORDER, new ArrayList<StrolchRootElement>(this.orderMap.getUpdated()));
if (!this.orderMap.getDeleted().isEmpty())
event.removed.addList(Tags.ORDER, new ArrayList<StrolchRootElement>(this.orderMap.getDeleted()));
}
if (this.activityMap != null) {
if (!this.activityMap.getCreated().isEmpty())
observerHandler.add(Tags.ACTIVITY, new ArrayList<StrolchRootElement>(this.activityMap.getCreated()));
event.added.addList(Tags.ACTIVITY, new ArrayList<StrolchRootElement>(this.activityMap.getCreated()));
if (!this.activityMap.getUpdated().isEmpty())
observerHandler.update(Tags.ACTIVITY, new ArrayList<StrolchRootElement>(this.activityMap.getUpdated()));
event.updated.addList(Tags.ACTIVITY, new ArrayList<StrolchRootElement>(this.activityMap.getUpdated()));
if (!this.activityMap.getDeleted().isEmpty())
observerHandler.remove(Tags.ACTIVITY, new ArrayList<StrolchRootElement>(this.activityMap.getDeleted()));
event.removed.addList(Tags.ACTIVITY, new ArrayList<StrolchRootElement>(this.activityMap.getDeleted()));
}
ObserverHandler observerHandler = this.realm.getObserverHandler();
observerHandler.notify(event);
long observerUpdateDuration = System.nanoTime() - observerUpdateStart;
return observerUpdateDuration;
}