[Major] Using executor pools with names in StrolchAgent

This commit is contained in:
Robert von Burg 2017-12-19 14:54:57 +01:00
parent 7a6a3a3c2b
commit 50c379d06d
11 changed files with 295 additions and 260 deletions

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -23,7 +23,7 @@ import li.strolch.model.Tags;
* Implementing the observer pattern by allowing {@link Observer} to register themselves for updates to
* {@link StrolchRootElement}
* </p>
*
*
* <p>
* Note: The key in all the methods can be any string, but as a convenience it is mostly one of the following:
* </p>
@ -32,51 +32,51 @@ import li.strolch.model.Tags;
* <li>{@link Tags#ORDER}</li>
* <li>{@link Tags#ACTIVITY}</li>
* </ul>
*
*
* <p>
* Should a special case arise, then it a contract must be defined by which a key is negotiated between an event
* distributer and the observer
* </p>
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public interface ObserverHandler {
/**
* Update observers with the given event
*
*
* @param observerEvent
* containing the updates
* containing the updates
*/
public void notify(ObserverEvent observerEvent);
void notify(ObserverEvent observerEvent);
/**
* Registers the {@link Observer} for notification of objects under the given key
*
*
* @param key
* the key for which to the observer wants to be notified
* the key for which to the observer wants to be notified
* @param observer
* the observer to register
* the observer to register
*/
public void registerObserver(String key, Observer observer);
void registerObserver(String key, Observer observer);
/**
* Unregister the given {@link Observer}
*
*
* @param key
* the key for which to the observer was registered
* the key for which to the observer was registered
* @param observer
* the observer unregister
* the observer unregister
*/
public void unregisterObserver(String key, Observer observer);
void unregisterObserver(String key, Observer observer);
/**
* Start the update thread
*/
public void start();
void start();
/**
* Stop the update thread
*/
public void stop();
void stop();
}

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,6 +19,8 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@ -26,15 +28,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import li.strolch.agent.impl.ComponentContainerImpl;
import li.strolch.runtime.ThreadPoolFactory;
import li.strolch.runtime.configuration.ConfigurationParser;
import li.strolch.runtime.configuration.RuntimeConfiguration;
import li.strolch.runtime.configuration.StrolchConfiguration;
import li.strolch.utils.dbc.DBC;
import li.strolch.utils.helper.StringHelper;
import li.strolch.utils.helper.SystemHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Robert von Burg <eitch@eitchnet.ch>
@ -48,8 +51,9 @@ public class StrolchAgent {
private ComponentContainerImpl container;
private StrolchConfiguration strolchConfiguration;
private StrolchVersion appVersion;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private Map<String, ExecutorService> executors;
private Map<String, ScheduledExecutorService> scheduledExecutors;
public StrolchAgent(StrolchVersion appVersion) {
this.appVersion = appVersion;
@ -57,7 +61,7 @@ public class StrolchAgent {
/**
* Return the {@link StrolchConfiguration}
*
*
* @return the {@link StrolchConfiguration}
*/
public StrolchConfiguration getStrolchConfiguration() {
@ -66,7 +70,7 @@ public class StrolchAgent {
/**
* Return the container
*
*
* @return the container
*/
public ComponentContainer getContainer() {
@ -82,20 +86,42 @@ public class StrolchAgent {
/**
* Return the {@link ExecutorService} instantiated for this agent
*
*
* @return the {@link ExecutorService} instantiated for this agent
*/
public ExecutorService getExecutor() {
return this.executor;
return getExecutor("Agent");
}
public synchronized ExecutorService getExecutor(String poolName) {
DBC.PRE.assertNotEmpty("poolName must be set!", poolName);
ExecutorService executor = this.executors.get(poolName);
if (executor == null) {
executor = Executors.newCachedThreadPool(new ThreadPoolFactory(poolName));
this.executors.put(poolName, executor);
}
return executor;
}
/**
* Return the {@link ScheduledExecutorService} instantiated for this agent
*
*
* @return the {@link ScheduledExecutorService} instantiated for this agent
*/
public ScheduledExecutorService getScheduledExecutor() {
return this.scheduledExecutor;
return getScheduledExecutor("Agent");
}
public synchronized ScheduledExecutorService getScheduledExecutor(String poolName) {
DBC.PRE.assertNotEmpty("poolName must be set!", poolName);
ScheduledExecutorService executor = this.scheduledExecutors.get(poolName);
if (executor == null) {
executor = Executors.newScheduledThreadPool(4, new ThreadPoolFactory(poolName));
this.scheduledExecutors.put(poolName, executor);
}
return executor;
}
/**
@ -105,8 +131,10 @@ public class StrolchAgent {
public void initialize() {
if (this.container == null)
throw new RuntimeException("Please call setup first!");
this.executor = Executors.newCachedThreadPool();
this.scheduledExecutor = Executors.newScheduledThreadPool(4);
this.executors = new HashMap<>();
this.scheduledExecutors = new HashMap<>();
this.container.initialize(this.strolchConfiguration);
}
@ -127,17 +155,23 @@ public class StrolchAgent {
this.container.stop();
}
private void shutdownExecutorService(ExecutorService executor) {
try {
logger.info("Shutting down executor...");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Was interrupted while shutting down tasks");
} finally {
if (!executor.isTerminated()) {
logger.error("Tasks not stopped after " + 5 + "s. Shutting down now.");
executor.shutdownNow();
private <T extends ExecutorService> void shutdownExecutorService(Map<String, T> executors) {
for (String poolName : executors.keySet()) {
logger.info("Shutting down executor pool " + poolName);
T executor = executors.get(poolName);
try {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Was interrupted while shutting down tasks");
} finally {
if (!executor.isTerminated()) {
logger.error("Tasks not stopped after " + 5 + "s. Shutting down now.");
executor.shutdownNow();
}
}
}
}
@ -147,10 +181,10 @@ public class StrolchAgent {
*/
public void destroy() {
if (this.executor != null)
shutdownExecutorService(this.executor);
if (this.scheduledExecutor != null)
shutdownExecutorService(this.scheduledExecutor);
if (this.executors != null)
shutdownExecutorService(this.executors);
if (this.scheduledExecutors != null)
shutdownExecutorService(this.scheduledExecutors);
if (this.container != null)
this.container.destroy();
@ -161,15 +195,19 @@ public class StrolchAgent {
* <p>
* <b>Note:</b> Use {@link StrolchBootstrapper} instead of calling this method directly!
* </p>
*
*
* <p>
* Sets up the agent by parsing the configuration file and initializes the given environment
* </p>
*
*
* @param environment
* the current environment
* @param configPathF
* the path to the config directory
* @param dataPathF
* the path to the data directory
* @param tempPathF
* the path to the temp directory
*/
void setup(String environment, File configPathF, File dataPathF, File tempPathF) {
@ -180,8 +218,8 @@ public class StrolchAgent {
logger.info(" - Temp: " + tempPathF.getAbsolutePath());
logger.info(" - user.dir: " + SystemHelper.getUserDir());
this.strolchConfiguration = ConfigurationParser.parseConfiguration(environment, configPathF, dataPathF,
tempPathF);
this.strolchConfiguration = ConfigurationParser
.parseConfiguration(environment, configPathF, dataPathF, tempPathF);
ComponentContainerImpl container = new ComponentContainerImpl(this);
container.setup(this.strolchConfiguration);
@ -189,7 +227,8 @@ public class StrolchAgent {
this.container = container;
RuntimeConfiguration config = this.strolchConfiguration.getRuntimeConfiguration();
logger.info(MessageFormat.format("Setup Agent {0}:{1}", config.getApplicationName(), config.getEnvironment())); //$NON-NLS-1$
logger.info(MessageFormat
.format("Setup Agent {0}:{1}", config.getApplicationName(), config.getEnvironment())); //$NON-NLS-1$
}
protected void assertContainerStarted() {
@ -217,7 +256,7 @@ public class StrolchAgent {
/**
* Returns the version of this agent
*
*
* @return the version of this agent
*/
public VersionQueryResult getVersion() {
@ -228,13 +267,14 @@ public class StrolchAgent {
Properties properties = new Properties();
try (InputStream stream = getClass().getResourceAsStream(AGENT_VERSION_PROPERTIES);) {
try (InputStream stream = getClass().getResourceAsStream(AGENT_VERSION_PROPERTIES)) {
properties.load(stream);
AgentVersion agentVersion = new AgentVersion(
getStrolchConfiguration().getRuntimeConfiguration().getApplicationName(), properties);
queryResult.setAgentVersion(agentVersion);
} catch (IOException e) {
String msg = MessageFormat.format("Failed to read version properties for agent: {0}", e.getMessage()); //$NON-NLS-1$
String msg = MessageFormat
.format("Failed to read version properties for agent: {0}", e.getMessage()); //$NON-NLS-1$
queryResult.getErrors().add(msg);
logger.error(msg, e);
}

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -42,17 +42,17 @@ import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
* A {@link StrolchComponent} is a configurable extension to Strolch. Every major feature should be implemented as a
* {@link StrolchComponent} so that they can be easily added or removed from a Strolch runtime.
* </p>
*
*
* <p>
* A {@link StrolchComponent} has access to the container and can perform different operations. They can be passive or
* active and their life cycle is bound to the container's life cycle
* </p>
*
*
* <p>
* A {@link StrolchComponent} is registered in the Strolch configuration file and can have different configuration
* depending on the container's runtime environment
* </p>
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class StrolchComponent {
@ -68,11 +68,11 @@ public class StrolchComponent {
/**
* Constructor which takes a reference to the container and the component's name under which it can be retrieved at
* runtime (although one mostly retrieves the component by interface class for automatic casting)
*
*
* @param container
* the container
* the container
* @param componentName
* the component name
* the component name
*/
public StrolchComponent(ComponentContainer container, String componentName) {
this.container = container;
@ -89,7 +89,7 @@ public class StrolchComponent {
/**
* Returns the current component's state
*
*
* @return the component's current state
*/
public ComponentState getState() {
@ -98,7 +98,7 @@ public class StrolchComponent {
/**
* Returns the reference to the container for sub classes
*
*
* @return the reference to the container
*/
protected ComponentContainer getContainer() {
@ -107,7 +107,7 @@ public class StrolchComponent {
/**
* The components current configuration dependent on the environment which is loaded
*
*
* @return the component's configuration
*/
protected ComponentConfiguration getConfiguration() {
@ -116,22 +116,46 @@ public class StrolchComponent {
/**
* Return the {@link ExecutorService} instantiated for this agent
*
*
* @return the {@link ExecutorService} instantiated for this agent
*/
protected ExecutorService getExecutorService() {
return this.container.getAgent().getExecutor();
}
/**
* Return the {@link ExecutorService} for the given poolName instantiated for this agent
*
* @param poolName
* the name of the pool
*
* @return the {@link ExecutorService} for the given poolName instantiated for this agent
*/
protected ExecutorService getExecutorService(String poolName) {
return this.container.getAgent().getExecutor(poolName);
}
/**
* Return the {@link ScheduledExecutorService} instantiated for this agent
*
*
* @return the {@link ScheduledExecutorService} instantiated for this agent
*/
protected ScheduledExecutorService getScheduledExecutor() {
return this.container.getAgent().getScheduledExecutor();
}
/**
* Return the {@link ScheduledExecutorService} for the given poolName instantiated for this agent
*
* @param poolName
* the name of the pool
*
* @return the {@link ScheduledExecutorService} instantiated for this agent
*/
protected ScheduledExecutorService getScheduledExecutor(String poolName) {
return this.container.getAgent().getScheduledExecutor(poolName);
}
/**
* Can be used by sub classes to assert that the component is started and thus ready to use, before any component
* methods are used
@ -156,8 +180,9 @@ public class StrolchComponent {
/**
* Life cycle step setup. This is a very early step in the container's startup phase.
*
*
* @param configuration
* the configuration
*/
public void setup(ComponentConfiguration configuration) {
this.state = this.state.validateStateChange(ComponentState.SETUP, getName());
@ -165,9 +190,12 @@ public class StrolchComponent {
/**
* Life cycle step initialize. Here you would typically read configuration values
*
*
* @param configuration
* the configuration
*
* @throws Exception
* if something goes wrong
*/
public void initialize(ComponentConfiguration configuration) throws Exception {
this.configuration = configuration;
@ -177,8 +205,9 @@ public class StrolchComponent {
/**
* Life cycle step start. This is the last step of startup and is where threads and connections etc. would be
* prepared. Can also be called after stop, to restart the component.
*
*
* @throws Exception
* if something goes wrong
*/
public void start() throws Exception {
this.state = this.state.validateStateChange(ComponentState.STARTED, getName());
@ -187,8 +216,9 @@ public class StrolchComponent {
/**
* Life cycle step stop. This is the first step in the tearing down of the container. Stop all active threads and
* connections here. After stop is called, another start might also be called to restart the component.
*
*
* @throws Exception
* if something goes wrong
*/
public void stop() throws Exception {
this.state = this.state.validateStateChange(ComponentState.STOPPED, getName());
@ -197,8 +227,9 @@ public class StrolchComponent {
/**
* Life cycle step destroy. This is the last step in the tearing down of the container. Here you would release
* remaining resources and the component can not be started anymore afterwards
*
*
* @throws Exception
* if something goes wrong
*/
public void destroy() throws Exception {
this.state = this.state.validateStateChange(ComponentState.DESTROYED, getName());
@ -207,13 +238,14 @@ public class StrolchComponent {
/**
* Returns the reference to the {@link StrolchComponent} with the given name, if it exists. If it does not exist, an
* {@link IllegalArgumentException} is thrown
*
*
* @param clazz
*
* the type of component to return
*
* @return the component with the given name
*
*
* @throws IllegalArgumentException
* if the component does not exist
* if the component does not exist
*/
protected <V> V getComponent(Class<V> clazz) {
return this.container.getComponent(clazz);
@ -221,13 +253,14 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the given system user
*
*
* @param username
* the name of the system user to perform the action as
* the name of the system user to perform the action as
* @param action
* the action to perform
*
* the action to perform
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected void runAs(String username, SystemAction action) throws PrivilegeException {
this.container.getPrivilegeHandler().runAs(username, action);
@ -235,15 +268,16 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the given system user
*
*
* @param username
* the name of the system user to perform the action as
* the name of the system user to perform the action as
* @param action
* the action to perform
*
* the action to perform
*
* @return the result
*
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected <T> T runWithResult(String username, SystemActionWithResult<T> action) throws PrivilegeException {
return this.container.getPrivilegeHandler().runWithResult(username, action);
@ -251,13 +285,14 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the given system user
*
*
* @param username
* the name of the system user to perform the action as
* the name of the system user to perform the action as
* @param runnable
* the runnable to perform
*
* the runnable to perform
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected void runAs(String username, PrivilegedRunnable runnable) throws PrivilegeException {
this.container.getPrivilegeHandler().runAs(username, runnable);
@ -265,15 +300,16 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the given system user
*
*
* @param username
* the name of the system user to perform the action as
* the name of the system user to perform the action as
* @param runnable
* the runnable to perform
*
* the runnable to perform
*
* @return the result
*
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected <T> T runWithResult(String username, PrivilegedRunnableWithResult<T> runnable) throws PrivilegeException {
return this.container.getPrivilegeHandler().runWithResult(username, runnable);
@ -281,13 +317,12 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the given system user
*
* @param username
* the name of the system user to perform the action as
*
* @param action
* the action to perform
*
* the action to perform
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected void runAsAgent(SystemAction action) throws PrivilegeException {
this.container.getPrivilegeHandler().runAsAgent(action);
@ -295,15 +330,14 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the given system user
*
* @param username
* the name of the system user to perform the action as
*
* @param action
* the action to perform
*
* the action to perform
*
* @return the result
*
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected <T> T runAsAgentWithResult(SystemActionWithResult<T> action) throws PrivilegeException {
return this.container.getPrivilegeHandler().runAsAgentWithResult(action);
@ -312,11 +346,12 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the privileged system user
* {@link StrolchConstants#SYSTEM_USER_AGENT}
*
*
* @param runnable
* the runnable to perform
*
* the runnable to perform
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException {
this.container.getPrivilegeHandler().runAsAgent(runnable);
@ -325,13 +360,14 @@ public class StrolchComponent {
/**
* Performs the given {@link PrivilegedRunnable} as the privileged system user
* {@link StrolchConstants#SYSTEM_USER_AGENT}
*
*
* @param runnable
* the runnable to perform
*
* the runnable to perform
*
* @return the result
*
*
* @throws PrivilegeException
* if the given username is not allowed to perform the action
*/
protected <T> T runAsAgentWithResult(PrivilegedRunnableWithResult<T> runnable) throws PrivilegeException {
return this.container.getPrivilegeHandler().runAsAgentWithResult(runnable);
@ -339,10 +375,10 @@ public class StrolchComponent {
/**
* Opens a {@link StrolchTransaction} for the default realm and certificate
*
*
* @param cert
* the certificate authorizing the transaction
*
* the certificate authorizing the transaction
*
* @return the newly created transaction
*/
protected StrolchTransaction openTx(Certificate cert) {
@ -351,12 +387,12 @@ public class StrolchComponent {
/**
* Opens a {@link StrolchTransaction} for the given realm and certificate
*
*
* @param realm
* the name of the realm in which to open the transaction
* the name of the realm in which to open the transaction
* @param cert
* the certificate authorizing the transaction
*
* the certificate authorizing the transaction
*
* @return the newly created transaction
*/
protected StrolchTransaction openTx(String realm, Certificate cert) {
@ -365,14 +401,14 @@ public class StrolchComponent {
/**
* Opens a {@link StrolchTransaction} for the given realm and certificate
*
*
* @param realm
* the name of the realm in which to open the transaction
* the name of the realm in which to open the transaction
* @param cert
* the certificate authorizing the transaction
* the certificate authorizing the transaction
* @param clazz
* the clazz describing the transaction context
*
* the clazz describing the transaction context
*
* @return the newly created transaction
*/
protected StrolchTransaction openTx(String realm, Certificate cert, Class<?> clazz) {
@ -382,9 +418,11 @@ public class StrolchComponent {
/**
* Returns the version of this component. The version should be stored in the file
* {@link #COMPONENT_VERSION_PROPERTIES}. See {@link ComponentVersion} for more information
*
*
* @return the component's version.
*
* @throws IOException
* if the properties file containing the version could not be read
*/
public ComponentVersion getVersion() throws IOException {
if (this.version == null) {
@ -395,8 +433,7 @@ public class StrolchComponent {
Properties properties = new Properties();
properties.load(stream);
ComponentVersion componentVersion = new ComponentVersion(getName(), properties);
this.version = componentVersion;
this.version = new ComponentVersion(getName(), properties);
}
}

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,68 +17,62 @@ package li.strolch.agent.impl;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import li.strolch.agent.api.Observer;
import li.strolch.agent.api.ObserverEvent;
import li.strolch.agent.api.ObserverHandler;
import li.strolch.agent.api.StrolchAgent;
import li.strolch.model.StrolchRootElement;
import li.strolch.runtime.ThreadPoolFactory;
import li.strolch.utils.collections.MapOfLists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies
* them when one of the notify methods are called
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class DefaultObserverHandler implements ObserverHandler {
private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class);
private final StrolchAgent agent;
private MapOfLists<String, Observer> observerMap;
private Future<?> future;
private ExecutorService executorService;
public DefaultObserverHandler() {
public DefaultObserverHandler(StrolchAgent agent) {
this.agent = agent;
this.observerMap = new MapOfLists<>();
}
@Override
public void start() {
this.executorService = Executors.newSingleThreadExecutor(new ThreadPoolFactory("ObserverHandler"));
// nothing to do
}
@Override
public void stop() {
if (this.executorService != null) {
this.executorService.shutdown();
while (!this.executorService.isTerminated()) {
logger.info("Waiting for last update to complete...");
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
logger.error("Interrupted!");
}
}
this.executorService = null;
if (this.future != null) {
this.future.cancel(false);
this.future = null;
}
}
private ScheduledExecutorService getExecutor() {
return this.agent.getScheduledExecutor("Observer");
}
@Override
public void notify(ObserverEvent event) {
if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty())
return;
this.executorService.execute(() -> {
doUpdates(event);
});
this.future = this.agent.getExecutor("Observer").submit(() -> doUpdates(event));
}
protected void doUpdates(ObserverEvent event) {

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,25 +17,23 @@ package li.strolch.agent.impl;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import li.strolch.agent.api.Observer;
import li.strolch.agent.api.ObserverEvent;
import li.strolch.agent.api.ObserverHandler;
import li.strolch.agent.api.StrolchAgent;
import li.strolch.model.StrolchRootElement;
import li.strolch.runtime.ThreadPoolFactory;
import li.strolch.utils.collections.MapOfLists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies
* them when one of the notify methods are called
*
*
* @author Robert von Burg <eitch@eitchnet.ch>
*/
public class EventCollectingObserverHandler implements ObserverHandler {
@ -44,18 +42,19 @@ public class EventCollectingObserverHandler implements ObserverHandler {
private MapOfLists<String, Observer> observerMap;
private ScheduledExecutorService executorService;
private ObserverEvent observerEvent;
private ScheduledFuture<?> future;
private StrolchAgent agent;
public EventCollectingObserverHandler() {
public EventCollectingObserverHandler(StrolchAgent agent) {
this.agent = agent;
this.observerMap = new MapOfLists<>();
}
@Override
public void start() {
this.executorService = Executors.newScheduledThreadPool(1, new ThreadPoolFactory("ObserverHandler"));
// nothing to do
}
@Override
@ -65,20 +64,10 @@ public class EventCollectingObserverHandler implements ObserverHandler {
this.future.cancel(false);
this.future = null;
}
}
if (this.executorService != null) {
this.executorService.shutdown();
while (!this.executorService.isTerminated()) {
logger.info("Waiting for last update to complete...");
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
logger.error("Interrupted!");
}
}
this.executorService = null;
}
private ScheduledExecutorService getExecutor() {
return this.agent.getScheduledExecutor("Observer");
}
@Override
@ -97,7 +86,7 @@ public class EventCollectingObserverHandler implements ObserverHandler {
}
if (this.future == null || this.future.isDone()) {
this.future = this.executorService.scheduleAtFixedRate(() -> doUpdates(), 100, 100, TimeUnit.MILLISECONDS);
this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS);
}
}
@ -126,8 +115,7 @@ public class EventCollectingObserverHandler implements ObserverHandler {
synchronized (this) {
if (this.observerEvent != null) {
this.future = this.executorService.scheduleAtFixedRate(() -> doUpdates(), 100, 100,
TimeUnit.MILLISECONDS);
this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -101,9 +101,9 @@ public abstract class InternalStrolchRealm implements StrolchRealm {
if (this.updateObservers) {
String delayedObserversKey = makeRealmKey(getRealm(), PROP_ENABLED_DELAYED_OBSERVER_UPDATES);
if (configuration.getBoolean(delayedObserversKey, Boolean.FALSE)) {
this.observerHandler = new DefaultObserverHandler();
this.observerHandler = new DefaultObserverHandler(container.getAgent());
} else {
this.observerHandler = new EventCollectingObserverHandler();
this.observerHandler = new EventCollectingObserverHandler(container.getAgent());
logger.info("Enabled Delayed Observer Updates.");
}
}
@ -136,7 +136,8 @@ public abstract class InternalStrolchRealm implements StrolchRealm {
else
logger.info("Versioning not enabled for realm " + getRealm()); //$NON-NLS-1$
logger.info(MessageFormat.format("Using a locking try timeout of {0}s", timeUnit.toSeconds(time))); //$NON-NLS-1$
logger.info(
MessageFormat.format("Using a locking try timeout of {0}s", timeUnit.toSeconds(time))); //$NON-NLS-1$
}
@Override

View File

@ -39,17 +39,18 @@ public abstract class StrolchJob implements Runnable {
}
protected ScheduledExecutorService getScheduledExecutor() {
return getAgent().getScheduledExecutor();
return getAgent().getScheduledExecutor("StrolchJob");
}
/**
* Performs the given {@link PrivilegedRunnable} as the privileged system user
* {@link StrolchConstants#SYSTEM_USER_AGENT}
*
*
* @param runnable
* the runnable to perform
*
* the runnable to perform
*
* @throws PrivilegeException
* if the agent can not perform the action
*/
protected void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException {
getContainer().getPrivilegeHandler().runAsAgent(runnable);
@ -57,10 +58,10 @@ public abstract class StrolchJob implements Runnable {
/**
* Opens a {@link StrolchTransaction} for the default realm and certificate
*
*
* @param cert
* the certificate authorizing the transaction
*
* the certificate authorizing the transaction
*
* @return the newly created transaction
*/
protected StrolchTransaction openTx(Certificate cert) {

View File

@ -1,12 +1,12 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -97,7 +97,7 @@ public class DefaultStrolchSessionHandler extends StrolchComponent implements St
- this.certificateMap.size()) + " had timed out and were removed.");
}
this.sessionHandler = getScheduledExecutor()
this.sessionHandler = getScheduledExecutor("SessionHandler")
.scheduleWithFixedDelay(this::handleSessions, 5, 1, TimeUnit.MINUTES);
super.start();
@ -263,7 +263,8 @@ public class DefaultStrolchSessionHandler extends StrolchComponent implements St
}
@Override
public UserSession getSession(Certificate certificate, String sessionId) throws AccessDeniedException, PrivilegeException {
public UserSession getSession(Certificate certificate, String sessionId)
throws AccessDeniedException, PrivilegeException {
PrivilegeContext ctx = this.privilegeHandler.validate(certificate);
ctx.assertHasPrivilege(PRIVILEGE_GET_SESSION);
synchronized (this.certificateMap) {

View File

@ -3,14 +3,9 @@ package li.strolch.execution;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.execution.command.ExecuteActivityCommand;
import li.strolch.execution.command.SetActionToErrorCommand;
import li.strolch.execution.command.SetActionToExecutedCommand;
import li.strolch.execution.command.SetActionToStoppedCommand;
import li.strolch.execution.command.SetActionToWarningCommand;
import li.strolch.execution.command.*;
import li.strolch.execution.policy.ActivityArchivalPolicy;
import li.strolch.execution.policy.ExecutionPolicy;
import li.strolch.handler.operationslog.LogMessage;
@ -25,7 +20,6 @@ import li.strolch.model.policy.PolicyDef;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.policy.PolicyHandler;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.ThreadPoolFactory;
import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.utils.collections.MapOfSets;
import li.strolch.utils.dbc.DBC;
@ -41,8 +35,6 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
private static final String KEY_DEFAULT_ACTIVITY_ARCHIVAL = "key:DefaultActivityArchival";
private static final String PROP_RESTART_EXECUTION = "restartExecution";
private ExecutorService executorService;
private MapOfSets<String, Locator> registeredActivities;
private DelayedExecutionTimer delayedExecutionTimer;
@ -62,8 +54,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void start() throws Exception {
this.executorService = Executors.newCachedThreadPool(new ThreadPoolFactory("ExecutionHandler"));
this.delayedExecutionTimer = new SimpleDurationExecutionTimer();
this.delayedExecutionTimer = new SimpleDurationExecutionTimer(getContainer().getAgent());
// restart execution of activities already in execution
if (!getConfiguration().getBoolean(PROP_RESTART_EXECUTION, Boolean.FALSE)) {
@ -79,15 +70,6 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void stop() throws Exception {
if (this.executorService != null) {
this.executorService.shutdown();
while (!this.executorService.isTerminated()) {
logger.info("Waiting for executor service to terminate...");
Thread.sleep(50L);
}
this.executorService = null;
}
if (this.delayedExecutionTimer != null) {
this.delayedExecutionTimer.destroy();
this.delayedExecutionTimer = null;
@ -169,7 +151,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toExecution(String realm, Locator locator) {
this.executorService.execute(() -> {
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
toExecution(realm, locator, ctx);
@ -186,9 +168,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
});
}
private ExecutorService getExecutor() {
return getExecutorService("ExecutionHandler");
}
@Override
public void toExecuted(String realm, Locator locator) {
this.executorService.execute(() -> {
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
toExecuted(realm, locator, ctx);
@ -207,7 +193,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toStopped(String realm, Locator locator) {
this.executorService.execute(() -> {
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
toStopped(realm, locator, ctx);
@ -226,7 +212,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toError(String realm, Locator locator) {
this.executorService.execute(() -> {
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
toError(realm, locator, ctx);
@ -245,7 +231,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void toWarning(String realm, Locator locator) {
this.executorService.execute(() -> {
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
toWarning(realm, locator, ctx);
@ -264,7 +250,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler {
@Override
public void archiveActivity(String realm, Locator activityLoc) {
this.executorService.execute(() -> {
getExecutor().execute(() -> {
try {
runAsAgent(ctx -> {
try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), ActivityArchivalPolicy.class)) {

View File

@ -2,48 +2,32 @@ package li.strolch.execution;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchAgent;
import li.strolch.model.Locator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.model.Locator;
import li.strolch.runtime.ThreadPoolFactory;
public class SimpleDurationExecutionTimer implements DelayedExecutionTimer {
private static final Logger logger = LoggerFactory.getLogger(SimpleDurationExecutionTimer.class);
private Map<Locator, ScheduledFuture<?>> simulationTasks;
private ScheduledExecutorService scheduledExecutorService;
private StrolchAgent agent;
public SimpleDurationExecutionTimer() {
this.scheduledExecutorService = Executors.newScheduledThreadPool(0, new ThreadPoolFactory("DurationExecution"));
public SimpleDurationExecutionTimer(StrolchAgent agent) {
this.agent = agent;
this.simulationTasks = new HashMap<>();
}
@Override
public void destroy() {
this.simulationTasks.values().forEach(task -> task.cancel(false));
if (this.scheduledExecutorService != null) {
this.scheduledExecutorService.shutdown();
while (!this.scheduledExecutorService.isTerminated()) {
logger.info("Waiting for executor service to terminate...");
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
logger.warn("Interrupted!");
}
}
this.scheduledExecutorService = null;
}
}
@Override
@ -59,12 +43,15 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer {
@Override
public void execute(String realm, ComponentContainer container, Locator actionLocator, long duration) {
SimulationTask task = new SimulationTask(realm, container, actionLocator);
ScheduledFuture<?> future = this.scheduledExecutorService.schedule(task, duration, TimeUnit.MILLISECONDS);
ScheduledFuture<?> future = getExecutor().schedule(task, duration, TimeUnit.MILLISECONDS);
this.simulationTasks.put(actionLocator, future);
logger.info("Registered execution timer for " + actionLocator);
}
private ScheduledExecutorService getExecutor() {
return this.agent.getScheduledExecutor("DurationExecution");
}
private void executed(String realm, ComponentContainer container, Locator locator) {
logger.info("Completing execution for " + locator);

View File

@ -1,12 +1,12 @@
/*
* Copyright 2015 Robert von Burg <eitch@eitchnet.ch>
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -82,7 +82,7 @@ public class ServiceExecutionHandler extends StrolchComponent {
ServiceExecutionStatus status = new ServiceExecutionStatus(serviceName);
this.serviceContextMap.put(serviceName, status);
getExecutorService().execute(() -> doService(svcCtx));
getExecutorService("ServiceExecution").execute(() -> doService(svcCtx));
Thread.sleep(20L);
return status;