diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java index ce906410a..c151b227a 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/api/ObserverHandler.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,7 +23,7 @@ import li.strolch.model.Tags; * Implementing the observer pattern by allowing {@link Observer} to register themselves for updates to * {@link StrolchRootElement} *

- * + * *

* Note: The key in all the methods can be any string, but as a convenience it is mostly one of the following: *

@@ -32,51 +32,51 @@ import li.strolch.model.Tags; *
  • {@link Tags#ORDER}
  • *
  • {@link Tags#ACTIVITY}
  • * - * + * *

    * Should a special case arise, then it a contract must be defined by which a key is negotiated between an event * distributer and the observer *

    - * + * * @author Robert von Burg */ public interface ObserverHandler { /** * Update observers with the given event - * + * * @param observerEvent - * containing the updates + * containing the updates */ - public void notify(ObserverEvent observerEvent); + void notify(ObserverEvent observerEvent); /** * Registers the {@link Observer} for notification of objects under the given key - * + * * @param key - * the key for which to the observer wants to be notified + * the key for which to the observer wants to be notified * @param observer - * the observer to register + * the observer to register */ - public void registerObserver(String key, Observer observer); + void registerObserver(String key, Observer observer); /** * Unregister the given {@link Observer} - * + * * @param key - * the key for which to the observer was registered + * the key for which to the observer was registered * @param observer - * the observer unregister + * the observer unregister */ - public void unregisterObserver(String key, Observer observer); + void unregisterObserver(String key, Observer observer); /** * Start the update thread */ - public void start(); + void start(); /** * Stop the update thread */ - public void stop(); + void stop(); } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchAgent.java b/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchAgent.java index 3767c0e9d..7704f30c5 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchAgent.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchAgent.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,8 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -26,15 +28,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import li.strolch.agent.impl.ComponentContainerImpl; +import li.strolch.runtime.ThreadPoolFactory; import li.strolch.runtime.configuration.ConfigurationParser; import li.strolch.runtime.configuration.RuntimeConfiguration; import li.strolch.runtime.configuration.StrolchConfiguration; +import li.strolch.utils.dbc.DBC; import li.strolch.utils.helper.StringHelper; import li.strolch.utils.helper.SystemHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Robert von Burg @@ -48,8 +51,9 @@ public class StrolchAgent { private ComponentContainerImpl container; private StrolchConfiguration strolchConfiguration; private StrolchVersion appVersion; - private ExecutorService executor; - private ScheduledExecutorService scheduledExecutor; + + private Map executors; + private Map scheduledExecutors; public StrolchAgent(StrolchVersion appVersion) { this.appVersion = appVersion; @@ -57,7 +61,7 @@ public class StrolchAgent { /** * Return the {@link StrolchConfiguration} - * + * * @return the {@link StrolchConfiguration} */ public StrolchConfiguration getStrolchConfiguration() { @@ -66,7 +70,7 @@ public class StrolchAgent { /** * Return the container - * + * * @return the container */ public ComponentContainer getContainer() { @@ -82,20 +86,42 @@ public class StrolchAgent { /** * Return the {@link ExecutorService} instantiated for this agent - * + * * @return the {@link ExecutorService} instantiated for this agent */ public ExecutorService getExecutor() { - return this.executor; + return getExecutor("Agent"); + } + + public synchronized ExecutorService getExecutor(String poolName) { + DBC.PRE.assertNotEmpty("poolName must be set!", poolName); + ExecutorService executor = this.executors.get(poolName); + if (executor == null) { + executor = Executors.newCachedThreadPool(new ThreadPoolFactory(poolName)); + this.executors.put(poolName, executor); + } + + return executor; } /** * Return the {@link ScheduledExecutorService} instantiated for this agent - * + * * @return the {@link ScheduledExecutorService} instantiated for this agent */ public ScheduledExecutorService getScheduledExecutor() { - return this.scheduledExecutor; + return getScheduledExecutor("Agent"); + } + + public synchronized ScheduledExecutorService getScheduledExecutor(String poolName) { + DBC.PRE.assertNotEmpty("poolName must be set!", poolName); + ScheduledExecutorService executor = this.scheduledExecutors.get(poolName); + if (executor == null) { + executor = Executors.newScheduledThreadPool(4, new ThreadPoolFactory(poolName)); + this.scheduledExecutors.put(poolName, executor); + } + + return executor; } /** @@ -105,8 +131,10 @@ public class StrolchAgent { public void initialize() { if (this.container == null) throw new RuntimeException("Please call setup first!"); - this.executor = Executors.newCachedThreadPool(); - this.scheduledExecutor = Executors.newScheduledThreadPool(4); + + this.executors = new HashMap<>(); + this.scheduledExecutors = new HashMap<>(); + this.container.initialize(this.strolchConfiguration); } @@ -127,17 +155,23 @@ public class StrolchAgent { this.container.stop(); } - private void shutdownExecutorService(ExecutorService executor) { - try { - logger.info("Shutting down executor..."); - executor.shutdown(); - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.error("Was interrupted while shutting down tasks"); - } finally { - if (!executor.isTerminated()) { - logger.error("Tasks not stopped after " + 5 + "s. Shutting down now."); - executor.shutdownNow(); + private void shutdownExecutorService(Map executors) { + + for (String poolName : executors.keySet()) { + logger.info("Shutting down executor pool " + poolName); + + T executor = executors.get(poolName); + + try { + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("Was interrupted while shutting down tasks"); + } finally { + if (!executor.isTerminated()) { + logger.error("Tasks not stopped after " + 5 + "s. Shutting down now."); + executor.shutdownNow(); + } } } } @@ -147,10 +181,10 @@ public class StrolchAgent { */ public void destroy() { - if (this.executor != null) - shutdownExecutorService(this.executor); - if (this.scheduledExecutor != null) - shutdownExecutorService(this.scheduledExecutor); + if (this.executors != null) + shutdownExecutorService(this.executors); + if (this.scheduledExecutors != null) + shutdownExecutorService(this.scheduledExecutors); if (this.container != null) this.container.destroy(); @@ -161,15 +195,19 @@ public class StrolchAgent { *

    * Note: Use {@link StrolchBootstrapper} instead of calling this method directly! *

    - * + * *

    * Sets up the agent by parsing the configuration file and initializes the given environment *

    - * + * * @param environment + * the current environment * @param configPathF + * the path to the config directory * @param dataPathF + * the path to the data directory * @param tempPathF + * the path to the temp directory */ void setup(String environment, File configPathF, File dataPathF, File tempPathF) { @@ -180,8 +218,8 @@ public class StrolchAgent { logger.info(" - Temp: " + tempPathF.getAbsolutePath()); logger.info(" - user.dir: " + SystemHelper.getUserDir()); - this.strolchConfiguration = ConfigurationParser.parseConfiguration(environment, configPathF, dataPathF, - tempPathF); + this.strolchConfiguration = ConfigurationParser + .parseConfiguration(environment, configPathF, dataPathF, tempPathF); ComponentContainerImpl container = new ComponentContainerImpl(this); container.setup(this.strolchConfiguration); @@ -189,7 +227,8 @@ public class StrolchAgent { this.container = container; RuntimeConfiguration config = this.strolchConfiguration.getRuntimeConfiguration(); - logger.info(MessageFormat.format("Setup Agent {0}:{1}", config.getApplicationName(), config.getEnvironment())); //$NON-NLS-1$ + logger.info(MessageFormat + .format("Setup Agent {0}:{1}", config.getApplicationName(), config.getEnvironment())); //$NON-NLS-1$ } protected void assertContainerStarted() { @@ -217,7 +256,7 @@ public class StrolchAgent { /** * Returns the version of this agent - * + * * @return the version of this agent */ public VersionQueryResult getVersion() { @@ -228,13 +267,14 @@ public class StrolchAgent { Properties properties = new Properties(); - try (InputStream stream = getClass().getResourceAsStream(AGENT_VERSION_PROPERTIES);) { + try (InputStream stream = getClass().getResourceAsStream(AGENT_VERSION_PROPERTIES)) { properties.load(stream); AgentVersion agentVersion = new AgentVersion( getStrolchConfiguration().getRuntimeConfiguration().getApplicationName(), properties); queryResult.setAgentVersion(agentVersion); } catch (IOException e) { - String msg = MessageFormat.format("Failed to read version properties for agent: {0}", e.getMessage()); //$NON-NLS-1$ + String msg = MessageFormat + .format("Failed to read version properties for agent: {0}", e.getMessage()); //$NON-NLS-1$ queryResult.getErrors().add(msg); logger.error(msg, e); } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchComponent.java b/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchComponent.java index 724043e8c..bb1b286fa 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchComponent.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/api/StrolchComponent.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -42,17 +42,17 @@ import li.strolch.runtime.privilege.PrivilegedRunnableWithResult; * A {@link StrolchComponent} is a configurable extension to Strolch. Every major feature should be implemented as a * {@link StrolchComponent} so that they can be easily added or removed from a Strolch runtime. *

    - * + * *

    * A {@link StrolchComponent} has access to the container and can perform different operations. They can be passive or * active and their life cycle is bound to the container's life cycle *

    - * + * *

    * A {@link StrolchComponent} is registered in the Strolch configuration file and can have different configuration * depending on the container's runtime environment *

    - * + * * @author Robert von Burg */ public class StrolchComponent { @@ -68,11 +68,11 @@ public class StrolchComponent { /** * Constructor which takes a reference to the container and the component's name under which it can be retrieved at * runtime (although one mostly retrieves the component by interface class for automatic casting) - * + * * @param container - * the container + * the container * @param componentName - * the component name + * the component name */ public StrolchComponent(ComponentContainer container, String componentName) { this.container = container; @@ -89,7 +89,7 @@ public class StrolchComponent { /** * Returns the current component's state - * + * * @return the component's current state */ public ComponentState getState() { @@ -98,7 +98,7 @@ public class StrolchComponent { /** * Returns the reference to the container for sub classes - * + * * @return the reference to the container */ protected ComponentContainer getContainer() { @@ -107,7 +107,7 @@ public class StrolchComponent { /** * The components current configuration dependent on the environment which is loaded - * + * * @return the component's configuration */ protected ComponentConfiguration getConfiguration() { @@ -116,22 +116,46 @@ public class StrolchComponent { /** * Return the {@link ExecutorService} instantiated for this agent - * + * * @return the {@link ExecutorService} instantiated for this agent */ protected ExecutorService getExecutorService() { return this.container.getAgent().getExecutor(); } + /** + * Return the {@link ExecutorService} for the given poolName instantiated for this agent + * + * @param poolName + * the name of the pool + * + * @return the {@link ExecutorService} for the given poolName instantiated for this agent + */ + protected ExecutorService getExecutorService(String poolName) { + return this.container.getAgent().getExecutor(poolName); + } + /** * Return the {@link ScheduledExecutorService} instantiated for this agent - * + * * @return the {@link ScheduledExecutorService} instantiated for this agent */ protected ScheduledExecutorService getScheduledExecutor() { return this.container.getAgent().getScheduledExecutor(); } + /** + * Return the {@link ScheduledExecutorService} for the given poolName instantiated for this agent + * + * @param poolName + * the name of the pool + * + * @return the {@link ScheduledExecutorService} instantiated for this agent + */ + protected ScheduledExecutorService getScheduledExecutor(String poolName) { + return this.container.getAgent().getScheduledExecutor(poolName); + } + /** * Can be used by sub classes to assert that the component is started and thus ready to use, before any component * methods are used @@ -156,8 +180,9 @@ public class StrolchComponent { /** * Life cycle step setup. This is a very early step in the container's startup phase. - * + * * @param configuration + * the configuration */ public void setup(ComponentConfiguration configuration) { this.state = this.state.validateStateChange(ComponentState.SETUP, getName()); @@ -165,9 +190,12 @@ public class StrolchComponent { /** * Life cycle step initialize. Here you would typically read configuration values - * + * * @param configuration + * the configuration + * * @throws Exception + * if something goes wrong */ public void initialize(ComponentConfiguration configuration) throws Exception { this.configuration = configuration; @@ -177,8 +205,9 @@ public class StrolchComponent { /** * Life cycle step start. This is the last step of startup and is where threads and connections etc. would be * prepared. Can also be called after stop, to restart the component. - * + * * @throws Exception + * if something goes wrong */ public void start() throws Exception { this.state = this.state.validateStateChange(ComponentState.STARTED, getName()); @@ -187,8 +216,9 @@ public class StrolchComponent { /** * Life cycle step stop. This is the first step in the tearing down of the container. Stop all active threads and * connections here. After stop is called, another start might also be called to restart the component. - * + * * @throws Exception + * if something goes wrong */ public void stop() throws Exception { this.state = this.state.validateStateChange(ComponentState.STOPPED, getName()); @@ -197,8 +227,9 @@ public class StrolchComponent { /** * Life cycle step destroy. This is the last step in the tearing down of the container. Here you would release * remaining resources and the component can not be started anymore afterwards - * + * * @throws Exception + * if something goes wrong */ public void destroy() throws Exception { this.state = this.state.validateStateChange(ComponentState.DESTROYED, getName()); @@ -207,13 +238,14 @@ public class StrolchComponent { /** * Returns the reference to the {@link StrolchComponent} with the given name, if it exists. If it does not exist, an * {@link IllegalArgumentException} is thrown - * + * * @param clazz - * + * the type of component to return + * * @return the component with the given name - * + * * @throws IllegalArgumentException - * if the component does not exist + * if the component does not exist */ protected V getComponent(Class clazz) { return this.container.getComponent(clazz); @@ -221,13 +253,14 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the given system user - * + * * @param username - * the name of the system user to perform the action as + * the name of the system user to perform the action as * @param action - * the action to perform - * + * the action to perform + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected void runAs(String username, SystemAction action) throws PrivilegeException { this.container.getPrivilegeHandler().runAs(username, action); @@ -235,15 +268,16 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the given system user - * + * * @param username - * the name of the system user to perform the action as + * the name of the system user to perform the action as * @param action - * the action to perform - * + * the action to perform + * * @return the result - * + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected T runWithResult(String username, SystemActionWithResult action) throws PrivilegeException { return this.container.getPrivilegeHandler().runWithResult(username, action); @@ -251,13 +285,14 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the given system user - * + * * @param username - * the name of the system user to perform the action as + * the name of the system user to perform the action as * @param runnable - * the runnable to perform - * + * the runnable to perform + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected void runAs(String username, PrivilegedRunnable runnable) throws PrivilegeException { this.container.getPrivilegeHandler().runAs(username, runnable); @@ -265,15 +300,16 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the given system user - * + * * @param username - * the name of the system user to perform the action as + * the name of the system user to perform the action as * @param runnable - * the runnable to perform - * + * the runnable to perform + * * @return the result - * + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected T runWithResult(String username, PrivilegedRunnableWithResult runnable) throws PrivilegeException { return this.container.getPrivilegeHandler().runWithResult(username, runnable); @@ -281,13 +317,12 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the given system user - * - * @param username - * the name of the system user to perform the action as + * * @param action - * the action to perform - * + * the action to perform + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected void runAsAgent(SystemAction action) throws PrivilegeException { this.container.getPrivilegeHandler().runAsAgent(action); @@ -295,15 +330,14 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the given system user - * - * @param username - * the name of the system user to perform the action as + * * @param action - * the action to perform - * + * the action to perform + * * @return the result - * + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected T runAsAgentWithResult(SystemActionWithResult action) throws PrivilegeException { return this.container.getPrivilegeHandler().runAsAgentWithResult(action); @@ -312,11 +346,12 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the privileged system user * {@link StrolchConstants#SYSTEM_USER_AGENT} - * + * * @param runnable - * the runnable to perform - * + * the runnable to perform + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException { this.container.getPrivilegeHandler().runAsAgent(runnable); @@ -325,13 +360,14 @@ public class StrolchComponent { /** * Performs the given {@link PrivilegedRunnable} as the privileged system user * {@link StrolchConstants#SYSTEM_USER_AGENT} - * + * * @param runnable - * the runnable to perform - * + * the runnable to perform + * * @return the result - * + * * @throws PrivilegeException + * if the given username is not allowed to perform the action */ protected T runAsAgentWithResult(PrivilegedRunnableWithResult runnable) throws PrivilegeException { return this.container.getPrivilegeHandler().runAsAgentWithResult(runnable); @@ -339,10 +375,10 @@ public class StrolchComponent { /** * Opens a {@link StrolchTransaction} for the default realm and certificate - * + * * @param cert - * the certificate authorizing the transaction - * + * the certificate authorizing the transaction + * * @return the newly created transaction */ protected StrolchTransaction openTx(Certificate cert) { @@ -351,12 +387,12 @@ public class StrolchComponent { /** * Opens a {@link StrolchTransaction} for the given realm and certificate - * + * * @param realm - * the name of the realm in which to open the transaction + * the name of the realm in which to open the transaction * @param cert - * the certificate authorizing the transaction - * + * the certificate authorizing the transaction + * * @return the newly created transaction */ protected StrolchTransaction openTx(String realm, Certificate cert) { @@ -365,14 +401,14 @@ public class StrolchComponent { /** * Opens a {@link StrolchTransaction} for the given realm and certificate - * + * * @param realm - * the name of the realm in which to open the transaction + * the name of the realm in which to open the transaction * @param cert - * the certificate authorizing the transaction + * the certificate authorizing the transaction * @param clazz - * the clazz describing the transaction context - * + * the clazz describing the transaction context + * * @return the newly created transaction */ protected StrolchTransaction openTx(String realm, Certificate cert, Class clazz) { @@ -382,9 +418,11 @@ public class StrolchComponent { /** * Returns the version of this component. The version should be stored in the file * {@link #COMPONENT_VERSION_PROPERTIES}. See {@link ComponentVersion} for more information - * + * * @return the component's version. + * * @throws IOException + * if the properties file containing the version could not be read */ public ComponentVersion getVersion() throws IOException { if (this.version == null) { @@ -395,8 +433,7 @@ public class StrolchComponent { Properties properties = new Properties(); properties.load(stream); - ComponentVersion componentVersion = new ComponentVersion(getName(), properties); - this.version = componentVersion; + this.version = new ComponentVersion(getName(), properties); } } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java index c5dc8d679..ca87d5cca 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/DefaultObserverHandler.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,68 +17,62 @@ package li.strolch.agent.impl; import java.text.MessageFormat; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import li.strolch.agent.api.Observer; import li.strolch.agent.api.ObserverEvent; import li.strolch.agent.api.ObserverHandler; +import li.strolch.agent.api.StrolchAgent; import li.strolch.model.StrolchRootElement; -import li.strolch.runtime.ThreadPoolFactory; import li.strolch.utils.collections.MapOfLists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies * them when one of the notify methods are called - * + * * @author Robert von Burg */ public class DefaultObserverHandler implements ObserverHandler { private static final Logger logger = LoggerFactory.getLogger(DefaultObserverHandler.class); + private final StrolchAgent agent; private MapOfLists observerMap; + private Future future; - private ExecutorService executorService; - - public DefaultObserverHandler() { + public DefaultObserverHandler(StrolchAgent agent) { + this.agent = agent; this.observerMap = new MapOfLists<>(); } @Override public void start() { - this.executorService = Executors.newSingleThreadExecutor(new ThreadPoolFactory("ObserverHandler")); + // nothing to do } @Override public void stop() { - if (this.executorService != null) { - this.executorService.shutdown(); - while (!this.executorService.isTerminated()) { - logger.info("Waiting for last update to complete..."); - try { - Thread.sleep(50L); - } catch (InterruptedException e) { - logger.error("Interrupted!"); - } - } - this.executorService = null; + if (this.future != null) { + this.future.cancel(false); + this.future = null; } } + private ScheduledExecutorService getExecutor() { + return this.agent.getScheduledExecutor("Observer"); + } + @Override public void notify(ObserverEvent event) { if (event.added.isEmpty() && event.updated.isEmpty() && event.removed.isEmpty()) return; - this.executorService.execute(() -> { - doUpdates(event); - }); + this.future = this.agent.getExecutor("Observer").submit(() -> doUpdates(event)); } protected void doUpdates(ObserverEvent event) { diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java index a23955ae6..02a8bec06 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/EventCollectingObserverHandler.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,25 +17,23 @@ package li.strolch.agent.impl; import java.text.MessageFormat; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import li.strolch.agent.api.Observer; import li.strolch.agent.api.ObserverEvent; import li.strolch.agent.api.ObserverHandler; +import li.strolch.agent.api.StrolchAgent; import li.strolch.model.StrolchRootElement; -import li.strolch.runtime.ThreadPoolFactory; import li.strolch.utils.collections.MapOfLists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple {@link ObserverHandler} which keeps a reference to all registered {@link Observer Observers} and notifies * them when one of the notify methods are called - * + * * @author Robert von Burg */ public class EventCollectingObserverHandler implements ObserverHandler { @@ -44,18 +42,19 @@ public class EventCollectingObserverHandler implements ObserverHandler { private MapOfLists observerMap; - private ScheduledExecutorService executorService; private ObserverEvent observerEvent; private ScheduledFuture future; + private StrolchAgent agent; - public EventCollectingObserverHandler() { + public EventCollectingObserverHandler(StrolchAgent agent) { + this.agent = agent; this.observerMap = new MapOfLists<>(); } @Override public void start() { - this.executorService = Executors.newScheduledThreadPool(1, new ThreadPoolFactory("ObserverHandler")); + // nothing to do } @Override @@ -65,20 +64,10 @@ public class EventCollectingObserverHandler implements ObserverHandler { this.future.cancel(false); this.future = null; } + } - if (this.executorService != null) { - this.executorService.shutdown(); - while (!this.executorService.isTerminated()) { - logger.info("Waiting for last update to complete..."); - try { - Thread.sleep(50L); - } catch (InterruptedException e) { - logger.error("Interrupted!"); - } - } - - this.executorService = null; - } + private ScheduledExecutorService getExecutor() { + return this.agent.getScheduledExecutor("Observer"); } @Override @@ -97,7 +86,7 @@ public class EventCollectingObserverHandler implements ObserverHandler { } if (this.future == null || this.future.isDone()) { - this.future = this.executorService.scheduleAtFixedRate(() -> doUpdates(), 100, 100, TimeUnit.MILLISECONDS); + this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS); } } @@ -126,8 +115,7 @@ public class EventCollectingObserverHandler implements ObserverHandler { synchronized (this) { if (this.observerEvent != null) { - this.future = this.executorService.scheduleAtFixedRate(() -> doUpdates(), 100, 100, - TimeUnit.MILLISECONDS); + this.future = getExecutor().scheduleAtFixedRate(this::doUpdates, 100, 100, TimeUnit.MILLISECONDS); } } } diff --git a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java index d3e981ef2..60331eec9 100644 --- a/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java +++ b/li.strolch.agent/src/main/java/li/strolch/agent/impl/InternalStrolchRealm.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -101,9 +101,9 @@ public abstract class InternalStrolchRealm implements StrolchRealm { if (this.updateObservers) { String delayedObserversKey = makeRealmKey(getRealm(), PROP_ENABLED_DELAYED_OBSERVER_UPDATES); if (configuration.getBoolean(delayedObserversKey, Boolean.FALSE)) { - this.observerHandler = new DefaultObserverHandler(); + this.observerHandler = new DefaultObserverHandler(container.getAgent()); } else { - this.observerHandler = new EventCollectingObserverHandler(); + this.observerHandler = new EventCollectingObserverHandler(container.getAgent()); logger.info("Enabled Delayed Observer Updates."); } } @@ -136,7 +136,8 @@ public abstract class InternalStrolchRealm implements StrolchRealm { else logger.info("Versioning not enabled for realm " + getRealm()); //$NON-NLS-1$ - logger.info(MessageFormat.format("Using a locking try timeout of {0}s", timeUnit.toSeconds(time))); //$NON-NLS-1$ + logger.info( + MessageFormat.format("Using a locking try timeout of {0}s", timeUnit.toSeconds(time))); //$NON-NLS-1$ } @Override diff --git a/li.strolch.agent/src/main/java/li/strolch/job/StrolchJob.java b/li.strolch.agent/src/main/java/li/strolch/job/StrolchJob.java index e8da50923..8d0d7c175 100644 --- a/li.strolch.agent/src/main/java/li/strolch/job/StrolchJob.java +++ b/li.strolch.agent/src/main/java/li/strolch/job/StrolchJob.java @@ -39,17 +39,18 @@ public abstract class StrolchJob implements Runnable { } protected ScheduledExecutorService getScheduledExecutor() { - return getAgent().getScheduledExecutor(); + return getAgent().getScheduledExecutor("StrolchJob"); } /** * Performs the given {@link PrivilegedRunnable} as the privileged system user * {@link StrolchConstants#SYSTEM_USER_AGENT} - * + * * @param runnable - * the runnable to perform - * + * the runnable to perform + * * @throws PrivilegeException + * if the agent can not perform the action */ protected void runAsAgent(PrivilegedRunnable runnable) throws PrivilegeException { getContainer().getPrivilegeHandler().runAsAgent(runnable); @@ -57,10 +58,10 @@ public abstract class StrolchJob implements Runnable { /** * Opens a {@link StrolchTransaction} for the default realm and certificate - * + * * @param cert - * the certificate authorizing the transaction - * + * the certificate authorizing the transaction + * * @return the newly created transaction */ protected StrolchTransaction openTx(Certificate cert) { diff --git a/li.strolch.rest/src/main/java/li/strolch/rest/DefaultStrolchSessionHandler.java b/li.strolch.rest/src/main/java/li/strolch/rest/DefaultStrolchSessionHandler.java index 89c52579f..00bde911b 100644 --- a/li.strolch.rest/src/main/java/li/strolch/rest/DefaultStrolchSessionHandler.java +++ b/li.strolch.rest/src/main/java/li/strolch/rest/DefaultStrolchSessionHandler.java @@ -1,12 +1,12 @@ /* * Copyright 2013 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -97,7 +97,7 @@ public class DefaultStrolchSessionHandler extends StrolchComponent implements St - this.certificateMap.size()) + " had timed out and were removed."); } - this.sessionHandler = getScheduledExecutor() + this.sessionHandler = getScheduledExecutor("SessionHandler") .scheduleWithFixedDelay(this::handleSessions, 5, 1, TimeUnit.MINUTES); super.start(); @@ -263,7 +263,8 @@ public class DefaultStrolchSessionHandler extends StrolchComponent implements St } @Override - public UserSession getSession(Certificate certificate, String sessionId) throws AccessDeniedException, PrivilegeException { + public UserSession getSession(Certificate certificate, String sessionId) + throws AccessDeniedException, PrivilegeException { PrivilegeContext ctx = this.privilegeHandler.validate(certificate); ctx.assertHasPrivilege(PRIVILEGE_GET_SESSION); synchronized (this.certificateMap) { diff --git a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java index 816c511b5..a922ce4ea 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/EventBasedExecutionHandler.java @@ -3,14 +3,9 @@ package li.strolch.execution; import java.util.ResourceBundle; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import li.strolch.agent.api.ComponentContainer; -import li.strolch.execution.command.ExecuteActivityCommand; -import li.strolch.execution.command.SetActionToErrorCommand; -import li.strolch.execution.command.SetActionToExecutedCommand; -import li.strolch.execution.command.SetActionToStoppedCommand; -import li.strolch.execution.command.SetActionToWarningCommand; +import li.strolch.execution.command.*; import li.strolch.execution.policy.ActivityArchivalPolicy; import li.strolch.execution.policy.ExecutionPolicy; import li.strolch.handler.operationslog.LogMessage; @@ -25,7 +20,6 @@ import li.strolch.model.policy.PolicyDef; import li.strolch.persistence.api.StrolchTransaction; import li.strolch.policy.PolicyHandler; import li.strolch.privilege.model.PrivilegeContext; -import li.strolch.runtime.ThreadPoolFactory; import li.strolch.runtime.configuration.ComponentConfiguration; import li.strolch.utils.collections.MapOfSets; import li.strolch.utils.dbc.DBC; @@ -41,8 +35,6 @@ public class EventBasedExecutionHandler extends ExecutionHandler { private static final String KEY_DEFAULT_ACTIVITY_ARCHIVAL = "key:DefaultActivityArchival"; private static final String PROP_RESTART_EXECUTION = "restartExecution"; - private ExecutorService executorService; - private MapOfSets registeredActivities; private DelayedExecutionTimer delayedExecutionTimer; @@ -62,8 +54,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void start() throws Exception { - this.executorService = Executors.newCachedThreadPool(new ThreadPoolFactory("ExecutionHandler")); - this.delayedExecutionTimer = new SimpleDurationExecutionTimer(); + this.delayedExecutionTimer = new SimpleDurationExecutionTimer(getContainer().getAgent()); // restart execution of activities already in execution if (!getConfiguration().getBoolean(PROP_RESTART_EXECUTION, Boolean.FALSE)) { @@ -79,15 +70,6 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void stop() throws Exception { - if (this.executorService != null) { - this.executorService.shutdown(); - while (!this.executorService.isTerminated()) { - logger.info("Waiting for executor service to terminate..."); - Thread.sleep(50L); - } - this.executorService = null; - } - if (this.delayedExecutionTimer != null) { this.delayedExecutionTimer.destroy(); this.delayedExecutionTimer = null; @@ -169,7 +151,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void toExecution(String realm, Locator locator) { - this.executorService.execute(() -> { + getExecutor().execute(() -> { try { runAsAgent(ctx -> { toExecution(realm, locator, ctx); @@ -186,9 +168,13 @@ public class EventBasedExecutionHandler extends ExecutionHandler { }); } + private ExecutorService getExecutor() { + return getExecutorService("ExecutionHandler"); + } + @Override public void toExecuted(String realm, Locator locator) { - this.executorService.execute(() -> { + getExecutor().execute(() -> { try { runAsAgent(ctx -> { toExecuted(realm, locator, ctx); @@ -207,7 +193,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void toStopped(String realm, Locator locator) { - this.executorService.execute(() -> { + getExecutor().execute(() -> { try { runAsAgent(ctx -> { toStopped(realm, locator, ctx); @@ -226,7 +212,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void toError(String realm, Locator locator) { - this.executorService.execute(() -> { + getExecutor().execute(() -> { try { runAsAgent(ctx -> { toError(realm, locator, ctx); @@ -245,7 +231,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void toWarning(String realm, Locator locator) { - this.executorService.execute(() -> { + getExecutor().execute(() -> { try { runAsAgent(ctx -> { toWarning(realm, locator, ctx); @@ -264,7 +250,7 @@ public class EventBasedExecutionHandler extends ExecutionHandler { @Override public void archiveActivity(String realm, Locator activityLoc) { - this.executorService.execute(() -> { + getExecutor().execute(() -> { try { runAsAgent(ctx -> { try (StrolchTransaction tx = openTx(realm, ctx.getCertificate(), ActivityArchivalPolicy.class)) { diff --git a/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java b/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java index 959540c3e..9e9ab0dfd 100644 --- a/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java +++ b/li.strolch.service/src/main/java/li/strolch/execution/SimpleDurationExecutionTimer.java @@ -2,48 +2,32 @@ package li.strolch.execution; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import li.strolch.agent.api.ComponentContainer; +import li.strolch.agent.api.StrolchAgent; +import li.strolch.model.Locator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import li.strolch.agent.api.ComponentContainer; -import li.strolch.model.Locator; -import li.strolch.runtime.ThreadPoolFactory; - public class SimpleDurationExecutionTimer implements DelayedExecutionTimer { private static final Logger logger = LoggerFactory.getLogger(SimpleDurationExecutionTimer.class); private Map> simulationTasks; - private ScheduledExecutorService scheduledExecutorService; + private StrolchAgent agent; - public SimpleDurationExecutionTimer() { - this.scheduledExecutorService = Executors.newScheduledThreadPool(0, new ThreadPoolFactory("DurationExecution")); + public SimpleDurationExecutionTimer(StrolchAgent agent) { + this.agent = agent; this.simulationTasks = new HashMap<>(); } @Override public void destroy() { - this.simulationTasks.values().forEach(task -> task.cancel(false)); - - if (this.scheduledExecutorService != null) { - this.scheduledExecutorService.shutdown(); - while (!this.scheduledExecutorService.isTerminated()) { - logger.info("Waiting for executor service to terminate..."); - try { - Thread.sleep(50L); - } catch (InterruptedException e) { - logger.warn("Interrupted!"); - } - } - this.scheduledExecutorService = null; - } } @Override @@ -59,12 +43,15 @@ public class SimpleDurationExecutionTimer implements DelayedExecutionTimer { @Override public void execute(String realm, ComponentContainer container, Locator actionLocator, long duration) { SimulationTask task = new SimulationTask(realm, container, actionLocator); - ScheduledFuture future = this.scheduledExecutorService.schedule(task, duration, TimeUnit.MILLISECONDS); + ScheduledFuture future = getExecutor().schedule(task, duration, TimeUnit.MILLISECONDS); this.simulationTasks.put(actionLocator, future); - logger.info("Registered execution timer for " + actionLocator); } + private ScheduledExecutorService getExecutor() { + return this.agent.getScheduledExecutor("DurationExecution"); + } + private void executed(String realm, ComponentContainer container, Locator locator) { logger.info("Completing execution for " + locator); diff --git a/li.strolch.service/src/main/java/li/strolch/service/executor/ServiceExecutionHandler.java b/li.strolch.service/src/main/java/li/strolch/service/executor/ServiceExecutionHandler.java index 59d880759..b904b19fc 100644 --- a/li.strolch.service/src/main/java/li/strolch/service/executor/ServiceExecutionHandler.java +++ b/li.strolch.service/src/main/java/li/strolch/service/executor/ServiceExecutionHandler.java @@ -1,12 +1,12 @@ /* * Copyright 2015 Robert von Burg - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -82,7 +82,7 @@ public class ServiceExecutionHandler extends StrolchComponent { ServiceExecutionStatus status = new ServiceExecutionStatus(serviceName); this.serviceContextMap.put(serviceName, status); - getExecutorService().execute(() -> doService(svcCtx)); + getExecutorService("ServiceExecution").execute(() -> doService(svcCtx)); Thread.sleep(20L); return status;