diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/CommandKey.java b/li.strolch.utils/src/main/java/li/strolch/communication/CommandKey.java deleted file mode 100644 index ce78c316e..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/CommandKey.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import li.strolch.utils.helper.StringHelper; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class CommandKey { - private final String key1; - private final String key2; - private final int hashCode; - - /** - * @param key1 - * key1 - * @param key2 - * key2 - */ - public CommandKey(String key1, String key2) { - this.key1 = key1; - this.key2 = key2; - - final int prime = 31; - int result = 1; - result = prime * result + ((this.key1 == null) ? 0 : this.key1.hashCode()); - result = prime * result + ((this.key2 == null) ? 0 : this.key2.hashCode()); - this.hashCode = result; - } - - public static CommandKey key(String key1, String key2) { - return new CommandKey(key1, key2); - } - - @Override - public int hashCode() { - return this.hashCode; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - CommandKey other = (CommandKey) obj; - return this.key1.equals(other.key1) && this.key2.equals(other.key2); - } - - @Override - public String toString() { - return this.key1 + StringHelper.COLON + this.key2; - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/CommunicationConnection.java b/li.strolch.utils/src/main/java/li/strolch/communication/CommunicationConnection.java deleted file mode 100644 index 0ef429306..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/CommunicationConnection.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; - -import li.strolch.communication.IoMessage.State; -import li.strolch.utils.collections.MapOfLists; -import li.strolch.utils.dbc.DBC; -import li.strolch.utils.helper.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class CommunicationConnection implements Runnable { - - protected static final Logger logger = LoggerFactory.getLogger(CommunicationConnection.class); - - private String id; - private ConnectionMode mode; - private Map parameters; - - private ConnectionState state; - private String stateMsg; - - private BlockingDeque messageQueue; - private Thread queueThread; - private volatile boolean run; - private MapOfLists connectionObservers; - private List connectionStateObservers; - - private CommunicationEndpoint endpoint; - private IoMessageVisitor messageVisitor; - - private IoMessageArchive archive; - - public CommunicationConnection(String id, ConnectionMode mode, Map parameters, - CommunicationEndpoint endpoint, IoMessageVisitor messageVisitor) { - - DBC.PRE.assertNotEmpty("Id must be set!", id); //$NON-NLS-1$ - DBC.PRE.assertNotNull("ConnectionMode must be set!", mode); //$NON-NLS-1$ - DBC.PRE.assertNotNull("Paramerters must not be null!", parameters); //$NON-NLS-1$ - DBC.PRE.assertNotNull("Endpoint must be set!", endpoint); //$NON-NLS-1$ - DBC.PRE.assertNotNull("IoMessageVisitor must be set!", messageVisitor); //$NON-NLS-1$ - - this.id = id; - this.mode = mode; - this.parameters = parameters; - this.endpoint = endpoint; - this.messageVisitor = messageVisitor; - - this.state = ConnectionState.CREATED; - this.stateMsg = this.state.toString(); - this.messageQueue = new LinkedBlockingDeque<>(); - this.connectionObservers = new MapOfLists<>(); - this.connectionStateObservers = new ArrayList<>(); - } - - public void setArchive(IoMessageArchive archive) { - this.archive = archive; - } - - public IoMessageArchive getArchive() { - return this.archive; - } - - public String getId() { - return this.id; - } - - public int getQueueSize() { - return this.messageQueue.size(); - } - - public ConnectionState getState() { - return this.state; - } - - public String getStateMsg() { - return this.stateMsg; - } - - public ConnectionMode getMode() { - return this.mode; - } - - public Map getParameters() { - return this.parameters; - } - - public void clearQueue() { - this.messageQueue.clear(); - } - - public void addConnectionObserver(CommandKey key, ConnectionObserver observer) { - synchronized (this.connectionObservers) { - this.connectionObservers.addElement(key, observer); - } - } - - public void removeConnectionObserver(CommandKey key, ConnectionObserver observer) { - synchronized (this.connectionObservers) { - this.connectionObservers.removeElement(key, observer); - } - } - - public void addConnectionStateObserver(ConnectionStateObserver observer) { - synchronized (this.connectionStateObservers) { - this.connectionStateObservers.add(observer); - } - } - - public void removeConnectionStateObserver(ConnectionStateObserver observer) { - synchronized (this.connectionStateObservers) { - this.connectionStateObservers.remove(observer); - } - } - - public void notifyStateChange(ConnectionState state, String stateMsg) { - ConnectionState oldState = this.state; - String oldStateMsg = this.stateMsg; - this.state = state; - this.stateMsg = stateMsg; - - List observers; - synchronized (this.connectionStateObservers) { - observers = new ArrayList<>(this.connectionStateObservers); - } - for (ConnectionStateObserver observer : observers) { - observer.notify(oldState, oldStateMsg, state, stateMsg); - } - } - - public void switchMode(ConnectionMode mode) { - ConnectionMessages.assertConfigured(this, "Can not switch modes yet!"); //$NON-NLS-1$ - if (mode == ConnectionMode.OFF) { - stop(); - } else if (mode == ConnectionMode.ON) { - stop(); - start(); - } - - this.mode = mode; - } - - /** - * Configure the underlying {@link CommunicationEndpoint} and {@link IoMessageVisitor} - */ - public void configure() { - this.messageVisitor.configure(this); - this.endpoint.configure(this, this.messageVisitor); - notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.name()); - } - - public void start() { - ConnectionMessages.assertConfigured(this, "Can not start yet!"); //$NON-NLS-1$ - - switch (this.mode) { - case OFF: - logger.info("Not connecting as mode is currently OFF"); //$NON-NLS-1$ - break; - case SIMULATION: - logger.info("Started SIMULATION connection!"); //$NON-NLS-1$ - break; - case ON: - if (this.queueThread != null) { - logger.warn(MessageFormat.format("{0}: Already connected!", this.id)); //$NON-NLS-1$ - } else { - logger.info(MessageFormat - .format("Starting Connection {0} to {1}...", this.id, getRemoteUri())); //$NON-NLS-1$ - this.run = true; - this.queueThread = new Thread(this, MessageFormat.format("{0}_OUT", this.id)); //$NON-NLS-1$ - this.queueThread.start(); - - connectEndpoint(); - } - break; - default: - logger.error("Unhandled mode " + this.mode); //$NON-NLS-1$ - break; - } - } - - public void stop() { - ConnectionMessages.assertConfigured(this, "Can not stop yet!"); //$NON-NLS-1$ - - switch (this.mode) { - case OFF: - break; - case SIMULATION: - logger.info("Disconnected SIMULATION connection!"); //$NON-NLS-1$ - break; - case ON: - logger.info("Disconnecting..."); //$NON-NLS-1$ - if (this.queueThread == null) { - logger.warn(MessageFormat.format("{0}: Already disconnected!", this.id)); //$NON-NLS-1$ - } else { - this.run = false; - - try { - disconnectEndpoint(); - } catch (Exception e) { - String msg = "Caught exception while disconnecting endpoint: {0}"; //$NON-NLS-1$ - logger.error(MessageFormat.format(msg, e.getLocalizedMessage()), e); - } - - try { - this.queueThread.interrupt(); - } catch (Exception e) { - String msg = "Caught exception while stopping queue thread: {0}"; //$NON-NLS-1$ - logger.warn(MessageFormat.format(msg, e.getLocalizedMessage())); - } - String msg = "{0} is stopped"; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.queueThread.getName())); - this.queueThread = null; - } - break; - default: - logger.error("Unhandled mode " + this.mode); //$NON-NLS-1$ - break; - } - } - - /** - * Called by the underlying endpoint when a new message has been received and parsed - * - * @param message - * the message to handle - */ - public void handleNewMessage(IoMessage message) { - ConnectionMessages.assertConfigured(this, "Can not be notified of new message yet!"); //$NON-NLS-1$ - logger.info("Received new message " + message.getKey() + " " + message.getId() + " " + message.getState()); - - // if the state of the message is already later than ACCEPTED - // then an underlying component has already set the state, so - // we don't need to set it - if (message.getState().compareTo(State.ACCEPTED) < 0) - message.setState(State.ACCEPTED, StringHelper.DASH); - - notifyObservers(message); - } - - public void notifyObservers(IoMessage message) { - - List observers; - synchronized (this.connectionObservers) { - List list = this.connectionObservers.getList(message.getKey()); - if (list == null || list.isEmpty()) { - logger.info("No observers waiting for key " + message.getKey()); - return; - } - - observers = new ArrayList<>(list); - } - - for (ConnectionObserver observer : observers) { - try { - logger.info("Notifying observer " + observer.getClass().getSimpleName()); - observer.notify(message.getKey(), message); - } catch (Exception e) { - String msg = "Failed to notify observer for key {0} on message with id {1}"; //$NON-NLS-1$ - logger.error(MessageFormat.format(msg, message.getKey(), message.getId()), e); - } - } - - if (this.archive != null) - this.archive.archive(message); - } - - @Override - public void run() { - while (this.run) { - - IoMessage message = null; - - try { - - message = this.messageQueue.take(); - logger.info(MessageFormat.format("Processing message {0}...", message.getId())); //$NON-NLS-1$ - - if (this.mode == ConnectionMode.ON) - this.endpoint.send(message); - else if (this.mode == ConnectionMode.SIMULATION) - this.endpoint.simulate(message); - - // notify the caller that the message has been processed - if (message.getState().compareTo(State.DONE) < 0) - message.setState(State.DONE, StringHelper.DASH); - - done(message); - - } catch (InterruptedException e) { - logger.warn(MessageFormat.format("{0} connection has been interruped!", this.id)); //$NON-NLS-1$ - - // an interrupted exception means the thread must stop - this.run = false; - - if (message != null) { - logger.error(MessageFormat.format("Can not send message {0}", message.getId())); //$NON-NLS-1$ - message.setState(State.FATAL, e.getLocalizedMessage()); - done(message); - } - - } catch (Exception e) { - logger.error(e.getMessage(), e); - - if (message != null) { - logger.error(MessageFormat.format("Can not send message {0}", message.getId())); //$NON-NLS-1$ - message.setState(State.FATAL, e.getLocalizedMessage()); - done(message); - } - } finally { - if (message != null && this.archive != null) { - this.archive.archive(message); - } - } - } - } - - /** - * Called when an outgoing message has been handled. This method logs the message state and then notifies all - * observers - * - * @param message - * the message which is done - */ - public void done(IoMessage message) { - ConnectionMessages.assertConfigured(this, "Can not notify observers yet!"); //$NON-NLS-1$ - - switch (message.getState()) { - case ACCEPTED: - case CREATED: - case DONE: - case PENDING: - logger.info(MessageFormat.format("Sent message {0}", message.toString())); //$NON-NLS-1$ - break; - case FAILED: - case FATAL: - logger.error(MessageFormat.format("Failed to send message {0}", message.toString())); //$NON-NLS-1$ - break; - default: - logger.error(MessageFormat.format("Unhandled state for message {0}", message.toString())); //$NON-NLS-1$ - break; - } - - notifyObservers(message); - } - - public String getRemoteUri() { - return this.endpoint == null ? "0.0.0.0:0" : this.endpoint.getRemoteUri(); //$NON-NLS-1$ - } - - public String getLocalUri() { - return this.endpoint == null ? "0.0.0.0:0" : this.endpoint.getLocalUri(); //$NON-NLS-1$ - } - - public void reset() { - ConnectionMessages.assertConfigured(this, "Can not resest yet!"); //$NON-NLS-1$ - this.endpoint.reset(); - } - - /** - * Called when the connection is connected, thus the underlying endpoint can be started - */ - protected void connectEndpoint() { - this.endpoint.start(); - } - - /** - * Called when the connection is disconnected, thus the underlying endpoint must be stopped - */ - protected void disconnectEndpoint() { - this.endpoint.stop(); - } - - /** - * Send the message using the underlying endpoint. Do not change the state of the message, this will be done by the - * caller - * - * @param message - * the message to send - */ - public void send(IoMessage message) { - ConnectionMessages.assertConfigured(this, "Can not send yet"); //$NON-NLS-1$ - if (this.mode == ConnectionMode.OFF) - throw ConnectionMessages.throwNotConnected(this, message); - - message.setState(State.PENDING, State.PENDING.name()); - - this.messageQueue.add(message); - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/CommunicationEndpoint.java b/li.strolch.utils/src/main/java/li/strolch/communication/CommunicationEndpoint.java deleted file mode 100644 index 6c0ccf0c4..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/CommunicationEndpoint.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public interface CommunicationEndpoint { - - public void configure(CommunicationConnection connection, IoMessageVisitor converter); - - public void start(); - - public void stop(); - - public void reset(); - - public String getLocalUri(); - - public String getRemoteUri(); - - public void send(IoMessage message) throws Exception; - - public void simulate(IoMessage message) throws Exception; -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionException.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionException.java deleted file mode 100644 index 9db01c3d7..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -/** - * Base Exception for exceptions thrown by the communication classes - * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class ConnectionException extends RuntimeException { - - public ConnectionException(String message, Throwable cause) { - super(message, cause); - } - - public ConnectionException(String message) { - super(message); - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionInfo.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionInfo.java deleted file mode 100644 index f6fc124c0..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionInfo.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2012, Robert von Burg - * - * All rights reserved. - * - * This file is part of the XXX. - * - * XXX is free software: you can redistribute - * it and/or modify it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the License, - * or (at your option) any later version. - * - * XXX is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with XXX. If not, see - * . - */ -package li.strolch.communication; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class ConnectionInfo { - - private String id; - private String localUri; - private String remoteUri; - private ConnectionMode mode; - private int queueSize; - private ConnectionState state; - private String stateMsg; - - /** - * @return the id - */ - public String getId() { - return this.id; - } - - /** - * @param id - * the id to set - */ - public void setId(String id) { - this.id = id; - } - - /** - * @return the localUri - */ - public String getLocalUri() { - return this.localUri; - } - - /** - * @param localUri - * the localUri to set - */ - public void setLocalUri(String localUri) { - this.localUri = localUri; - } - - /** - * @return the remoteUri - */ - public String getRemoteUri() { - return this.remoteUri; - } - - /** - * @param remoteUri - * the remoteUri to set - */ - public void setRemoteUri(String remoteUri) { - this.remoteUri = remoteUri; - } - - /** - * @return the mode - */ - public ConnectionMode getMode() { - return this.mode; - } - - /** - * @param mode - * the mode to set - */ - public void setMode(ConnectionMode mode) { - this.mode = mode; - } - - /** - * @return the state - */ - public ConnectionState getState() { - return this.state; - } - - /** - * @param state - * the state to set - */ - public void setState(ConnectionState state) { - this.state = state; - } - - /** - * @return the stateMsg - */ - public String getStateMsg() { - return this.stateMsg; - } - - /** - * @param stateMsg - * the stateMsg to set - */ - public void setStateMsg(String stateMsg) { - this.stateMsg = stateMsg; - } - - /** - * @return the queueSize - */ - public int getQueueSize() { - return this.queueSize; - } - - /** - * @param queueSize - * the queueSize to set - */ - public void setQueueSize(int queueSize) { - this.queueSize = queueSize; - } - - public static ConnectionInfo valueOf(CommunicationConnection connection) { - ConnectionInfo info = new ConnectionInfo(); - info.setId(connection.getId()); - info.setLocalUri(connection.getLocalUri()); - info.setRemoteUri(connection.getRemoteUri()); - info.setMode(connection.getMode()); - info.setState(connection.getState()); - info.setStateMsg(connection.getStateMsg()); - info.setQueueSize(connection.getQueueSize()); - return info; - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionMessages.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionMessages.java deleted file mode 100644 index 67c1cead0..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionMessages.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; - -import li.strolch.utils.helper.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Helper class to thrown connection messages - * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class ConnectionMessages { - - private static Logger logger = LoggerFactory.getLogger(ConnectionMessages.class); - - /** - * Utility class - */ - private ConnectionMessages() { - // - } - - /** - * Convenience method to throw an exception when an illegal {@link ConnectionState} change occurs - * - * @param current - * the current state - * @param change - * the new state - * - * @return the exception - */ - public static ConnectionException throwIllegalConnectionState(ConnectionState current, ConnectionState change) { - String msg = "The connection with state {0} cannot be changed to {1}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, current.name(), change.name()); - ConnectionException e = new ConnectionException(msg); - return e; - } - - /** - * Convenience method to throw an exception when an invalid parameter is set in the configuration - * - * @param clazz - * clazz type - * @param parameterName - * the name of the parameter - * @param parameterValue - * the value of the parameter - * - * @return the exception - */ - public static ConnectionException throwInvalidParameter(Class clazz, String parameterName, - String parameterValue) { - String value; - if (parameterValue != null && !parameterValue.isEmpty()) - value = parameterValue; - else - value = StringHelper.NULL; - - String msg = "{0}: parameter ''{1}'' has invalid value ''{2}''"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, clazz.getSimpleName(), parameterName, value); - ConnectionException e = new ConnectionException(msg); - return e; - } - - /** - * Convenience method to throw an exception when an two conflicting parameters are activated - * - * @param clazz - * clazz type - * @param parameter1 - * parameter 1 - * @param parameter2 - * parameter 2 - * - * @return the exception - */ - public static ConnectionException throwConflictingParameters(Class clazz, String parameter1, String parameter2) { - String msg = "{0} : The parameters {1} and {2} can not be both activated as they conflict"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, clazz.getSimpleName(), parameter1, parameter1); - ConnectionException e = new ConnectionException(msg); - return e; - } - - /** - * Convenience method to log a warning when a parameter is not set in the configuration - * - * @param clazz - * the clazz of the warning - * @param parameterName - * the parameter name - * @param defValue - * the default value to be used - */ - public static void warnUnsetParameter(Class clazz, String parameterName, String defValue) { - if (logger.isDebugEnabled()) { - String msg = "{0}: parameter ''{1}'' is not set, using default value ''{2}''"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, clazz.getSimpleName(), parameterName, defValue); - Map properties = new HashMap<>(); - logger.warn(MessageFormat.format(msg, properties)); - } - } - - /** - * Convenience method to throw an exception when the connection is not yet configured - * - * @param connection - * the connection - * @param message - * the message - */ - public static void assertConfigured(CommunicationConnection connection, String message) { - if (connection.getState() == ConnectionState.CREATED) { - String msg = "{0} : Not yet configured: {1}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, connection.getId(), message); - throw new ConnectionException(msg); - } - } - - /** - * Convenience method to throw an exception when the connection is not connected - * - * @param connection - * the connection - * @param message - * the message - * - * @return the exception - */ - public static ConnectionException throwNotConnected(CommunicationConnection connection, String message) { - String msg = "{0} : Not connected: {1}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, connection.getId(), message); - ConnectionException e = new ConnectionException(msg); - return e; - } - - /** - * Convenience method to throw an exception when the connection is not connected - * - * @param connection - * the connection - * @param message - * the message - * - * @return the exception - */ - public static ConnectionException throwNotConnected(CommunicationConnection connection, IoMessage message) { - String msg = "{0} : Not connected, can not send message with id {1}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, connection.getId(), message.getId()); - ConnectionException e = new ConnectionException(msg); - return e; - } - - public static void assertLegalMessageVisitor(Class endpoint, - Class expectedVisitor, IoMessageVisitor actualVisitor) { - if (!(expectedVisitor.isAssignableFrom(actualVisitor.getClass()))) { - String msg = "{0} requires {1} but has received illegal type {2}"; //$NON-NLS-1$ - msg = MessageFormat - .format(msg, endpoint.getName(), expectedVisitor.getName(), actualVisitor.getClass().getName()); - throw new ConnectionException(msg); - } - } -} \ No newline at end of file diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionMode.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionMode.java deleted file mode 100644 index 8952cb73d..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionMode.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.io.IOException; - -/** - *

- * The mode of an {@link CommunicationConnection} can be one of the following enum values. This makes it possible use - * the connection without starting connection and later starting the connection when required - *

- * The modes have the following semantics: - *
    - *
  • OFF - the connection can only have states {@link ConnectionState#CREATED} and {@link - * ConnectionState#INITIALIZED} - * . Trying to use the connection will throw an exception
  • - *
  • ON - the connection can be used normally
  • - *
  • SIMULATION - the same as ON, with the difference that the connection should silently drop any work
  • - *
- * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public enum ConnectionMode { - - /** - * Denotes that the {@link CommunicationConnection} is off. This means it cannot accept messages, process messages - * or do any other kind of work - */ - OFF { - @Override - public boolean isSimulation() { - return false; - } - }, - - /** - * Denotes that the {@link CommunicationConnection} is on. This means that the {@link CommunicationConnection} - * accepts and process messages. Any connections which need to be established will automatically be connected and - * re-established should an {@link IOException} occur - */ - ON { - @Override - public boolean isSimulation() { - return false; - } - }, - - /** - * Denotes that the {@link CommunicationConnection} is in simulation mode. Mostly this means that the {@link - * CommunicationConnection} accepts messages, but silently swallows them, instead of processing them - */ - SIMULATION { - @Override - public boolean isSimulation() { - return true; - } - }; - - /** - * @return true if the current mode is simulation, false otherwise - */ - public abstract boolean isSimulation(); -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionObserver.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionObserver.java deleted file mode 100644 index eb5ec6aa3..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionObserver.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public interface ConnectionObserver { - - public void notify(CommandKey key, IoMessage message); -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionState.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionState.java deleted file mode 100644 index 402d8e9d7..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionState.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -/** - *

- * a {@link CommunicationConnection} undergoes a serious of state changes. These states can be viewed by a client to - * monitor the state of the connection - *

- * The states have the following semantics: - *
    - *
  • CREATED - initial state
  • - *
  • INITIALIZED - the appropriate connection parameters are found
  • - *
  • CONNECTING - the connection is trying to build up a connection
  • - *
  • WAITING - the connection is waiting before retrying to connect
  • - *
  • CONNECTED - the connection has just been established
  • - *
  • IDLE - the connection is connected, but waiting for work
  • - *
  • WORKING - the connection is working
  • - *
  • BROKEN - the connection has lost the connection and is waiting before reconnecting, or another unknown failure - * occurred
  • - *
  • DISCONNECTED - the connection has been disconnected
  • - *
- * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public enum ConnectionState { - - // initial - CREATED, - // configured and ready to connect - INITIALIZED, - // working - CONNECTING, - WAITING, - CONNECTED, - IDLE, - WORKING, - BROKEN, - // disconnected due to connection error or manual disconnect/stop - DISCONNECTED; -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionStateObserver.java b/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionStateObserver.java deleted file mode 100644 index 156dd1056..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/ConnectionStateObserver.java +++ /dev/null @@ -1,6 +0,0 @@ -package li.strolch.communication; - -public interface ConnectionStateObserver { - - public void notify(ConnectionState oldState, String oldStateMsg, ConnectionState newState, String newStateMsg); -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessage.java b/li.strolch.utils/src/main/java/li/strolch/communication/IoMessage.java deleted file mode 100644 index d21401f85..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessage.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import li.strolch.utils.helper.StringHelper; -import li.strolch.utils.iso8601.ISO8601FormatFactory; - -/** - *

- * An {@link IoMessage} is the object containing the data to be transmitted in IO. Implementations of {@link - * CommunicationConnection} should implement sub classes of this method to hold the actual payload. - *

- * - *

- * This class also contains a {@link Map} to store transient meta data to the actual payload - *

- * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class IoMessage { - - private final String id; - private final CommandKey key; - private final String connectionId; - private Date updated; - private State state; - private String stateMsg; - private Map parameters; - - /** - * @param id - * the id - * @param key - * the key - * @param connectionId - * the connection - */ - public IoMessage(String id, CommandKey key, String connectionId) { - this.id = id; - this.key = key; - this.connectionId = connectionId; - this.state = State.CREATED; - this.stateMsg = StringHelper.DASH; - this.updated = new Date(); - this.parameters = new HashMap<>(); - } - - /** - * @return the id - */ - public String getId() { - return this.id; - } - - /** - * @return the key - */ - public CommandKey getKey() { - return this.key; - } - - /** - * @return the connectionId - */ - public String getConnectionId() { - return this.connectionId; - } - - /** - * @return the updated - */ - public Date getUpdated() { - return this.updated; - } - - /** - * Used for testing purposes only! - * - * @param date - */ - void setUpdated(Date date) { - this.updated = date; - } - - /** - * @return the state - */ - public State getState() { - return this.state; - } - - /** - * @return the stateMsg - */ - public String getStateMsg() { - return this.stateMsg; - } - - /** - * @param state - * the state - * @param stateMsg - * the stateMsg - */ - public void setState(State state, String stateMsg) { - this.state = state; - this.stateMsg = stateMsg; - this.updated = new Date(); - } - - @SuppressWarnings("unchecked") - public T getParam(String key) { - return (T) this.parameters.get(key); - } - - /** - * Add a transient parameter to this message - * - * @param key - * the key under which the parameter is to be stored - * @param value - * the value to store under the given key - */ - public void addParam(String key, Object value) { - this.parameters.put(key, value); - } - - /** - * Removes the parameter with the given key - * - * @param key - * The give of the parameter to be removed - * - * @return the removed value, or null if the object didn't exist - */ - @SuppressWarnings("unchecked") - public T removeParam(String key) { - return (T) this.parameters.remove(key); - } - - /** - * @return the set of parameter keys - */ - public Set getParamKeys() { - return this.parameters.keySet(); - } - - @SuppressWarnings("nls") - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append(this.getClass().getSimpleName() + " [id="); - builder.append(this.id); - builder.append(", key="); - builder.append(this.key); - builder.append(", updated="); - builder.append(ISO8601FormatFactory.getInstance().formatDate(this.updated)); - builder.append(", state="); - builder.append(this.state); - if (StringHelper.isNotEmpty(this.stateMsg)) { - builder.append(", stateMsg="); - builder.append(this.stateMsg); - } - builder.append("]"); - return builder.toString(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((this.id == null) ? 0 : this.id.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - IoMessage other = (IoMessage) obj; - if (this.id == null) { - if (other.id != null) - return false; - } else if (!this.id.equals(other.id)) - return false; - return true; - } - - public enum State { - CREATED, // new - PENDING, // outbound - ACCEPTED, // inbound - DONE, // completed - FAILED, // completed - FATAL; // not sendable - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageArchive.java b/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageArchive.java deleted file mode 100644 index 8102666b3..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageArchive.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.util.List; - -public interface IoMessageArchive { - - public int getMaxSize(); - - public void setMaxSize(int maxSize); - - public int getTrimSize(); - - public void setTrimSize(int trimSize); - - public int size(); - - public List getAll(); - - public List getBy(String connectionId); - - public List getBy(String connectionId, CommandKey key); - - public void clearArchive(); - - public void archive(IoMessage message); -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageStateObserver.java b/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageStateObserver.java deleted file mode 100644 index 075a8e533..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageStateObserver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2013 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import li.strolch.communication.IoMessage.State; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class IoMessageStateObserver implements ConnectionObserver { - - private CommandKey key; - private String messageId; - private State state; - - public IoMessageStateObserver(CommandKey key, String messageId) { - this.key = key; - this.messageId = messageId; - } - - @Override - public void notify(CommandKey key, IoMessage message) { - if (this.key.equals(key) && message.getId().equals(this.messageId)) { - this.state = message.getState(); - - synchronized (this) { - notifyAll(); - } - } - } - - public State getState() { - return this.state; - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageVisitor.java b/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageVisitor.java deleted file mode 100644 index 87bb8b48b..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/IoMessageVisitor.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import li.strolch.communication.console.ConsoleMessageVisitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - *

- * Visitors to read and write {@link IoMessage} using different kind of endpoints. Different endpoints will require - * different ways of writing or reading message, thus this is not defined here. Known extensions are {@link - * ConsoleMessageVisitor}, {@link StreamMessageVisitor}. - *

- * - *

- * Concrete implementations must be thread safe! - *

- * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public abstract class IoMessageVisitor { - - protected static final Logger logger = LoggerFactory.getLogger(IoMessageVisitor.class); - - protected CommunicationConnection connection; - - public void configure(CommunicationConnection connection) { - this.connection = connection; - } - - public void simulate(IoMessage ioMessage) { - // allow for subclasses to implement - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/SimpleMessageArchive.java b/li.strolch.utils/src/main/java/li/strolch/communication/SimpleMessageArchive.java deleted file mode 100644 index 7177224bb..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/SimpleMessageArchive.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.util.*; - -public class SimpleMessageArchive implements IoMessageArchive { - - private int maxSize; - private int trimSize; - private TreeSet messageArchive; - - public SimpleMessageArchive() { - this(1000, 100); - } - - public SimpleMessageArchive(int maxSize, int trimSize) { - this.maxSize = maxSize; - this.trimSize = trimSize; - - this.messageArchive = new TreeSet<>(new Comparator() { - @Override - public int compare(IoMessage o1, IoMessage o2) { - return o1.getUpdated().compareTo(o2.getUpdated()); - } - }); - } - - @Override - public synchronized int getMaxSize() { - return this.maxSize; - } - - @Override - public synchronized void setMaxSize(int maxSize) { - this.maxSize = maxSize; - trim(); - } - - @Override - public synchronized int getTrimSize() { - return this.trimSize; - } - - @Override - public synchronized void setTrimSize(int trimSize) { - this.trimSize = trimSize; - } - - @Override - public synchronized int size() { - return this.messageArchive.size(); - } - - @Override - public synchronized List getAll() { - List all = new ArrayList<>(this.messageArchive); - return all; - } - - @Override - public synchronized List getBy(String connectionId) { - List all = new ArrayList<>(); - for (IoMessage msg : this.messageArchive) { - if (msg.getConnectionId().equals(connectionId)) - all.add(msg); - } - return all; - } - - @Override - public synchronized List getBy(String connectionId, CommandKey key) { - List all = new ArrayList<>(); - for (IoMessage msg : this.messageArchive) { - if (msg.getConnectionId().equals(connectionId) && msg.getKey().equals(key)) - all.add(msg); - } - return all; - } - - @Override - public synchronized void clearArchive() { - this.messageArchive.clear(); - } - - @Override - public synchronized void archive(IoMessage message) { - this.messageArchive.add(message); - trim(); - } - - protected void trim() { - if (this.messageArchive.size() <= this.maxSize) - return; - - Iterator iter = this.messageArchive.iterator(); - for (int i = 0; i <= this.trimSize; i++) { - if (iter.hasNext()) { - iter.next(); - iter.remove(); - } - } - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/StreamMessageVisitor.java b/li.strolch.utils/src/main/java/li/strolch/communication/StreamMessageVisitor.java deleted file mode 100644 index 77666b5c4..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/StreamMessageVisitor.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.io.InputStream; -import java.io.OutputStream; - -/** - *

- * {@link IoMessageVisitor} to read or write using IO Streams. - *

- * - *

- * Concrete implementations must be thread safe! - *

- * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public abstract class StreamMessageVisitor extends IoMessageVisitor { - - public abstract void visit(OutputStream outputStream, IoMessage message) throws Exception; - - public abstract IoMessage visit(InputStream inputStream) throws Exception; -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/chat/Chat.java b/li.strolch.utils/src/main/java/li/strolch/communication/chat/Chat.java deleted file mode 100644 index e96d1f258..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/chat/Chat.java +++ /dev/null @@ -1,134 +0,0 @@ -package li.strolch.communication.chat; - -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.text.MessageFormat; -import java.util.Enumeration; - -import li.strolch.utils.helper.StringHelper; - -public class Chat { - - public static void main(String[] args) { - - if (args.length < 4) - printIllegalArgsAndExit(args); - - if (args[0].equals("server")) { //$NON-NLS-1$ - startServer(args); - } else if (args[0].equals("client")) { //$NON-NLS-1$ - startClient(args); - } - } - - private static void startServer(String[] args) { - - // server - InetAddress host; - String hostS = args[1]; - try { - host = InetAddress.getByName(hostS); - } catch (UnknownHostException e1) { - System.err.println(MessageFormat.format("Illegal server address: {0}", hostS)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - return; - } - boolean isHostAddress = false; - try { - for (Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); interfaces - .hasMoreElements(); ) { - NetworkInterface iface = interfaces.nextElement(); - for (Enumeration addresses = iface.getInetAddresses(); addresses.hasMoreElements(); ) { - InetAddress inetAddress = addresses.nextElement(); - if (inetAddress.getHostAddress().equals(host.getHostAddress())) - isHostAddress = true; - } - } - } catch (SocketException e) { - System.err.println("Oops: " + e.getMessage()); //$NON-NLS-1$ - } - - if (!isHostAddress) { - System.err.println( - MessageFormat.format("The address {0} is not an address of this host!", hostS)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - } - - // port - int port; - String portS = args[2]; - try { - port = Integer.parseInt(portS); - } catch (NumberFormatException e) { - System.err.println(MessageFormat.format("Illegal port: {0}", portS)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - return; - } - if (port < 1 || port > 65535) { - System.err.println(MessageFormat.format("Illegal port: {0}", port)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - return; - } - - // username - String username = args[3]; - - // start - ChatServer chatServer = new ChatServer(host, port, username); - chatServer.start(); - } - - private static void startClient(String[] args) { - - // server - InetAddress host; - String hostS = args[1]; - try { - host = InetAddress.getByName(hostS); - } catch (UnknownHostException e1) { - System.err.println(MessageFormat.format("Illegal server address: {0}", hostS)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - return; - } - - // port - int port; - String portS = args[2]; - try { - port = Integer.parseInt(portS); - } catch (NumberFormatException e) { - System.err.println(MessageFormat.format("Illegal port: {0}", portS)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - return; - } - if (port < 1 || port > 65535) { - System.err.println(MessageFormat.format("Illegal port: {0}", port)); //$NON-NLS-1$ - printIllegalArgsAndExit(args); - return; - } - - // username - String username = args[3]; - - // start - ChatClient chatClient = new ChatClient(host, port, username); - chatClient.start(); - } - - private static void printIllegalArgsAndExit(String[] args) { - System.err.print("Illegal arguments: "); //$NON-NLS-1$ - if (args.length == 0) { - System.err.print("(none)"); //$NON-NLS-1$ - } else { - for (String arg : args) { - System.err.print(arg + StringHelper.SPACE); - } - } - System.err.println(); - System.err.println("Usage: java ...Chat server "); //$NON-NLS-1$ - System.err.println("Usage: java ...Chat client "); //$NON-NLS-1$ - System.exit(1); - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatClient.java b/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatClient.java deleted file mode 100644 index e7c6cf8d7..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatClient.java +++ /dev/null @@ -1,87 +0,0 @@ -package li.strolch.communication.chat; - -import static li.strolch.communication.chat.ChatMessageVisitor.inboundKey; -import static li.strolch.communication.chat.ChatMessageVisitor.outboundKey; - -import java.net.InetAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Scanner; - -import li.strolch.communication.*; -import li.strolch.communication.tcpip.ClientSocketEndpoint; -import li.strolch.communication.tcpip.SocketEndpointConstants; -import li.strolch.communication.tcpip.SocketMessageVisitor; - -public class ChatClient implements ConnectionObserver, ConnectionStateObserver { - - private static final String id = "ChatClient"; //$NON-NLS-1$ - private InetAddress host; - private int port; - private String username; - private boolean connected; - - public ChatClient(InetAddress host, int port, String username) { - this.host = host; - this.port = port; - this.username = username; - } - - public void start() { - ConnectionMode mode = ConnectionMode.ON; - - Map parameters = new HashMap<>(); - parameters.put(SocketEndpointConstants.PARAMETER_RETRY, "10000"); //$NON-NLS-1$ - parameters.put(SocketEndpointConstants.PARAMETER_USE_TIMEOUT, Boolean.FALSE.toString()); - parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS, this.host.getHostAddress()); - parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT, Integer.toString(this.port)); - parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.FALSE.toString()); - - SocketMessageVisitor messageVisitor = new ChatMessageVisitor(id); - ClientSocketEndpoint endpoint = new ClientSocketEndpoint(); - - CommunicationConnection connection = new CommunicationConnection(id, mode, parameters, endpoint, - messageVisitor); - connection.addConnectionObserver(outboundKey, this); - connection.addConnectionObserver(inboundKey, this); - connection.addConnectionStateObserver(this); - connection.configure(); - connection.start(); - - while (!this.connected) { - synchronized (this) { - try { - this.wait(2000l); - } catch (InterruptedException e) { - System.err.println("oops: " + e.getMessage()); //$NON-NLS-1$ - } - } - } - - System.out.println("Connected. Send messages to user:"); //$NON-NLS-1$ - while (true) { - @SuppressWarnings("resource") - Scanner in = new Scanner(System.in); - //System.out.print(this.username + ": "); - String line = in.nextLine(); - connection.send(ChatIoMessage.msg(outboundKey, id, this.username, line)); - } - } - - @Override - public void notify(CommandKey key, IoMessage message) { - if (key.equals(inboundKey)) { - ChatIoMessage chatIoMessage = (ChatIoMessage) message; - System.out.println(chatIoMessage.getChatMsg()); - } - } - - @Override - public void notify(ConnectionState oldState, String oldStateMsg, ConnectionState newState, String newStateMsg) { - this.connected = newState == ConnectionState.CONNECTED || newState == ConnectionState.IDLE - || newState == ConnectionState.WORKING; - synchronized (this) { - notifyAll(); - } - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatIoMessage.java b/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatIoMessage.java deleted file mode 100644 index 9806d1a4e..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatIoMessage.java +++ /dev/null @@ -1,31 +0,0 @@ -package li.strolch.communication.chat; - -import li.strolch.communication.CommandKey; -import li.strolch.communication.IoMessage; -import li.strolch.utils.helper.StringHelper; - -public class ChatIoMessage extends IoMessage { - - private String chatMsg; - - public ChatIoMessage(String id, CommandKey key, String connectionId) { - super(id, key, connectionId); - } - - public String getChatMsg() { - return this.chatMsg; - } - - public void setChatMsg(String chagMsg) { - this.chatMsg = chagMsg; - } - - public static ChatIoMessage msg(CommandKey key, String connId, String username, String msg) { - - String line = username + StringHelper.COLON + StringHelper.SPACE + msg; - - ChatIoMessage chatIoMessage = new ChatIoMessage(StringHelper.getUniqueId(), key, connId); - chatIoMessage.setChatMsg(line); - return chatIoMessage; - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatMessageVisitor.java b/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatMessageVisitor.java deleted file mode 100644 index 15deb881d..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatMessageVisitor.java +++ /dev/null @@ -1,44 +0,0 @@ -package li.strolch.communication.chat; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.InputStreamReader; - -import li.strolch.communication.CommandKey; -import li.strolch.communication.IoMessage; -import li.strolch.communication.tcpip.SocketMessageVisitor; -import li.strolch.utils.helper.StringHelper; - -public class ChatMessageVisitor extends SocketMessageVisitor { - - public static final CommandKey inboundKey = CommandKey.key("server", "msg"); //$NON-NLS-1$//$NON-NLS-2$ - public static final CommandKey outboundKey = CommandKey.key("client", "msg"); //$NON-NLS-1$ //$NON-NLS-2$ - - public ChatMessageVisitor(String connectionId) { - super(connectionId); - } - - @Override - public IoMessage visit(DataInputStream inputStream, DataOutputStream outputStream) throws Exception { - - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); - String line = bufferedReader.readLine(); - if (line == null) { - bufferedReader.close(); - return null; - } - - ChatIoMessage chatIoMessage = new ChatIoMessage(StringHelper.getUniqueId(), inboundKey, this.connectionId); - chatIoMessage.setChatMsg(line); - return chatIoMessage; - } - - @Override - public void visit(DataInputStream inputStream, DataOutputStream outputStream, IoMessage message) throws Exception { - ChatIoMessage chatIoMessage = (ChatIoMessage) message; - outputStream.writeChars(chatIoMessage.getChatMsg()); - outputStream.writeChar('\n'); - outputStream.flush(); - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatServer.java b/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatServer.java deleted file mode 100644 index 0cb51e5b9..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/chat/ChatServer.java +++ /dev/null @@ -1,88 +0,0 @@ -package li.strolch.communication.chat; - -import static li.strolch.communication.chat.ChatMessageVisitor.inboundKey; -import static li.strolch.communication.chat.ChatMessageVisitor.outboundKey; - -import java.net.InetAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Scanner; - -import li.strolch.communication.*; -import li.strolch.communication.tcpip.ServerSocketEndpoint; -import li.strolch.communication.tcpip.SocketEndpointConstants; -import li.strolch.communication.tcpip.SocketMessageVisitor; - -public class ChatServer implements ConnectionObserver, ConnectionStateObserver { - - private static final String id = "ChatServer"; //$NON-NLS-1$ - private InetAddress address; - private int port; - private String username; - private boolean connected; - - public ChatServer(InetAddress address, int port, String username) { - this.address = address; - this.port = port; - this.username = username; - } - - public void start() { - ConnectionMode mode = ConnectionMode.ON; - - Map parameters = new HashMap<>(); - parameters.put(SocketEndpointConstants.PARAMETER_RETRY, "10000"); //$NON-NLS-1$ - parameters.put(SocketEndpointConstants.PARAMETER_USE_TIMEOUT, Boolean.FALSE.toString()); - parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS, this.address.getHostAddress()); - parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT, Integer.toString(this.port)); - parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.FALSE.toString()); - parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.FALSE.toString()); - - SocketMessageVisitor messageVisitor = new ChatMessageVisitor(id); - ServerSocketEndpoint endpoint = new ServerSocketEndpoint(); - - CommunicationConnection connection = new CommunicationConnection(id, mode, parameters, endpoint, - messageVisitor); - connection.addConnectionObserver(outboundKey, this); - connection.addConnectionObserver(inboundKey, this); - connection.addConnectionStateObserver(this); - connection.configure(); - connection.start(); - - while (!this.connected) { - synchronized (this) { - try { - this.wait(2000l); - } catch (InterruptedException e) { - System.err.println("oops: " + e.getMessage()); //$NON-NLS-1$ - } - } - } - - System.out.println("Connected. Send messages to user:"); //$NON-NLS-1$ - while (true) { - @SuppressWarnings("resource") - Scanner in = new Scanner(System.in); - //System.out.print(this.username + ": "); - String line = in.nextLine(); - connection.send(ChatIoMessage.msg(outboundKey, id, this.username, line)); - } - } - - @Override - public void notify(CommandKey key, IoMessage message) { - if (key.equals(inboundKey)) { - ChatIoMessage chatIoMessage = (ChatIoMessage) message; - System.out.println(chatIoMessage.getChatMsg()); - } - } - - @Override - public void notify(ConnectionState oldState, String oldStateMsg, ConnectionState newState, String newStateMsg) { - this.connected = newState == ConnectionState.CONNECTED || newState == ConnectionState.IDLE - || newState == ConnectionState.WORKING; - synchronized (this) { - notifyAll(); - } - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/console/ConsoleEndpoint.java b/li.strolch.utils/src/main/java/li/strolch/communication/console/ConsoleEndpoint.java deleted file mode 100644 index 3f17fc1b9..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/console/ConsoleEndpoint.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.console; - -import li.strolch.communication.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class ConsoleEndpoint implements CommunicationEndpoint { - - private static final Logger logger = LoggerFactory.getLogger(ConsoleEndpoint.class); - private CommunicationConnection connection; - private ConsoleMessageVisitor messageVisitor; - - @Override - public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) { - this.connection = connection; - ConnectionMessages.assertLegalMessageVisitor(this.getClass(), ConsoleMessageVisitor.class, messageVisitor); - this.messageVisitor = (ConsoleMessageVisitor) messageVisitor; - } - - @Override - public void start() { - this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString()); - } - - @Override - public void stop() { - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString()); - } - - @Override - public void reset() { - this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString()); - } - - @Override - public String getLocalUri() { - return "console"; //$NON-NLS-1$ - } - - @Override - public String getRemoteUri() { - return "console"; //$NON-NLS-1$ - } - - @Override - public void send(IoMessage message) throws Exception { - this.connection.notifyStateChange(ConnectionState.WORKING, ConnectionState.WORKING.toString()); - try { - this.messageVisitor.visit(logger, message); - } finally { - this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString()); - } - } - - @Override - public void simulate(IoMessage message) throws Exception { - send(message); - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/console/ConsoleMessageVisitor.java b/li.strolch.utils/src/main/java/li/strolch/communication/console/ConsoleMessageVisitor.java deleted file mode 100644 index c3cc1e81c..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/console/ConsoleMessageVisitor.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.console; - -import li.strolch.communication.IoMessage; -import li.strolch.communication.IoMessageVisitor; -import org.slf4j.Logger; - -public abstract class ConsoleMessageVisitor extends IoMessageVisitor { - - public abstract void visit(Logger logger, IoMessage message) throws Exception; -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/file/FileEndpoint.java b/li.strolch.utils/src/main/java/li/strolch/communication/file/FileEndpoint.java deleted file mode 100644 index 2ecbed416..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/file/FileEndpoint.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.file; - -import java.io.File; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.text.MessageFormat; -import java.util.Map; - -import li.strolch.communication.*; -import li.strolch.utils.helper.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An {@link CommunicationEndpoint} which writes and/or reads from a designated file - * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class FileEndpoint implements CommunicationEndpoint, Runnable { - - public static final String ENDPOINT_MODE = "endpointMode"; //$NON-NLS-1$ - public static final String INBOUND_FILENAME = "inboundFilename"; //$NON-NLS-1$ - public static final String OUTBOUND_FILENAME = "outboundFilename"; //$NON-NLS-1$ - public static final long POLL_TIME = 1000l; - - private static final Logger logger = LoggerFactory.getLogger(FileEndpoint.class); - - private CommunicationConnection connection; - - private FileEndpointMode endpointMode; - private String inboundFilename; - private String outboundFilename; - private Thread thread; - private boolean run = false; - private StreamMessageVisitor messageVisitor; - - /** - * {@link FileEndpoint} needs the following parameters on the configuration to be initialized - *
    - *
  • outboundFilename: the file name where the {@link IoMessage} contents are written to. The value may contain - * {@link System#getProperty(String)} place holders which will be evaluated
  • - *
- */ - @Override - public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) { - this.connection = connection; - - ConnectionMessages.assertLegalMessageVisitor(this.getClass(), StreamMessageVisitor.class, messageVisitor); - this.messageVisitor = (StreamMessageVisitor) messageVisitor; - - configure(); - } - - private void configure() { - Map parameters = this.connection.getParameters(); - - String endpointModeS = parameters.get(ENDPOINT_MODE); - if (StringHelper.isEmpty(endpointModeS)) { - throw ConnectionMessages.throwInvalidParameter(FileEndpoint.class, ENDPOINT_MODE, endpointModeS); - } - try { - this.endpointMode = FileEndpointMode.valueOf(endpointModeS); - } catch (Exception e) { - throw ConnectionMessages.throwInvalidParameter(FileEndpoint.class, ENDPOINT_MODE, endpointModeS); - } - - if (this.endpointMode.isRead()) { - this.inboundFilename = parameters.get(INBOUND_FILENAME); - if (StringHelper.isEmpty(this.inboundFilename)) { - throw ConnectionMessages - .throwInvalidParameter(FileEndpoint.class, INBOUND_FILENAME, this.inboundFilename); - } - } - - if (this.endpointMode.isWrite()) { - this.outboundFilename = parameters.get(OUTBOUND_FILENAME); - if (StringHelper.isEmpty(this.outboundFilename)) { - throw ConnectionMessages - .throwInvalidParameter(FileEndpoint.class, OUTBOUND_FILENAME, this.outboundFilename); - } - } - } - - @Override - public String getLocalUri() { - return new File(this.inboundFilename).getAbsolutePath(); - } - - @Override - public String getRemoteUri() { - return new File(this.outboundFilename).getAbsolutePath(); - } - - @Override - public void start() { - if (this.endpointMode.isRead()) { - this.thread = new Thread(this, new File(this.inboundFilename).getName()); - this.run = true; - this.thread.start(); - } - this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString()); - } - - @Override - public void stop() { - stopThread(); - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString()); - } - - @Override - public void reset() { - stopThread(); - configure(); - this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString()); - } - - private void stopThread() { - this.run = false; - if (this.thread != null) { - try { - this.thread.interrupt(); - this.thread.join(2000l); - } catch (Exception e) { - logger.error(MessageFormat - .format("Error while interrupting thread: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } - - this.thread = null; - } - } - - @Override - public void send(IoMessage message) throws Exception { - if (!this.endpointMode.isWrite()) { - String msg = "FileEnpoint mode is {0} and thus write is not allowed!"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, this.endpointMode); - throw new ConnectionException(msg); - } - - this.connection.notifyStateChange(ConnectionState.WORKING, ConnectionState.WORKING.toString()); - - // open the stream - try (OutputStream outputStream = Files.newOutputStream(Paths.get(this.outboundFilename))) { - - // write the message using the visitor - this.messageVisitor.visit(outputStream, message); - - } finally { - this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString()); - } - } - - @Override - public void simulate(IoMessage message) throws Exception { - this.messageVisitor.simulate(message); - } - - @Override - public void run() { - - File file = new File(this.inboundFilename); - - long lastModified = 0l; - - logger.info("Starting..."); //$NON-NLS-1$ - while (this.run) { - - try { - - if (file.canRead()) { - long tmpModified = file.lastModified(); - if (tmpModified > lastModified) { - - logger.info(MessageFormat.format("Handling file {0}", file.getAbsolutePath())); //$NON-NLS-1$ - - this.connection.notifyStateChange(ConnectionState.WORKING, ConnectionState.WORKING.toString()); - - // file is changed - lastModified = tmpModified; - - // read the file - handleFile(file); - - this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString()); - } - } - - if (this.run) { - this.connection.notifyStateChange(ConnectionState.WAITING, ConnectionState.WAITING.toString()); - try { - synchronized (this) { - this.wait(POLL_TIME); - } - } catch (InterruptedException e) { - this.run = false; - logger.info("Interrupted!"); //$NON-NLS-1$ - } - } - - } catch (Exception e) { - logger.error(MessageFormat.format("Error reading file: {0}", file.getAbsolutePath())); //$NON-NLS-1$ - logger.error(e.getMessage(), e); - - this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage()); - } - } - } - - /** - * Reads the file and handle using {@link StreamMessageVisitor} - * - * @param file - * the {@link File} to read - */ - protected void handleFile(File file) throws Exception { - - try (InputStream inputStream = Files.newInputStream(file.toPath())) { - - // convert the object to an integration message - IoMessage message = this.messageVisitor.visit(inputStream); - - // and forward to the connection - if (message != null) { - this.connection.handleNewMessage(message); - } - } - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/file/FileEndpointMode.java b/li.strolch.utils/src/main/java/li/strolch/communication/file/FileEndpointMode.java deleted file mode 100644 index 603150478..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/file/FileEndpointMode.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.file; - -public enum FileEndpointMode { - READ(true, false), - WRITE(false, true), - READ_WRITE(true, true); - private boolean read; - private boolean write; - - private FileEndpointMode(boolean read, boolean write) { - this.read = read; - this.write = write; - } - - public boolean isRead() { - return this.read; - } - - public boolean isWrite() { - return this.write; - } -} \ No newline at end of file diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/ClientSocketEndpoint.java b/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/ClientSocketEndpoint.java deleted file mode 100644 index 97679c437..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/ClientSocketEndpoint.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.tcpip; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.net.Socket; -import java.net.UnknownHostException; -import java.text.MessageFormat; -import java.util.Map; - -import li.strolch.communication.*; -import li.strolch.communication.IoMessage.State; -import li.strolch.utils.helper.ExceptionHelper; -import li.strolch.utils.helper.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - *

- * This {@link CommunicationEndpoint} is an abstract implementation with everything needed to connect through a {@link - * Socket} to a remote server which is listening for incoming {@link Socket} connections. This {@link Socket} endpoint - * can send messages to the remote side, as well as receive messages from the remote side - *

- *

- * This endpoint is maintained as a client connection. This means that this endpoint opens the {@link Socket} to the - * remote server - *

- * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class ClientSocketEndpoint implements CommunicationEndpoint { - - protected static final Logger logger = LoggerFactory.getLogger(ClientSocketEndpoint.class); - - // state variables - private boolean connected; - private boolean closed; - private long lastConnect; - private boolean useTimeout; - private int timeout; - private long retry; - private boolean clearOnConnect; - private boolean connectOnStart; - private boolean closeAfterSend; - - // remote address - private String remoteInputAddressS; - private int remoteInputPort; - private String localOutputAddressS; - private int localOutputPort; - - private InetAddress remoteInputAddress; - private InetAddress localOutputAddress; - - // connection - private Socket socket; - - protected DataOutputStream outputStream; - protected DataInputStream inputStream; - - protected CommunicationConnection connection; - - protected SocketMessageVisitor messageVisitor; - - /** - * Default constructor - */ - public ClientSocketEndpoint() { - this.connected = false; - this.closed = true; - } - - /** - * Checks the state of the connection and returns true if {@link Socket} is connected and ready for transmission, - * false otherwise - * - * @return true if {@link Socket} is connected and ready for transmission, false otherwise - */ - protected boolean checkConnection() { - return !this.closed && this.connected && (this.socket != null && !this.socket.isClosed() && this.socket - .isBound() && this.socket.isConnected() && !this.socket.isInputShutdown() && !this.socket - .isOutputShutdown()); - } - - /** - * Establishes a {@link Socket} connection to the remote server. This method blocks till a connection is - * established. In the event of a connection failure, the method waits a configured time before retrying - */ - protected void openConnection() { - - ConnectionState state = this.connection.getState(); - - // do not allow connecting if state is - // - CREATED - // - CONNECTING - // - WAITING - // - CLOSED - if (state == ConnectionState.CREATED || state == ConnectionState.CONNECTING || state == ConnectionState.WAITING - || state == ConnectionState.DISCONNECTED) { - - ConnectionMessages.throwIllegalConnectionState(state, ConnectionState.CONNECTING); - } - - // first close the connection - closeConnection(); - - while (!this.connected && !this.closed) { - try { - - this.connection.notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString()); - - // only try in proper intervals - long currentTime = System.currentTimeMillis(); - long timeDifference = currentTime - this.lastConnect; - if (timeDifference < this.retry) { - long wait = (this.retry - timeDifference); - logger.info(MessageFormat.format("Waiting: {0}ms", wait)); //$NON-NLS-1$ - - this.connection.notifyStateChange(ConnectionState.WAITING, ConnectionState.WAITING.toString()); - Thread.sleep(wait); - this.connection - .notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString()); - } - - // don't try and connect if we are closed! - if (this.closed) { - logger.error("The connection has been closed and can not be connected"); //$NON-NLS-1$ - closeConnection(); - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null); - return; - } - - // keep track of the time of this connection attempt - this.lastConnect = System.currentTimeMillis(); - - // open the socket - if (this.localOutputAddress != null) { - String msg = "Opening connection to {0}:{1} from {2}:{3}..."; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.remoteInputAddress.getHostAddress(), - Integer.toString(this.remoteInputPort), this.localOutputAddress.getHostAddress(), - Integer.toString(this.localOutputPort))); - this.socket = new Socket(this.remoteInputAddress, this.remoteInputPort, this.localOutputAddress, - this.localOutputPort); - } else { - String msg = "Opening connection to {0}:{1}..."; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.remoteInputAddress.getHostAddress(), - Integer.toString(this.remoteInputPort))); - this.socket = new Socket(this.remoteInputAddress, this.remoteInputPort); - } - - // configure the socket - if (logger.isDebugEnabled()) { - String msg = "BufferSize (send/read): {0} / {1} SoLinger: {2} TcpNoDelay: {3}"; //$NON-NLS-1$ - logger.info(MessageFormat - .format(msg, this.socket.getSendBufferSize(), this.socket.getReceiveBufferSize(), - this.socket.getSoLinger(), this.socket.getTcpNoDelay())); - } - //outputSocket.setSendBufferSize(1); - //outputSocket.setSoLinger(true, 0); - //outputSocket.setTcpNoDelay(true); - - // activate connection timeout - if (this.useTimeout) { - this.socket.setSoTimeout(this.timeout); - } - - // get the streams - this.outputStream = new DataOutputStream(this.socket.getOutputStream()); - this.inputStream = new DataInputStream(this.socket.getInputStream()); - - if (this.clearOnConnect) { - // clear the input stream - int available = this.inputStream.available(); - logger.info(MessageFormat.format("clearOnConnect: skipping {0} bytes.", available)); //$NON-NLS-1$ - this.inputStream.skip(available); - } - - String msg = "Connected {0}: {1}:{2} with local side {3}:{4}"; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.connection.getId(), this.remoteInputAddressS, - Integer.toString(this.remoteInputPort), this.socket.getLocalAddress().getHostAddress(), - Integer.toString(this.socket.getLocalPort()))); - - // we are connected! - this.connection.notifyStateChange(ConnectionState.CONNECTED, ConnectionState.CONNECTED.toString()); - this.connected = true; - - } catch (InterruptedException e) { - logger.info("Interrupted!"); //$NON-NLS-1$ - this.closed = true; - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null); - } catch (Exception e) { - String msg = "Error while connecting to {0}:{1}: {2}"; //$NON-NLS-1$ - logger.error(MessageFormat.format(msg, this.remoteInputAddressS, Integer.toString(this.remoteInputPort), - ExceptionHelper.formatExceptionMessage(e))); - this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage()); - } - } - } - - /** - * closes the connection HARD by calling close() on the streams and socket. All Exceptions are caught to make sure - * that the connections are cleaned up - */ - protected void closeConnection() { - - this.connected = false; - this.connection.notifyStateChange(ConnectionState.BROKEN, null); - - if (this.outputStream != null) { - try { - this.outputStream.close(); - } catch (IOException e) { - logger.error( - MessageFormat.format("Error closing OutputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } finally { - this.outputStream = null; - } - } - - if (this.inputStream != null) { - try { - this.inputStream.close(); - } catch (IOException e) { - logger.error( - MessageFormat.format("Error closing InputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } finally { - this.inputStream = null; - } - } - - if (this.socket != null) { - try { - this.socket.close(); - } catch (IOException e) { - logger.error( - MessageFormat.format("Error closing OutputSocket: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } finally { - this.socket = null; - } - - String msg = "Socket closed for connection {0} at remote input address {1}:{2}"; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.connection.getId(), this.remoteInputAddressS, - Integer.toString(this.remoteInputPort))); - } - } - - /** - *

- * Configures this {@link ClientSocketEndpoint} - *

- * gets the parameter map from the connection and reads the following parameters from the map: - *
    - *
  • remoteInputAddress - the IP or Hostname of the remote server
  • - *
  • remoteInputPort - the port to which the socket should be established
  • - *
  • localOutputAddress - the IP or Hostname of the local server (if null, then the network layer will - * decide)
  • - *
  • localOutputPort - the local port from which the socket should go out of (if null, then the network layer - * will - * decide)
  • - *
  • retry - a configured retry wait time. Default is {@link SocketEndpointConstants#RETRY}
  • - *
  • timeout - the timeout after which an idle socket is deemed dead. Default is - * {@link SocketEndpointConstants#TIMEOUT}
  • - *
  • useTimeout - if true, then the timeout is activated, otherwise it is. default is - * {@link SocketEndpointConstants#USE_TIMEOUT}
  • - *
  • clearOnConnect - if true, then the after a successful connect the input is cleared by discarding all - * available bytes. This can be useful in cases where the channel is clogged with stale data. default is {@link - * SocketEndpointConstants#CLEAR_ON_CONNECT}
  • - *
  • connectOnStart - if true, then when the connection is started, the connection to the remote address is - * attempted. default is {@link SocketEndpointConstants#CONNECT_ON_START} - *
- * - * @see CommunicationEndpoint#configure(CommunicationConnection, IoMessageVisitor) - */ - @Override - public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) { - if (this.connection != null && connection.getState().compareTo(ConnectionState.INITIALIZED) > 0) { - String msg = "{0}:{1} already configured."; //$NON-NLS-1$ - logger.warn(MessageFormat.format(msg, this.getClass().getSimpleName(), connection.getId())); - return; - } - - ConnectionMessages.assertLegalMessageVisitor(this.getClass(), SocketMessageVisitor.class, messageVisitor); - this.messageVisitor = (SocketMessageVisitor) messageVisitor; - this.connection = connection; - configure(); - } - - private void configure() { - Map parameters = this.connection.getParameters(); - - this.remoteInputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS); - String remoteInputPortS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT); - this.localOutputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_OUTPUT_ADDRESS); - String localOutputPortS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_OUTPUT_PORT); - - // parse remote input Address to InetAddress object - try { - this.remoteInputAddress = InetAddress.getByName(this.remoteInputAddressS); - } catch (UnknownHostException e) { - throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS, this.remoteInputAddressS); - } - - // parse remote input address port to integer - try { - this.remoteInputPort = Integer.parseInt(remoteInputPortS); - } catch (NumberFormatException e) { - throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT, remoteInputPortS); - } - - // if local output address is not set, then we will use the localhost InetAddress - if (this.localOutputAddressS == null || this.localOutputAddressS.length() == 0) { - logger.debug("No localOutputAddress set. Using localhost"); //$NON-NLS-1$ - } else { - - // parse local output address name to InetAddress object - try { - this.localOutputAddress = InetAddress.getByName(this.localOutputAddressS); - } catch (UnknownHostException e) { - String msg = "The host name ''{0}'' can not be evaluated to an internet address"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, this.localOutputAddressS); - throw new ConnectionException(msg, e); - } - - // parse local output address port to integer - try { - this.localOutputPort = Integer.parseInt(localOutputPortS); - } catch (NumberFormatException e) { - throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_LOCAL_OUTPUT_PORT, localOutputPortS); - } - } - - // configure retry wait time - String retryS = parameters.get(SocketEndpointConstants.PARAMETER_RETRY); - if (retryS == null || retryS.length() == 0) { - ConnectionMessages.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY, - String.valueOf(SocketEndpointConstants.RETRY)); - this.retry = SocketEndpointConstants.RETRY; - } else { - try { - this.retry = Long.parseLong(retryS); - } catch (NumberFormatException e) { - throw ConnectionMessages - .throwInvalidParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY, - retryS); - } - } - - // configure connect on start - String connectOnStartS = parameters.get(SocketEndpointConstants.PARAMETER_CONNECT_ON_START); - if (StringHelper.isNotEmpty(connectOnStartS)) { - this.connectOnStart = StringHelper.parseBoolean(connectOnStartS); - } else { - ConnectionMessages - .warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CONNECT_ON_START, - String.valueOf(SocketEndpointConstants.CONNECT_ON_START)); - this.connectOnStart = SocketEndpointConstants.CONNECT_ON_START; - } - - // configure closeAfterSend - String closeAfterSendS = parameters.get(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND); - if (StringHelper.isNotEmpty(closeAfterSendS)) { - this.closeAfterSend = StringHelper.parseBoolean(closeAfterSendS); - } else { - ConnectionMessages - .warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, - String.valueOf(SocketEndpointConstants.CLOSE_AFTER_SEND)); - this.closeAfterSend = SocketEndpointConstants.CLOSE_AFTER_SEND; - } - - // configure if timeout on connection should be activated - String useTimeoutS = parameters.get(SocketEndpointConstants.PARAMETER_USE_TIMEOUT); - if (useTimeoutS == null || useTimeoutS.length() == 0) { - ConnectionMessages - .warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_USE_TIMEOUT, - String.valueOf(SocketEndpointConstants.USE_TIMEOUT)); - this.useTimeout = SocketEndpointConstants.USE_TIMEOUT; - } else { - this.useTimeout = Boolean.parseBoolean(useTimeoutS); - } - - if (this.useTimeout) { - // configure timeout on connection - String timeoutS = parameters.get(SocketEndpointConstants.PARAMETER_TIMEOUT); - if (timeoutS == null || timeoutS.length() == 0) { - ConnectionMessages - .warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_TIMEOUT, - String.valueOf(SocketEndpointConstants.TIMEOUT)); - this.timeout = SocketEndpointConstants.TIMEOUT; - } else { - try { - this.timeout = Integer.parseInt(timeoutS); - } catch (NumberFormatException e) { - throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_TIMEOUT, timeoutS); - } - } - } - - // configure if the connection should be cleared on connect - String clearOnConnectS = parameters.get(SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT); - if (clearOnConnectS == null || clearOnConnectS.length() == 0) { - ConnectionMessages - .warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT, - String.valueOf(SocketEndpointConstants.CLEAR_ON_CONNECT)); - this.clearOnConnect = SocketEndpointConstants.CLEAR_ON_CONNECT; - } else { - this.clearOnConnect = Boolean.parseBoolean(clearOnConnectS); - } - } - - /** - * @return the uri as String to which this {@link ClientSocketEndpoint} is locally bound to - */ - @Override - public String getLocalUri() { - if (this.socket != null) { - InetAddress localAddress = this.socket.getLocalAddress(); - return localAddress.getHostAddress() + StringHelper.COLON + this.socket.getLocalPort(); - } else if (this.localOutputAddress != null) { - return this.localOutputAddress.getHostAddress() + StringHelper.COLON + this.localOutputPort; - } - - return "0.0.0.0:0"; //$NON-NLS-1$ - } - - /** - * @return the uri as String to which this {@link ClientSocketEndpoint} is connecting to - */ - @Override - public String getRemoteUri() { - if (this.socket != null) { - InetAddress remoteAddress = this.socket.getInetAddress(); - return remoteAddress.getHostAddress() + StringHelper.COLON + this.socket.getPort(); - } else if (this.remoteInputAddress != null) { - return this.remoteInputAddress.getHostAddress() + StringHelper.COLON + this.remoteInputPort; - } - - return this.remoteInputAddressS + StringHelper.COLON + this.remoteInputPort; - } - - /** - * Allows this end point to connect and then opens the connection to the defined remote server - * - * @see CommunicationEndpoint#start() - */ - @Override - public void start() { - if (!this.closed) { - logger.warn(MessageFormat - .format("CommunicationConnection {0} already started.", this.connection.getId())); //$NON-NLS-1$ - } else { - // logger.info(MessageFormat.format("Enabling connection {0}...", this.connection.getId())); //$NON-NLS-1$ - this.closed = false; - this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString()); - if (this.connectOnStart) { - openConnection(); - } - } - } - - /** - * Closes this connection and disallows this end point to reconnect - * - * @see CommunicationEndpoint#stop() - */ - @Override - public void stop() { - this.closed = true; - - closeConnection(); - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString()); - - logger.info(MessageFormat.format("Disabled connection {0}.", this.connection.getId())); //$NON-NLS-1$ - } - - @Override - public void reset() { - this.closed = true; - closeConnection(); - this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString()); - } - - @Override - public void simulate(IoMessage message) throws Exception { - this.messageVisitor.simulate(message); - } - - @Override - public void send(IoMessage message) throws Exception { - - while (!this.closed && message.getState() == State.PENDING) { - try { - - // open the connection - if (!checkConnection()) - openConnection(); - - // read and write to the client socket - this.messageVisitor.visit(this.inputStream, this.outputStream, message); - - message.setState(State.DONE, State.DONE.name()); - - // since send was ok, allow to reconnect immediately - if (this.closeAfterSend) { - this.lastConnect = System.currentTimeMillis() - this.retry; - } - - } catch (Exception e) { - if (this.closed) { - logger.warn("Socket has been closed!"); //$NON-NLS-1$ - message.setState(State.FATAL, "Socket has been closed!"); //$NON-NLS-1$ - } else { - closeConnection(); - logger.error(e.getMessage(), e); - message.setState(State.FATAL, e.getLocalizedMessage()); - this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage()); - } - } finally { - if (this.closeAfterSend && !this.closed) { - closeConnection(); - } - } - } - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/ServerSocketEndpoint.java b/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/ServerSocketEndpoint.java deleted file mode 100644 index 16a1e65a3..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/ServerSocketEndpoint.java +++ /dev/null @@ -1,589 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.tcpip; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.*; -import java.text.MessageFormat; -import java.util.Map; - -import li.strolch.communication.*; -import li.strolch.utils.helper.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - *

- * This {@link CommunicationEndpoint} is an abstract implementation with everything needed to start a {@link Socket} - * server which waits for a request from a single client - *

- *

- * This end point only allows a single connection at a time and implements all exception handling for opening and - * closing a {@link Socket} connection - *

- * - * @author Robert von Burg <eitch@eitchnet.ch> - * @see ServerSocketEndpoint#configure(CommunicationConnection, IoMessageVisitor) for details on configuring the end - * point - */ -public class ServerSocketEndpoint implements CommunicationEndpoint, Runnable { - - protected static final Logger logger = LoggerFactory.getLogger(ServerSocketEndpoint.class); - - private Thread serverThread; - - // state variables - private boolean connected; - private boolean closed; - private boolean fatal; - private long lastConnect; - private boolean useTimeout; - private int timeout; - private long retry; - private boolean clearOnConnect; - - // address - private String localInputAddressS; - private int localInputPort; - private String remoteOutputAddressS; - private int remoteOutputPort; - - private InetAddress localInputAddress; - private InetAddress remoteOutputAddress; - - // connection - private ServerSocket serverSocket; - private Socket socket; - - protected DataOutputStream outputStream; - protected DataInputStream inputStream; - - protected CommunicationConnection connection; - - protected SocketMessageVisitor messageVisitor; - - /** - * Default constructor - */ - public ServerSocketEndpoint() { - this.connected = false; - this.closed = true; - this.fatal = false; - } - - /** - * Checks the state of the connection and returns true if {@link Socket} is connected and ready for transmission, - * false otherwise - * - * @return true if {@link Socket} is connected and ready for transmission, false otherwise - */ - protected boolean checkConnection() { - return !this.closed && this.connected && (this.socket != null && !this.socket.isClosed() && this.socket - .isBound() && this.socket.isConnected() && !this.socket.isInputShutdown() && !this.socket - .isOutputShutdown()); - } - - /** - * Listens on the {@link ServerSocket} for an incoming connection. Prepares the connection then for use. If the - * remote address has been defined, then the remote connection is validated to come from this appropriate host. - * CommunicationConnection attempts are always separated by a configured amount of time - */ - protected void openConnection() { - - ConnectionState state = this.connection.getState(); - - // do not open the connection if state is - // - CREATED - // - CONNECTING - // - WAITING - // - CLOSED - if (state == ConnectionState.CREATED || state == ConnectionState.CONNECTING || state == ConnectionState.WAITING - || state == ConnectionState.DISCONNECTED) { - - ConnectionMessages.throwIllegalConnectionState(state, ConnectionState.CONNECTING); - } - - // first close the connection - closeConnection(); - - while (!this.connected && !this.closed) { - try { - - this.connection.notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString()); - - // only try in proper intervals - long currentTime = System.currentTimeMillis(); - long timeDifference = currentTime - this.lastConnect; - if (timeDifference < this.retry) { - long wait = this.retry - timeDifference; - logger.info(MessageFormat.format("Waiting: {0}ms", wait)); //$NON-NLS-1$ - - this.connection.notifyStateChange(ConnectionState.WAITING, ConnectionState.WAITING.toString()); - Thread.sleep(wait); - this.connection - .notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString()); - } - - // don't try and connect if we are closed! - if (this.closed) { - logger.error("The connection has been closed and can not be connected"); //$NON-NLS-1$ - closeConnection(); - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null); - return; - } - - // keep track of the time of this connection attempt - this.lastConnect = System.currentTimeMillis(); - - // open the socket - String msg = "Waiting for connections on: {0}:{1}..."; //$NON-NLS-1$ - logger.info(MessageFormat - .format(msg, this.localInputAddress.getHostAddress(), Integer.toString(this.localInputPort))); - this.socket = this.serverSocket.accept(); - - // validate that the remote side of the socket is really the client we want - if (this.remoteOutputAddress != null) { - - String remoteAddr = this.socket.getInetAddress().getHostAddress(); - if (!remoteAddr.equals(this.remoteOutputAddress.getHostAddress())) { - msg = "Illegal remote client at address {0}. Expected is {1}"; //$NON-NLS-1$ - msg = MessageFormat.format(msg, remoteAddr, this.remoteOutputAddress.getHostAddress()); - logger.error(msg); - - closeConnection(); - - throw new ConnectionException(msg); - } - } - - // configure the socket - if (logger.isDebugEnabled()) { - msg = "BufferSize (send/read): {0} / {1} SoLinger: {2} TcpNoDelay: {3}"; //$NON-NLS-1$ - logger.debug(MessageFormat - .format(msg, this.socket.getSendBufferSize(), this.socket.getReceiveBufferSize(), - this.socket.getSoLinger(), this.socket.getTcpNoDelay())); - } - //inputSocket.setSendBufferSize(1); - //inputSocket.setSoLinger(true, 0); - //inputSocket.setTcpNoDelay(true); - - // activate connection timeout - if (this.useTimeout) { - this.socket.setSoTimeout(this.timeout); - } - - // get the streams - this.outputStream = new DataOutputStream(this.socket.getOutputStream()); - this.inputStream = new DataInputStream(this.socket.getInputStream()); - - if (this.clearOnConnect) { - // clear the input stream - int available = this.inputStream.available(); - logger.info(MessageFormat.format("clearOnConnect: skipping {0} bytes.", available)); //$NON-NLS-1$ - this.inputStream.skip(available); - } - - msg = "Connected {0}{1}: {2}:{3} with local side {4}:{5}"; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.getClass().getSimpleName(), this.connection.getId(), - this.socket.getInetAddress().getHostName(), Integer.toString(this.socket.getPort()), - this.socket.getLocalAddress().getHostAddress(), Integer.toString(this.socket.getLocalPort()))); - - // we are connected! - this.connection.notifyStateChange(ConnectionState.CONNECTED, ConnectionState.CONNECTED.toString()); - this.connected = true; - - } catch (InterruptedException e) { - logger.warn("Interrupted!"); //$NON-NLS-1$ - this.closed = true; - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null); - } catch (Exception e) { - if (this.closed && e instanceof SocketException) { - logger.warn("Socket closed!"); //$NON-NLS-1$ - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null); - } else { - String msg = "Error while opening socket for inbound connection {0}: {1}"; //$NON-NLS-1$ - logger.error(MessageFormat.format(msg, this.connection.getId()), e.getMessage()); - this.connected = false; - this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage()); - } - } - } - } - - /** - * closes the connection HARD by calling close() on the streams and socket. All Exceptions are caught to make sure - * that the connections are cleaned up - */ - protected void closeConnection() { - - this.connected = false; - this.connection.notifyStateChange(ConnectionState.BROKEN, null); - - if (this.outputStream != null) { - try { - this.outputStream.close(); - } catch (IOException e) { - logger.error( - MessageFormat.format("Error closing OutputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } finally { - this.outputStream = null; - } - } - - if (this.inputStream != null) { - try { - this.inputStream.close(); - } catch (IOException e) { - logger.error( - MessageFormat.format("Error closing InputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } finally { - this.inputStream = null; - } - } - - if (this.socket != null) { - try { - this.socket.close(); - } catch (IOException e) { - logger.error( - MessageFormat.format("Error closing InputSocket: {0}", e.getLocalizedMessage())); //$NON-NLS-1$ - } finally { - this.socket = null; - } - - String msg = "Socket closed for inbound connection {0} at local input address {1}:{2}"; //$NON-NLS-1$ - logger.info(MessageFormat.format(msg, this.connection.getId(), this.localInputAddressS, - Integer.toString(this.localInputPort))); - } - } - - /** - *

- * Configures this {@link ServerSocketEndpoint} - *

- * gets the parameter map from the connection and reads the following parameters from the map: - *
    - *
  • localInputAddress - the local IP or Hostname to bind to for incoming connections
  • - *
  • localInputPort - the local port on which to listen for incoming connections
  • - *
  • remoteOutputAddress - the IP or Hostname of the remote client. If this value is not null, then it will be - * verified that the connecting client is connecting from this address
  • - *
  • remoteOutputPort - the port from which the remote client must connect. If this value is not null, then it - * will be verified that the connecting client is connecting from this port
  • - *
  • retry - a configured retry wait time. Default is {@link SocketEndpointConstants#RETRY}
  • - *
  • timeout - the timeout after which an idle socket is deemed dead. Default is - * {@link SocketEndpointConstants#TIMEOUT}
  • - *
  • useTimeout - if true, then the timeout is activated. default is {@link SocketEndpointConstants#USE_TIMEOUT}
  • - *
  • clearOnConnect - if true, then the after a successful connect the input is cleared by discarding all - * available bytes. This can be useful in cases where the channel is clogged with stale data. default is {@link - * SocketEndpointConstants#CLEAR_ON_CONNECT}
  • - *
- * - * @see CommunicationEndpoint#configure(CommunicationConnection, IoMessageVisitor) - */ - @Override - public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) { - if (this.connection != null && connection.getState().compareTo(ConnectionState.INITIALIZED) > 0) { - logger.warn(MessageFormat - .format("Inbound connection {0} already configured.", connection.getId())); //$NON-NLS-1$ - return; - } - - ConnectionMessages.assertLegalMessageVisitor(this.getClass(), SocketMessageVisitor.class, messageVisitor); - this.messageVisitor = (SocketMessageVisitor) messageVisitor; - this.connection = connection; - configure(); - } - - private void configure() { - Map parameters = this.connection.getParameters(); - - this.localInputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS); - String localInputPortS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT); - this.remoteOutputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_ADDRESS); - String remoteOutputPortS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_PORT); - - // parse local Address to InetAddress object - try { - this.localInputAddress = InetAddress.getByName(this.localInputAddressS); - } catch (UnknownHostException e) { - throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS, this.localInputAddressS); - } - - // parse local address port to integer - try { - this.localInputPort = Integer.parseInt(localInputPortS); - } catch (NumberFormatException e) { - throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT, localInputPortS); - } - - // if remote address is not set, then we will use the localhost InetAddress - if (this.remoteOutputAddressS == null || this.remoteOutputAddressS.length() == 0) { - logger.debug("No remoteOutputAddress set. Allowing connection from any remote address"); //$NON-NLS-1$ - } else { - - // parse remote output address name to InetAddress object - try { - this.remoteOutputAddress = InetAddress.getByName(this.remoteOutputAddressS); - } catch (UnknownHostException e) { - throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_ADDRESS, this.remoteOutputAddressS); - } - - // parse remote output address port to integer - try { - this.remoteOutputPort = Integer.parseInt(remoteOutputPortS); - } catch (NumberFormatException e) { - throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_PORT, remoteOutputPortS); - } - } - - // configure retry wait time - String retryS = parameters.get(SocketEndpointConstants.PARAMETER_RETRY); - if (retryS == null || retryS.length() == 0) { - ConnectionMessages.warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY, - String.valueOf(SocketEndpointConstants.RETRY)); - this.retry = SocketEndpointConstants.RETRY; - } else { - try { - this.retry = Long.parseLong(retryS); - } catch (NumberFormatException e) { - throw ConnectionMessages - .throwInvalidParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY, - retryS); - } - } - - // configure if timeout on connection should be activated - String useTimeoutS = parameters.get(SocketEndpointConstants.PARAMETER_USE_TIMEOUT); - if (useTimeoutS == null || useTimeoutS.length() == 0) { - ConnectionMessages - .warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_USE_TIMEOUT, - String.valueOf(SocketEndpointConstants.USE_TIMEOUT)); - this.useTimeout = SocketEndpointConstants.USE_TIMEOUT; - } else { - this.useTimeout = Boolean.parseBoolean(useTimeoutS); - } - - if (this.useTimeout) { - // configure timeout on connection - String timeoutS = parameters.get(SocketEndpointConstants.PARAMETER_TIMEOUT); - if (timeoutS == null || timeoutS.length() == 0) { - ConnectionMessages - .warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_TIMEOUT, - String.valueOf(SocketEndpointConstants.TIMEOUT)); - this.timeout = SocketEndpointConstants.TIMEOUT; - } else { - try { - this.timeout = Integer.parseInt(timeoutS); - } catch (NumberFormatException e) { - throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class, - SocketEndpointConstants.PARAMETER_TIMEOUT, timeoutS); - } - } - } - - // configure if the connection should be cleared on connect - String clearOnConnectS = parameters.get(SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT); - if (clearOnConnectS == null || clearOnConnectS.length() == 0) { - ConnectionMessages - .warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT, - String.valueOf(SocketEndpointConstants.CLEAR_ON_CONNECT)); - this.clearOnConnect = SocketEndpointConstants.CLEAR_ON_CONNECT; - } else { - this.clearOnConnect = Boolean.parseBoolean(clearOnConnectS); - } - } - - /** - * @return the uri as String to which this {@link ServerSocketEndpoint} is locally bound to - */ - @Override - public String getLocalUri() { - if (this.socket != null) { - InetAddress localAddress = this.socket.getLocalAddress(); - return localAddress.getHostAddress() + StringHelper.COLON + this.socket.getLocalPort(); - } else if (this.localInputAddress != null) { - return this.localInputAddress.getHostAddress() + StringHelper.COLON + this.localInputPort; - } - - return "0.0.0.0:0"; //$NON-NLS-1$ - } - - /** - * @return the uri as String from which this {@link ServerSocketEndpoint} is receiving data from - */ - @Override - public String getRemoteUri() { - if (this.socket != null) { - InetAddress remoteAddress = this.socket.getInetAddress(); - return remoteAddress.getHostAddress() + StringHelper.COLON + this.socket.getPort(); - } else if (this.remoteOutputAddressS != null) { - return this.remoteOutputAddress.getHostAddress() + StringHelper.COLON + this.remoteOutputPort; - } - - return "0.0.0.0:0"; //$NON-NLS-1$ - } - - /** - * Starts the {@link Thread} to allow incoming connections - * - * @see CommunicationEndpoint#start() - */ - @Override - public void start() { - - if (this.fatal) { - String msg = "CommunicationConnection had a fatal exception and can not yet be started. Please check log file for further information!"; //$NON-NLS-1$ - throw new ConnectionException(msg); - } - - if (this.serverThread != null) { - logger.warn(MessageFormat - .format("CommunicationConnection {0} already started.", this.connection.getId())); //$NON-NLS-1$ - } else { - // logger.info(MessageFormat.format("Enabling connection {0}...", this.connection.getId())); //$NON-NLS-1$ - this.closed = false; - - this.serverThread = new Thread(this, this.connection.getId()); - this.serverThread.start(); - } - } - - /** - * Closes any open connection and then stops the {@link Thread} disallowing incoming connections - * - * @see CommunicationEndpoint#stop() - */ - @Override - public void stop() { - closeThread(); - closeConnection(); - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString()); - - logger.info(MessageFormat.format("Disabled connection {0}.", this.connection.getId())); //$NON-NLS-1$ - } - - @Override - public void reset() { - closeThread(); - closeConnection(); - configure(); - this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString()); - } - - private void closeThread() { - this.closed = true; - this.fatal = false; - - if (this.serverThread != null) { - try { - this.serverThread.interrupt(); - if (this.serverSocket != null) - this.serverSocket.close(); - this.serverThread.join(2000l); - } catch (Exception e) { - logger.error(MessageFormat.format("Exception while interrupting server thread: {0}", - e.getLocalizedMessage())); //$NON-NLS-1$ - } - - this.serverThread = null; - } - } - - /** - * Thread is listening on the ServerSocket and opens a new connection if necessary - */ - @Override - public void run() { - - while (!this.closed) { - - // bomb-proof, catches all exceptions! - try { - - // if serverSocket is null or closed, open a new server socket - if (this.serverSocket == null || this.serverSocket.isClosed()) { - - try { - // String msg = "Opening socket on {0}:{1}..."; //$NON-NLS-1$ - // logger.info(MessageFormat.format(msg, this.localInputAddress.getHostAddress(), - // Integer.toString(this.localInputPort))); - this.serverSocket = new ServerSocket(this.localInputPort, 1, this.localInputAddress); - this.serverSocket.setReuseAddress(true); - } catch (BindException e) { - logger.error( - "Fatal BindException occurred! Port is already in use, or address is illegal!"); //$NON-NLS-1$ - logger.error(e.getMessage(), e); - this.closed = true; - this.fatal = true; - - String msg = "Fatal error while binding to server socket. ServerSocket endpoint is dead"; //$NON-NLS-1$ - throw new ConnectionException(msg); - } - } - - // open the connection - openConnection(); - - // as long as connection is connected - while (checkConnection()) { - - // read and write from the connected server socket - IoMessage message = this.messageVisitor.visit(this.inputStream, this.outputStream); - if (message != null) { - this.connection.handleNewMessage(message); - } - } - - } catch (Exception e) { - if (e instanceof InterruptedException) { - logger.error("Interrupted!"); //$NON-NLS-1$ - } else { - logger.error(e.getMessage(), e); - } - this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage()); - } finally { - closeConnection(); - } - } - - if (!this.fatal) { - logger.warn(MessageFormat.format("CommunicationConnection {0} is not running anymore!", - this.connection.getId())); //$NON-NLS-1$ - this.connection.notifyStateChange(ConnectionState.BROKEN, null); - } else { - String msg = "CommunicationConnection {0} is broken due to a fatal exception!"; //$NON-NLS-1$ - logger.error(MessageFormat.format(msg, this.connection.getId())); - this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null); - } - } - - @Override - public void simulate(IoMessage message) throws Exception { - send(message); - } - - @Override - public void send(IoMessage message) throws Exception { - String msg = "The Server Socket can not send messages, use the {0} implementation instead!"; //$NON-NLS-1$ - throw new UnsupportedOperationException(MessageFormat.format(msg, ClientSocketEndpoint.class.getName())); - } -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/SocketEndpointConstants.java b/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/SocketEndpointConstants.java deleted file mode 100644 index 6582ffcfd..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/SocketEndpointConstants.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.tcpip; - -/** - * Constants used in the communication classes - * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class SocketEndpointConstants { - - public static final String PARAMETER_USE_TIMEOUT = "useTimeout"; //$NON-NLS-1$ - public static final String PARAMETER_TIMEOUT = "timeout"; //$NON-NLS-1$ - public static final String PARAMETER_RETRY = "retry"; //$NON-NLS-1$ - public static final String PARAMETER_CLEAR_ON_CONNECT = "clearOnConnect"; //$NON-NLS-1$ - public static final String PARAMETER_CONNECT_ON_START = "connectOnStart"; //$NON-NLS-1$ - public static final String PARAMETER_CLOSE_AFTER_SEND = "closeAfterSend"; //$NON-NLS-1$ - - public static final String PARAMETER_REMOTE_OUTPUT_PORT = "remoteOutputPort"; //$NON-NLS-1$ - public static final String PARAMETER_REMOTE_OUTPUT_ADDRESS = "remoteOutputAddress"; //$NON-NLS-1$ - public static final String PARAMETER_LOCAL_INPUT_PORT = "localInputPort"; //$NON-NLS-1$ - public static final String PARAMETER_LOCAL_INPUT_ADDRESS = "localInputAddress"; //$NON-NLS-1$ - - public static final String PARAMETER_LOCAL_OUTPUT_ADDRESS = "localOutputAddress"; //$NON-NLS-1$ - public static final String PARAMETER_LOCAL_OUTPUT_PORT = "localOutputPort"; //$NON-NLS-1$ - public static final String PARAMETER_REMOTE_INPUT_ADDRESS = "remoteInputAddress"; //$NON-NLS-1$ - public static final String PARAMETER_REMOTE_INPUT_PORT = "remoteInputPort"; //$NON-NLS-1$ - - /** - * Time to wait in milliseconds before reestablishing a connection. Default is 60000ms - */ - public static final long RETRY = 60000l; - - /** - * The time after which a connection is deemed dead. Value is 60000ms - */ - public static final int TIMEOUT = 60000; - - /** - * Default is to use a timeout on socket connections, thus this value is true - */ - public static final boolean USE_TIMEOUT = true; - - /** - * Default is to not clear the input socket on connect, thus this value is false - */ - public static final boolean CLEAR_ON_CONNECT = false; - - /** - * Default is to connect on start of the connection - */ - public static final boolean CONNECT_ON_START = true; - - /** - * Default is to not close after sending - */ - public static final boolean CLOSE_AFTER_SEND = false; - - /** - * Default is to disconnect after a null message is received when reading from a TCP socket, thus this value is - * true - */ - public static final boolean DISCONNECT_ON_NULL_MSG = true; - - /** - * If {@link #DISCONNECT_ON_NULL_MSG} is activated, then this is the default time used to wait before reading again, - * which is 10000ms - */ - public static final long WAIT_TIME_ON_NULL_MSG = 10000l; -} diff --git a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/SocketMessageVisitor.java b/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/SocketMessageVisitor.java deleted file mode 100644 index a4d3085c1..000000000 --- a/li.strolch.utils/src/main/java/li/strolch/communication/tcpip/SocketMessageVisitor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication.tcpip; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.net.Socket; - -import li.strolch.communication.IoMessage; -import li.strolch.communication.IoMessageVisitor; - -/** - * This {@link IoMessageVisitor} implements and endpoint connecting to a {@link Socket}. - * - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public abstract class SocketMessageVisitor extends IoMessageVisitor { - - protected final String connectionId; - - public SocketMessageVisitor(String connectionId) { - this.connectionId = connectionId; - } - - public String getConnectionId() { - return this.connectionId; - } - - /** - * This method is called when a message is read from the underlying {@link Socket} - * - * @param inputStream - * the input stream to read data from - * @param outputStream - * the output stream to write data to - * - * @return the parsed {@link IoMessage} - * - * @throws Exception - * if something goes wrong - */ - public abstract IoMessage visit(DataInputStream inputStream, DataOutputStream outputStream) throws Exception; - - /** - * This method is called when a message is to be sent to the underlying connected endpoint - * - * @param inputStream - * the input stream to read data from - * @param outputStream - * the output stream to write data to - * @param message - * the message to parse - * - * @throws Exception - * of something goes wrong - */ - public abstract void visit(DataInputStream inputStream, DataOutputStream outputStream, IoMessage message) - throws Exception; -} diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/AbstractEndpointTest.java b/li.strolch.utils/src/test/java/li/strolch/communication/AbstractEndpointTest.java deleted file mode 100644 index 518f79760..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/AbstractEndpointTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class AbstractEndpointTest { - - static final Logger logger = LoggerFactory.getLogger(FileEndpointTest.class); - - public static TestIoMessage createTestMessage(String key1, String key2, String connectionId) { - return createTestMessage(CommandKey.key(key1, key2), connectionId); - } - - @SuppressWarnings("nls") - public static TestIoMessage createTestMessage(CommandKey key, String connectionId) { - TestIoMessage msg = new TestIoMessage(UUID.randomUUID().toString(), key, connectionId); - List lines = new ArrayList<>(); - lines.add("bla"); - lines.add("foo"); - lines.add("bar"); - lines.add("bla"); - msg.setContents(lines); - return msg; - } - - protected void waitForMessage(TestConnectionObserver observer) throws InterruptedException { - long start = System.currentTimeMillis(); - while (observer.getMessage() == null) { - if (System.currentTimeMillis() - start > 2000) - fail("Connection didn't send message in 2s!"); //$NON-NLS-1$ - Thread.sleep(50); - } - } -} diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/ConsoleEndpointTest.java b/li.strolch.utils/src/test/java/li/strolch/communication/ConsoleEndpointTest.java deleted file mode 100644 index cf8a0dac6..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/ConsoleEndpointTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import static org.junit.Assert.assertEquals; - -import java.util.HashMap; -import java.util.Map; - -import li.strolch.communication.console.ConsoleEndpoint; -import li.strolch.communication.console.ConsoleMessageVisitor; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class ConsoleEndpointTest extends AbstractEndpointTest { - - private static final String CONNECTION_ID = "Console"; //$NON-NLS-1$ - private CommunicationConnection connection; - - @Before - public void before() { - - Map parameters = new HashMap<>(); - CommunicationEndpoint endpoint = new ConsoleEndpoint(); - ConsoleMessageVisitor messageVisitor = new ConsoleMessageVisitorExtension(); - this.connection = new CommunicationConnection(CONNECTION_ID, ConnectionMode.ON, parameters, endpoint, - messageVisitor); - this.connection.configure(); - } - - @Test - public void testConsoleEndpoint() throws InterruptedException { - - this.connection.start(); - - CommandKey key = CommandKey.key(CONNECTION_ID, "logger"); //$NON-NLS-1$ - TestIoMessage msg = createTestMessage(key, CONNECTION_ID); - - TestConnectionObserver observer = new TestConnectionObserver(); - this.connection.addConnectionObserver(key, observer); - this.connection.send(msg); - waitForMessage(observer); - - assertEquals(msg.getKey(), observer.getMessage().getKey()); - - } - - private final class ConsoleMessageVisitorExtension extends ConsoleMessageVisitor { - public ConsoleMessageVisitorExtension() { - // no-op - } - - @Override - public void visit(Logger logger, IoMessage message) throws Exception { - TestIoMessage msg = (TestIoMessage) message; - for (String line : msg.getContents()) { - logger.info(line); - } - } - } -} diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/FileEndpointTest.java b/li.strolch.utils/src/test/java/li/strolch/communication/FileEndpointTest.java deleted file mode 100644 index 4bf4712e6..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/FileEndpointTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import static org.junit.Assert.assertEquals; - -import java.io.*; -import java.util.*; - -import li.strolch.communication.file.FileEndpoint; -import li.strolch.communication.file.FileEndpointMode; -import li.strolch.utils.helper.FileHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class FileEndpointTest extends AbstractEndpointTest { - - public static final String INBOUND_FILENAME = "target/test_in.txt"; //$NON-NLS-1$ - public static final String OUTBOUND_FILENAME = "target/test_out.txt"; //$NON-NLS-1$ - public static final String CONNECTION_ID = "FileTestEndpoint"; //$NON-NLS-1$ - - private CommunicationConnection connection; - - @Before - public void before() { - - new File(OUTBOUND_FILENAME).delete(); - new File(INBOUND_FILENAME).delete(); - - Map parameters = new HashMap<>(); - parameters.put(FileEndpoint.ENDPOINT_MODE, FileEndpointMode.READ_WRITE.name()); - parameters.put(FileEndpoint.INBOUND_FILENAME, INBOUND_FILENAME); - parameters.put(FileEndpoint.OUTBOUND_FILENAME, OUTBOUND_FILENAME); - - ConnectionMode mode = ConnectionMode.ON; - CommunicationEndpoint endpoint = new FileEndpoint(); - StreamMessageVisitor messageVisitor = new StreamMessageVisitorExtension(); - - this.connection = new CommunicationConnection(CONNECTION_ID, mode, parameters, endpoint, messageVisitor); - this.connection.configure(); - } - - @After - public void after() { - if (this.connection != null) - this.connection.stop(); - } - - @Test - public void testFileEndpoint() throws InterruptedException { - - String inboundFilename = new File(INBOUND_FILENAME).getName(); - String outboundFilename = new File(OUTBOUND_FILENAME).getName(); - - // send a message - this.connection.start(); - TestConnectionObserver outboundObserver = new TestConnectionObserver(); - TestIoMessage message = createTestMessage(outboundFilename, FileEndpointMode.WRITE.name(), CONNECTION_ID); - this.connection.addConnectionObserver(message.getKey(), outboundObserver); - this.connection.send(message); - - // wait till the message has been sent - waitForMessage(outboundObserver); - this.connection.stop(); - assertEquals(message.getKey(), outboundObserver.getMessage().getKey()); - - // now test reading a file - this.connection.start(); - CommandKey inboundKey = CommandKey.key(inboundFilename, FileEndpointMode.READ.name()); - TestConnectionObserver inboundObserver = new TestConnectionObserver(); - this.connection.addConnectionObserver(inboundKey, inboundObserver); - FileHelper.writeStringToFile("Hello\nWorld!", new File(INBOUND_FILENAME)); //$NON-NLS-1$ - - // wait for thread to pick up the file - waitForMessage(inboundObserver); - assertEquals(inboundKey, inboundObserver.getMessage().getKey()); - } - - public static final class StreamMessageVisitorExtension extends StreamMessageVisitor { - private String inboundFilename; - - @Override - public void configure(CommunicationConnection connection) { - super.configure(connection); - Map parameters = connection.getParameters(); - String filePath = parameters.get(FileEndpoint.INBOUND_FILENAME); - this.inboundFilename = new File(filePath).getName(); - } - - @Override - public IoMessage visit(InputStream inputStream) throws Exception { - - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); - List lines = new ArrayList<>(); - String line; - while ((line = reader.readLine()) != null) { - lines.add(line); - } - return new TestIoMessage(UUID.randomUUID().toString(), - CommandKey.key(this.inboundFilename, FileEndpointMode.READ.name()), CONNECTION_ID, lines); - } - - @Override - public void visit(OutputStream outputStream, IoMessage message) throws Exception { - TestIoMessage msg = (TestIoMessage) message; - for (String line : msg.getContents()) { - outputStream.write(line.getBytes()); - outputStream.write('\n'); - } - } - } -} diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/SimpleMessageArchiveTest.java b/li.strolch.utils/src/test/java/li/strolch/communication/SimpleMessageArchiveTest.java deleted file mode 100644 index e763fed3f..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/SimpleMessageArchiveTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import static org.junit.Assert.assertEquals; - -import java.util.*; - -import org.junit.Test; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class SimpleMessageArchiveTest extends AbstractEndpointTest { - - @Test - public void testArchive() throws InterruptedException { - - IoMessageArchive archive = new SimpleMessageArchive(20, 5); - - CommandKey key = CommandKey.key("key1", "key2"); //$NON-NLS-1$//$NON-NLS-2$ - String connectionId = "connection1"; //$NON-NLS-1$ - - int i = 0; - for (; i < 20; i++) { - TestIoMessage msg = new TestIoMessage(UUID.randomUUID().toString(), key, connectionId); - // update the time by plus 1, otherwise the tree set does not add it - msg.setUpdated(new Date(i + 1)); - archive.archive(msg); - } - - assertEquals(20, archive.size()); - - // add one more - TestIoMessage msg = new TestIoMessage(UUID.randomUUID().toString(), key, connectionId); - msg.setUpdated(new Date(i + 1)); - archive.archive(msg); - - // validate the trimming works - assertEquals(15, archive.size()); - - // Now make sure our last element is still in the list - List all = archive.getAll(); - Collections.sort(all, new Comparator() { - @Override - public int compare(IoMessage o1, IoMessage o2) { - return o1.getUpdated().compareTo(o2.getUpdated()); - } - }); - IoMessage message = all.get(all.size() - 1); - assertEquals(msg.getId(), message.getId()); - } -} diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/SocketEndpointTest.java b/li.strolch.utils/src/test/java/li/strolch/communication/SocketEndpointTest.java deleted file mode 100644 index 8784303f5..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/SocketEndpointTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import static org.junit.Assert.assertEquals; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.InputStreamReader; -import java.text.MessageFormat; -import java.util.*; - -import li.strolch.communication.tcpip.ClientSocketEndpoint; -import li.strolch.communication.tcpip.ServerSocketEndpoint; -import li.strolch.communication.tcpip.SocketEndpointConstants; -import li.strolch.communication.tcpip.SocketMessageVisitor; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class SocketEndpointTest extends AbstractEndpointTest { - - private static final String PORT = "45678"; //$NON-NLS-1$ - private static final String HOST = "localhost"; //$NON-NLS-1$ - private static final String CLIENT_CONNECTION_ID = "ClientSocket"; //$NON-NLS-1$ - private static final String SERVER_CONNECTION_ID = "ServerSocket"; //$NON-NLS-1$ - private CommunicationConnection clientConnection; - private CommunicationConnection serverConnection; - - @Before - public void before() { - - { - Map parameters = new HashMap<>(); - parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS, HOST); - parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT, PORT); - - // we close after send, so that the server can read whole lines, as that is what we are sending - parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.TRUE.toString()); - - CommunicationEndpoint endpoint = new ClientSocketEndpoint(); - SocketMessageVisitor messageVisitor = new SocketMessageVisitorExtension(CLIENT_CONNECTION_ID); - this.clientConnection = new CommunicationConnection(CLIENT_CONNECTION_ID, ConnectionMode.ON, parameters, - endpoint, messageVisitor); - this.clientConnection.configure(); - } - - { - Map parameters = new HashMap<>(); - parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS, HOST); - parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT, PORT); - - CommunicationEndpoint endpoint = new ServerSocketEndpoint(); - SocketMessageVisitor messageVisitor = new SocketMessageVisitorExtension(SERVER_CONNECTION_ID); - this.serverConnection = new CommunicationConnection(SERVER_CONNECTION_ID, ConnectionMode.ON, parameters, - endpoint, messageVisitor); - this.serverConnection.configure(); - } - } - - @After - public void after() { - if (this.clientConnection != null) - this.clientConnection.stop(); - if (this.serverConnection != null) - this.serverConnection.stop(); - } - - @Test - public void testSocketEndpoints() throws Exception { - - this.serverConnection.start(); - Thread.sleep(100); - this.clientConnection.start(); - - TestConnectionObserver serverObserver = new TestConnectionObserver(); - CommandKey inboundKey = CommandKey.key(SERVER_CONNECTION_ID, "lines"); //$NON-NLS-1$ - this.serverConnection.addConnectionObserver(inboundKey, serverObserver); - - TestConnectionObserver clientObserver = new TestConnectionObserver(); - CommandKey outboundKey = CommandKey.key(CLIENT_CONNECTION_ID, "lines"); //$NON-NLS-1$ - this.clientConnection.addConnectionObserver(outboundKey, clientObserver); - - TestIoMessage outboundMsg = createTestMessage(outboundKey, CLIENT_CONNECTION_ID); - this.clientConnection.send(outboundMsg); - waitForMessage(clientObserver); - assertEquals(outboundMsg.getKey(), clientObserver.getMessage().getKey()); - - waitForMessage(serverObserver); - assertEquals(inboundKey, serverObserver.getMessage().getKey()); - assertEquals(outboundMsg.getContents(), ((TestIoMessage) serverObserver.getMessage()).getContents()); - } - - private final class SocketMessageVisitorExtension extends SocketMessageVisitor { - - public SocketMessageVisitorExtension(String connectionId) { - super(connectionId); - } - - @Override - public void visit(DataInputStream inputStream, DataOutputStream outputStream, IoMessage message) - throws Exception { - TestIoMessage msg = (TestIoMessage) message; - logger.info(MessageFormat - .format("Writing {0} lines for message {1}", msg.getContents().size(), msg.getId())); //$NON-NLS-1$ - for (String line : msg.getContents()) { - outputStream.writeBytes(line); - outputStream.write('\n'); - } - outputStream.flush(); - } - - @Override - public IoMessage visit(DataInputStream inputStream, DataOutputStream outputStream) throws Exception { - - List lines = new ArrayList<>(); - - // since we are reading whole lines, we must close the stream when we read null i.e. EOF - try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { - String line; - logger.info("Reading from stream..."); //$NON-NLS-1$ - while ((line = reader.readLine()) != null) { - lines.add(line); - } - } - logger.info(MessageFormat.format("Read {0} lines from stream.", lines.size())); //$NON-NLS-1$ - - return new TestIoMessage(UUID.randomUUID().toString(), CommandKey.key(SERVER_CONNECTION_ID, "lines"), - SERVER_CONNECTION_ID, lines); //$NON-NLS-1$ - } - } -} diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/TestConnectionObserver.java b/li.strolch.utils/src/test/java/li/strolch/communication/TestConnectionObserver.java deleted file mode 100644 index 040d74ad3..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/TestConnectionObserver.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.text.MessageFormat; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class TestConnectionObserver implements ConnectionObserver { - - private static final Logger logger = LoggerFactory.getLogger(FileEndpointTest.class); - - private IoMessage message; - - public IoMessage getMessage() { - return this.message; - } - - @Override - public void notify(CommandKey key, IoMessage message) { - this.message = message; - logger.info(MessageFormat.format("Received message with key {0} and message {1}", key, message)); //$NON-NLS-1$ - } -} \ No newline at end of file diff --git a/li.strolch.utils/src/test/java/li/strolch/communication/TestIoMessage.java b/li.strolch.utils/src/test/java/li/strolch/communication/TestIoMessage.java deleted file mode 100644 index 8570f1e27..000000000 --- a/li.strolch.utils/src/test/java/li/strolch/communication/TestIoMessage.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2014 Robert von Burg - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package li.strolch.communication; - -import java.util.List; - -/** - * @author Robert von Burg <eitch@eitchnet.ch> - */ -public class TestIoMessage extends IoMessage { - - private List contents; - - public TestIoMessage(String id, CommandKey key, String connectionId) { - super(id, key, connectionId); - } - - public TestIoMessage(String id, CommandKey key, String connectionId, List contents) { - super(id, key, connectionId); - this.contents = contents; - } - - public List getContents() { - return this.contents; - } - - public void setContents(List contents) { - this.contents = contents; - } -}