[Major] Removed connection from utils (use Apache Camel)

This commit is contained in:
Robert von Burg 2019-04-05 15:22:22 +02:00
parent 1cdd53e60f
commit 961e93fa35
36 changed files with 0 additions and 4189 deletions

View File

@ -1,70 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import li.strolch.utils.helper.StringHelper;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class CommandKey {
private final String key1;
private final String key2;
private final int hashCode;
/**
* @param key1
* key1
* @param key2
* key2
*/
public CommandKey(String key1, String key2) {
this.key1 = key1;
this.key2 = key2;
final int prime = 31;
int result = 1;
result = prime * result + ((this.key1 == null) ? 0 : this.key1.hashCode());
result = prime * result + ((this.key2 == null) ? 0 : this.key2.hashCode());
this.hashCode = result;
}
public static CommandKey key(String key1, String key2) {
return new CommandKey(key1, key2);
}
@Override
public int hashCode() {
return this.hashCode;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CommandKey other = (CommandKey) obj;
return this.key1.equals(other.key1) && this.key2.equals(other.key2);
}
@Override
public String toString() {
return this.key1 + StringHelper.COLON + this.key2;
}
}

View File

@ -1,412 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import li.strolch.communication.IoMessage.State;
import li.strolch.utils.collections.MapOfLists;
import li.strolch.utils.dbc.DBC;
import li.strolch.utils.helper.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class CommunicationConnection implements Runnable {
protected static final Logger logger = LoggerFactory.getLogger(CommunicationConnection.class);
private String id;
private ConnectionMode mode;
private Map<String, String> parameters;
private ConnectionState state;
private String stateMsg;
private BlockingDeque<IoMessage> messageQueue;
private Thread queueThread;
private volatile boolean run;
private MapOfLists<CommandKey, ConnectionObserver> connectionObservers;
private List<ConnectionStateObserver> connectionStateObservers;
private CommunicationEndpoint endpoint;
private IoMessageVisitor messageVisitor;
private IoMessageArchive archive;
public CommunicationConnection(String id, ConnectionMode mode, Map<String, String> parameters,
CommunicationEndpoint endpoint, IoMessageVisitor messageVisitor) {
DBC.PRE.assertNotEmpty("Id must be set!", id); //$NON-NLS-1$
DBC.PRE.assertNotNull("ConnectionMode must be set!", mode); //$NON-NLS-1$
DBC.PRE.assertNotNull("Paramerters must not be null!", parameters); //$NON-NLS-1$
DBC.PRE.assertNotNull("Endpoint must be set!", endpoint); //$NON-NLS-1$
DBC.PRE.assertNotNull("IoMessageVisitor must be set!", messageVisitor); //$NON-NLS-1$
this.id = id;
this.mode = mode;
this.parameters = parameters;
this.endpoint = endpoint;
this.messageVisitor = messageVisitor;
this.state = ConnectionState.CREATED;
this.stateMsg = this.state.toString();
this.messageQueue = new LinkedBlockingDeque<>();
this.connectionObservers = new MapOfLists<>();
this.connectionStateObservers = new ArrayList<>();
}
public void setArchive(IoMessageArchive archive) {
this.archive = archive;
}
public IoMessageArchive getArchive() {
return this.archive;
}
public String getId() {
return this.id;
}
public int getQueueSize() {
return this.messageQueue.size();
}
public ConnectionState getState() {
return this.state;
}
public String getStateMsg() {
return this.stateMsg;
}
public ConnectionMode getMode() {
return this.mode;
}
public Map<String, String> getParameters() {
return this.parameters;
}
public void clearQueue() {
this.messageQueue.clear();
}
public void addConnectionObserver(CommandKey key, ConnectionObserver observer) {
synchronized (this.connectionObservers) {
this.connectionObservers.addElement(key, observer);
}
}
public void removeConnectionObserver(CommandKey key, ConnectionObserver observer) {
synchronized (this.connectionObservers) {
this.connectionObservers.removeElement(key, observer);
}
}
public void addConnectionStateObserver(ConnectionStateObserver observer) {
synchronized (this.connectionStateObservers) {
this.connectionStateObservers.add(observer);
}
}
public void removeConnectionStateObserver(ConnectionStateObserver observer) {
synchronized (this.connectionStateObservers) {
this.connectionStateObservers.remove(observer);
}
}
public void notifyStateChange(ConnectionState state, String stateMsg) {
ConnectionState oldState = this.state;
String oldStateMsg = this.stateMsg;
this.state = state;
this.stateMsg = stateMsg;
List<ConnectionStateObserver> observers;
synchronized (this.connectionStateObservers) {
observers = new ArrayList<>(this.connectionStateObservers);
}
for (ConnectionStateObserver observer : observers) {
observer.notify(oldState, oldStateMsg, state, stateMsg);
}
}
public void switchMode(ConnectionMode mode) {
ConnectionMessages.assertConfigured(this, "Can not switch modes yet!"); //$NON-NLS-1$
if (mode == ConnectionMode.OFF) {
stop();
} else if (mode == ConnectionMode.ON) {
stop();
start();
}
this.mode = mode;
}
/**
* Configure the underlying {@link CommunicationEndpoint} and {@link IoMessageVisitor}
*/
public void configure() {
this.messageVisitor.configure(this);
this.endpoint.configure(this, this.messageVisitor);
notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.name());
}
public void start() {
ConnectionMessages.assertConfigured(this, "Can not start yet!"); //$NON-NLS-1$
switch (this.mode) {
case OFF:
logger.info("Not connecting as mode is currently OFF"); //$NON-NLS-1$
break;
case SIMULATION:
logger.info("Started SIMULATION connection!"); //$NON-NLS-1$
break;
case ON:
if (this.queueThread != null) {
logger.warn(MessageFormat.format("{0}: Already connected!", this.id)); //$NON-NLS-1$
} else {
logger.info(MessageFormat
.format("Starting Connection {0} to {1}...", this.id, getRemoteUri())); //$NON-NLS-1$
this.run = true;
this.queueThread = new Thread(this, MessageFormat.format("{0}_OUT", this.id)); //$NON-NLS-1$
this.queueThread.start();
connectEndpoint();
}
break;
default:
logger.error("Unhandled mode " + this.mode); //$NON-NLS-1$
break;
}
}
public void stop() {
ConnectionMessages.assertConfigured(this, "Can not stop yet!"); //$NON-NLS-1$
switch (this.mode) {
case OFF:
break;
case SIMULATION:
logger.info("Disconnected SIMULATION connection!"); //$NON-NLS-1$
break;
case ON:
logger.info("Disconnecting..."); //$NON-NLS-1$
if (this.queueThread == null) {
logger.warn(MessageFormat.format("{0}: Already disconnected!", this.id)); //$NON-NLS-1$
} else {
this.run = false;
try {
disconnectEndpoint();
} catch (Exception e) {
String msg = "Caught exception while disconnecting endpoint: {0}"; //$NON-NLS-1$
logger.error(MessageFormat.format(msg, e.getLocalizedMessage()), e);
}
try {
this.queueThread.interrupt();
} catch (Exception e) {
String msg = "Caught exception while stopping queue thread: {0}"; //$NON-NLS-1$
logger.warn(MessageFormat.format(msg, e.getLocalizedMessage()));
}
String msg = "{0} is stopped"; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.queueThread.getName()));
this.queueThread = null;
}
break;
default:
logger.error("Unhandled mode " + this.mode); //$NON-NLS-1$
break;
}
}
/**
* Called by the underlying endpoint when a new message has been received and parsed
*
* @param message
* the message to handle
*/
public void handleNewMessage(IoMessage message) {
ConnectionMessages.assertConfigured(this, "Can not be notified of new message yet!"); //$NON-NLS-1$
logger.info("Received new message " + message.getKey() + " " + message.getId() + " " + message.getState());
// if the state of the message is already later than ACCEPTED
// then an underlying component has already set the state, so
// we don't need to set it
if (message.getState().compareTo(State.ACCEPTED) < 0)
message.setState(State.ACCEPTED, StringHelper.DASH);
notifyObservers(message);
}
public void notifyObservers(IoMessage message) {
List<ConnectionObserver> observers;
synchronized (this.connectionObservers) {
List<ConnectionObserver> list = this.connectionObservers.getList(message.getKey());
if (list == null || list.isEmpty()) {
logger.info("No observers waiting for key " + message.getKey());
return;
}
observers = new ArrayList<>(list);
}
for (ConnectionObserver observer : observers) {
try {
logger.info("Notifying observer " + observer.getClass().getSimpleName());
observer.notify(message.getKey(), message);
} catch (Exception e) {
String msg = "Failed to notify observer for key {0} on message with id {1}"; //$NON-NLS-1$
logger.error(MessageFormat.format(msg, message.getKey(), message.getId()), e);
}
}
if (this.archive != null)
this.archive.archive(message);
}
@Override
public void run() {
while (this.run) {
IoMessage message = null;
try {
message = this.messageQueue.take();
logger.info(MessageFormat.format("Processing message {0}...", message.getId())); //$NON-NLS-1$
if (this.mode == ConnectionMode.ON)
this.endpoint.send(message);
else if (this.mode == ConnectionMode.SIMULATION)
this.endpoint.simulate(message);
// notify the caller that the message has been processed
if (message.getState().compareTo(State.DONE) < 0)
message.setState(State.DONE, StringHelper.DASH);
done(message);
} catch (InterruptedException e) {
logger.warn(MessageFormat.format("{0} connection has been interruped!", this.id)); //$NON-NLS-1$
// an interrupted exception means the thread must stop
this.run = false;
if (message != null) {
logger.error(MessageFormat.format("Can not send message {0}", message.getId())); //$NON-NLS-1$
message.setState(State.FATAL, e.getLocalizedMessage());
done(message);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (message != null) {
logger.error(MessageFormat.format("Can not send message {0}", message.getId())); //$NON-NLS-1$
message.setState(State.FATAL, e.getLocalizedMessage());
done(message);
}
} finally {
if (message != null && this.archive != null) {
this.archive.archive(message);
}
}
}
}
/**
* Called when an outgoing message has been handled. This method logs the message state and then notifies all
* observers
*
* @param message
* the message which is done
*/
public void done(IoMessage message) {
ConnectionMessages.assertConfigured(this, "Can not notify observers yet!"); //$NON-NLS-1$
switch (message.getState()) {
case ACCEPTED:
case CREATED:
case DONE:
case PENDING:
logger.info(MessageFormat.format("Sent message {0}", message.toString())); //$NON-NLS-1$
break;
case FAILED:
case FATAL:
logger.error(MessageFormat.format("Failed to send message {0}", message.toString())); //$NON-NLS-1$
break;
default:
logger.error(MessageFormat.format("Unhandled state for message {0}", message.toString())); //$NON-NLS-1$
break;
}
notifyObservers(message);
}
public String getRemoteUri() {
return this.endpoint == null ? "0.0.0.0:0" : this.endpoint.getRemoteUri(); //$NON-NLS-1$
}
public String getLocalUri() {
return this.endpoint == null ? "0.0.0.0:0" : this.endpoint.getLocalUri(); //$NON-NLS-1$
}
public void reset() {
ConnectionMessages.assertConfigured(this, "Can not resest yet!"); //$NON-NLS-1$
this.endpoint.reset();
}
/**
* Called when the connection is connected, thus the underlying endpoint can be started
*/
protected void connectEndpoint() {
this.endpoint.start();
}
/**
* Called when the connection is disconnected, thus the underlying endpoint must be stopped
*/
protected void disconnectEndpoint() {
this.endpoint.stop();
}
/**
* Send the message using the underlying endpoint. Do not change the state of the message, this will be done by the
* caller
*
* @param message
* the message to send
*/
public void send(IoMessage message) {
ConnectionMessages.assertConfigured(this, "Can not send yet"); //$NON-NLS-1$
if (this.mode == ConnectionMode.OFF)
throw ConnectionMessages.throwNotConnected(this, message);
message.setState(State.PENDING, State.PENDING.name());
this.messageQueue.add(message);
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public interface CommunicationEndpoint {
public void configure(CommunicationConnection connection, IoMessageVisitor converter);
public void start();
public void stop();
public void reset();
public String getLocalUri();
public String getRemoteUri();
public void send(IoMessage message) throws Exception;
public void simulate(IoMessage message) throws Exception;
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
/**
* Base Exception for exceptions thrown by the communication classes
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class ConnectionException extends RuntimeException {
public ConnectionException(String message, Throwable cause) {
super(message, cause);
}
public ConnectionException(String message) {
super(message);
}
}

View File

@ -1,153 +0,0 @@
/*
* Copyright (c) 2012, Robert von Burg
*
* All rights reserved.
*
* This file is part of the XXX.
*
* XXX is free software: you can redistribute
* it and/or modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* XXX is distributed in the hope that it will
* be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with XXX. If not, see
* <http://www.gnu.org/licenses/>.
*/
package li.strolch.communication;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class ConnectionInfo {
private String id;
private String localUri;
private String remoteUri;
private ConnectionMode mode;
private int queueSize;
private ConnectionState state;
private String stateMsg;
/**
* @return the id
*/
public String getId() {
return this.id;
}
/**
* @param id
* the id to set
*/
public void setId(String id) {
this.id = id;
}
/**
* @return the localUri
*/
public String getLocalUri() {
return this.localUri;
}
/**
* @param localUri
* the localUri to set
*/
public void setLocalUri(String localUri) {
this.localUri = localUri;
}
/**
* @return the remoteUri
*/
public String getRemoteUri() {
return this.remoteUri;
}
/**
* @param remoteUri
* the remoteUri to set
*/
public void setRemoteUri(String remoteUri) {
this.remoteUri = remoteUri;
}
/**
* @return the mode
*/
public ConnectionMode getMode() {
return this.mode;
}
/**
* @param mode
* the mode to set
*/
public void setMode(ConnectionMode mode) {
this.mode = mode;
}
/**
* @return the state
*/
public ConnectionState getState() {
return this.state;
}
/**
* @param state
* the state to set
*/
public void setState(ConnectionState state) {
this.state = state;
}
/**
* @return the stateMsg
*/
public String getStateMsg() {
return this.stateMsg;
}
/**
* @param stateMsg
* the stateMsg to set
*/
public void setStateMsg(String stateMsg) {
this.stateMsg = stateMsg;
}
/**
* @return the queueSize
*/
public int getQueueSize() {
return this.queueSize;
}
/**
* @param queueSize
* the queueSize to set
*/
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
public static ConnectionInfo valueOf(CommunicationConnection connection) {
ConnectionInfo info = new ConnectionInfo();
info.setId(connection.getId());
info.setLocalUri(connection.getLocalUri());
info.setRemoteUri(connection.getRemoteUri());
info.setMode(connection.getMode());
info.setState(connection.getState());
info.setStateMsg(connection.getStateMsg());
info.setQueueSize(connection.getQueueSize());
return info;
}
}

View File

@ -1,182 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import li.strolch.utils.helper.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class to thrown connection messages
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class ConnectionMessages {
private static Logger logger = LoggerFactory.getLogger(ConnectionMessages.class);
/**
* Utility class
*/
private ConnectionMessages() {
//
}
/**
* Convenience method to throw an exception when an illegal {@link ConnectionState} change occurs
*
* @param current
* the current state
* @param change
* the new state
*
* @return the exception
*/
public static ConnectionException throwIllegalConnectionState(ConnectionState current, ConnectionState change) {
String msg = "The connection with state {0} cannot be changed to {1}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, current.name(), change.name());
ConnectionException e = new ConnectionException(msg);
return e;
}
/**
* Convenience method to throw an exception when an invalid parameter is set in the configuration
*
* @param clazz
* clazz type
* @param parameterName
* the name of the parameter
* @param parameterValue
* the value of the parameter
*
* @return the exception
*/
public static ConnectionException throwInvalidParameter(Class<?> clazz, String parameterName,
String parameterValue) {
String value;
if (parameterValue != null && !parameterValue.isEmpty())
value = parameterValue;
else
value = StringHelper.NULL;
String msg = "{0}: parameter ''{1}'' has invalid value ''{2}''"; //$NON-NLS-1$
msg = MessageFormat.format(msg, clazz.getSimpleName(), parameterName, value);
ConnectionException e = new ConnectionException(msg);
return e;
}
/**
* Convenience method to throw an exception when an two conflicting parameters are activated
*
* @param clazz
* clazz type
* @param parameter1
* parameter 1
* @param parameter2
* parameter 2
*
* @return the exception
*/
public static ConnectionException throwConflictingParameters(Class<?> clazz, String parameter1, String parameter2) {
String msg = "{0} : The parameters {1} and {2} can not be both activated as they conflict"; //$NON-NLS-1$
msg = MessageFormat.format(msg, clazz.getSimpleName(), parameter1, parameter1);
ConnectionException e = new ConnectionException(msg);
return e;
}
/**
* Convenience method to log a warning when a parameter is not set in the configuration
*
* @param clazz
* the clazz of the warning
* @param parameterName
* the parameter name
* @param defValue
* the default value to be used
*/
public static void warnUnsetParameter(Class<?> clazz, String parameterName, String defValue) {
if (logger.isDebugEnabled()) {
String msg = "{0}: parameter ''{1}'' is not set, using default value ''{2}''"; //$NON-NLS-1$
msg = MessageFormat.format(msg, clazz.getSimpleName(), parameterName, defValue);
Map<String, String> properties = new HashMap<>();
logger.warn(MessageFormat.format(msg, properties));
}
}
/**
* Convenience method to throw an exception when the connection is not yet configured
*
* @param connection
* the connection
* @param message
* the message
*/
public static void assertConfigured(CommunicationConnection connection, String message) {
if (connection.getState() == ConnectionState.CREATED) {
String msg = "{0} : Not yet configured: {1}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, connection.getId(), message);
throw new ConnectionException(msg);
}
}
/**
* Convenience method to throw an exception when the connection is not connected
*
* @param connection
* the connection
* @param message
* the message
*
* @return the exception
*/
public static ConnectionException throwNotConnected(CommunicationConnection connection, String message) {
String msg = "{0} : Not connected: {1}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, connection.getId(), message);
ConnectionException e = new ConnectionException(msg);
return e;
}
/**
* Convenience method to throw an exception when the connection is not connected
*
* @param connection
* the connection
* @param message
* the message
*
* @return the exception
*/
public static ConnectionException throwNotConnected(CommunicationConnection connection, IoMessage message) {
String msg = "{0} : Not connected, can not send message with id {1}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, connection.getId(), message.getId());
ConnectionException e = new ConnectionException(msg);
return e;
}
public static void assertLegalMessageVisitor(Class<? extends CommunicationEndpoint> endpoint,
Class<? extends IoMessageVisitor> expectedVisitor, IoMessageVisitor actualVisitor) {
if (!(expectedVisitor.isAssignableFrom(actualVisitor.getClass()))) {
String msg = "{0} requires {1} but has received illegal type {2}"; //$NON-NLS-1$
msg = MessageFormat
.format(msg, endpoint.getName(), expectedVisitor.getName(), actualVisitor.getClass().getName());
throw new ConnectionException(msg);
}
}
}

View File

@ -1,76 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.io.IOException;
/**
* <p>
* The mode of an {@link CommunicationConnection} can be one of the following enum values. This makes it possible use
* the connection without starting connection and later starting the connection when required
* </p>
* The modes have the following semantics:
* <ul>
* <li>OFF - the connection can only have states {@link ConnectionState#CREATED} and {@link
* ConnectionState#INITIALIZED}
* . Trying to use the connection will throw an exception</li>
* <li>ON - the connection can be used normally</li>
* <li>SIMULATION - the same as ON, with the difference that the connection should silently drop any work</li>
* </ul>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public enum ConnectionMode {
/**
* Denotes that the {@link CommunicationConnection} is off. This means it cannot accept messages, process messages
* or do any other kind of work
*/
OFF {
@Override
public boolean isSimulation() {
return false;
}
},
/**
* Denotes that the {@link CommunicationConnection} is on. This means that the {@link CommunicationConnection}
* accepts and process messages. Any connections which need to be established will automatically be connected and
* re-established should an {@link IOException} occur
*/
ON {
@Override
public boolean isSimulation() {
return false;
}
},
/**
* Denotes that the {@link CommunicationConnection} is in simulation mode. Mostly this means that the {@link
* CommunicationConnection} accepts messages, but silently swallows them, instead of processing them
*/
SIMULATION {
@Override
public boolean isSimulation() {
return true;
}
};
/**
* @return true if the current mode is simulation, false otherwise
*/
public abstract boolean isSimulation();
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public interface ConnectionObserver {
public void notify(CommandKey key, IoMessage message);
}

View File

@ -1,54 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
/**
* <p>
* a {@link CommunicationConnection} undergoes a serious of state changes. These states can be viewed by a client to
* monitor the state of the connection
* </p>
* The states have the following semantics:
* <ul>
* <li>CREATED - initial state</li>
* <li>INITIALIZED - the appropriate connection parameters are found</li>
* <li>CONNECTING - the connection is trying to build up a connection</li>
* <li>WAITING - the connection is waiting before retrying to connect</li>
* <li>CONNECTED - the connection has just been established</li>
* <li>IDLE - the connection is connected, but waiting for work</li>
* <li>WORKING - the connection is working</li>
* <li>BROKEN - the connection has lost the connection and is waiting before reconnecting, or another unknown failure
* occurred</li>
* <li>DISCONNECTED - the connection has been disconnected</li>
* </ul>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public enum ConnectionState {
// initial
CREATED,
// configured and ready to connect
INITIALIZED,
// working
CONNECTING,
WAITING,
CONNECTED,
IDLE,
WORKING,
BROKEN,
// disconnected due to connection error or manual disconnect/stop
DISCONNECTED;
}

View File

@ -1,6 +0,0 @@
package li.strolch.communication;
public interface ConnectionStateObserver {
public void notify(ConnectionState oldState, String oldStateMsg, ConnectionState newState, String newStateMsg);
}

View File

@ -1,219 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import li.strolch.utils.helper.StringHelper;
import li.strolch.utils.iso8601.ISO8601FormatFactory;
/**
* <p>
* An {@link IoMessage} is the object containing the data to be transmitted in IO. Implementations of {@link
* CommunicationConnection} should implement sub classes of this method to hold the actual payload.
* </p>
*
* <p>
* This class also contains a {@link Map} to store transient meta data to the actual payload
* </p>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class IoMessage {
private final String id;
private final CommandKey key;
private final String connectionId;
private Date updated;
private State state;
private String stateMsg;
private Map<String, Object> parameters;
/**
* @param id
* the id
* @param key
* the key
* @param connectionId
* the connection
*/
public IoMessage(String id, CommandKey key, String connectionId) {
this.id = id;
this.key = key;
this.connectionId = connectionId;
this.state = State.CREATED;
this.stateMsg = StringHelper.DASH;
this.updated = new Date();
this.parameters = new HashMap<>();
}
/**
* @return the id
*/
public String getId() {
return this.id;
}
/**
* @return the key
*/
public CommandKey getKey() {
return this.key;
}
/**
* @return the connectionId
*/
public String getConnectionId() {
return this.connectionId;
}
/**
* @return the updated
*/
public Date getUpdated() {
return this.updated;
}
/**
* Used for testing purposes only!
*
* @param date
*/
void setUpdated(Date date) {
this.updated = date;
}
/**
* @return the state
*/
public State getState() {
return this.state;
}
/**
* @return the stateMsg
*/
public String getStateMsg() {
return this.stateMsg;
}
/**
* @param state
* the state
* @param stateMsg
* the stateMsg
*/
public void setState(State state, String stateMsg) {
this.state = state;
this.stateMsg = stateMsg;
this.updated = new Date();
}
@SuppressWarnings("unchecked")
public <T> T getParam(String key) {
return (T) this.parameters.get(key);
}
/**
* Add a transient parameter to this message
*
* @param key
* the key under which the parameter is to be stored
* @param value
* the value to store under the given key
*/
public void addParam(String key, Object value) {
this.parameters.put(key, value);
}
/**
* Removes the parameter with the given key
*
* @param key
* The give of the parameter to be removed
*
* @return the removed value, or null if the object didn't exist
*/
@SuppressWarnings("unchecked")
public <T> T removeParam(String key) {
return (T) this.parameters.remove(key);
}
/**
* @return the set of parameter keys
*/
public Set<String> getParamKeys() {
return this.parameters.keySet();
}
@SuppressWarnings("nls")
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(this.getClass().getSimpleName() + " [id=");
builder.append(this.id);
builder.append(", key=");
builder.append(this.key);
builder.append(", updated=");
builder.append(ISO8601FormatFactory.getInstance().formatDate(this.updated));
builder.append(", state=");
builder.append(this.state);
if (StringHelper.isNotEmpty(this.stateMsg)) {
builder.append(", stateMsg=");
builder.append(this.stateMsg);
}
builder.append("]");
return builder.toString();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((this.id == null) ? 0 : this.id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
IoMessage other = (IoMessage) obj;
if (this.id == null) {
if (other.id != null)
return false;
} else if (!this.id.equals(other.id))
return false;
return true;
}
public enum State {
CREATED, // new
PENDING, // outbound
ACCEPTED, // inbound
DONE, // completed
FAILED, // completed
FATAL; // not sendable
}
}

View File

@ -1,41 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.util.List;
public interface IoMessageArchive {
public int getMaxSize();
public void setMaxSize(int maxSize);
public int getTrimSize();
public void setTrimSize(int trimSize);
public int size();
public List<IoMessage> getAll();
public List<IoMessage> getBy(String connectionId);
public List<IoMessage> getBy(String connectionId, CommandKey key);
public void clearArchive();
public void archive(IoMessage message);
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import li.strolch.communication.IoMessage.State;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class IoMessageStateObserver implements ConnectionObserver {
private CommandKey key;
private String messageId;
private State state;
public IoMessageStateObserver(CommandKey key, String messageId) {
this.key = key;
this.messageId = messageId;
}
@Override
public void notify(CommandKey key, IoMessage message) {
if (this.key.equals(key) && message.getId().equals(this.messageId)) {
this.state = message.getState();
synchronized (this) {
notifyAll();
}
}
}
public State getState() {
return this.state;
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import li.strolch.communication.console.ConsoleMessageVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Visitors to read and write {@link IoMessage} using different kind of endpoints. Different endpoints will require
* different ways of writing or reading message, thus this is not defined here. Known extensions are {@link
* ConsoleMessageVisitor}, {@link StreamMessageVisitor}.
* </p>
*
* <p>
* Concrete implementations must be thread safe!
* </p>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public abstract class IoMessageVisitor {
protected static final Logger logger = LoggerFactory.getLogger(IoMessageVisitor.class);
protected CommunicationConnection connection;
public void configure(CommunicationConnection connection) {
this.connection = connection;
}
public void simulate(IoMessage ioMessage) {
// allow for subclasses to implement
}
}

View File

@ -1,117 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.util.*;
public class SimpleMessageArchive implements IoMessageArchive {
private int maxSize;
private int trimSize;
private TreeSet<IoMessage> messageArchive;
public SimpleMessageArchive() {
this(1000, 100);
}
public SimpleMessageArchive(int maxSize, int trimSize) {
this.maxSize = maxSize;
this.trimSize = trimSize;
this.messageArchive = new TreeSet<>(new Comparator<IoMessage>() {
@Override
public int compare(IoMessage o1, IoMessage o2) {
return o1.getUpdated().compareTo(o2.getUpdated());
}
});
}
@Override
public synchronized int getMaxSize() {
return this.maxSize;
}
@Override
public synchronized void setMaxSize(int maxSize) {
this.maxSize = maxSize;
trim();
}
@Override
public synchronized int getTrimSize() {
return this.trimSize;
}
@Override
public synchronized void setTrimSize(int trimSize) {
this.trimSize = trimSize;
}
@Override
public synchronized int size() {
return this.messageArchive.size();
}
@Override
public synchronized List<IoMessage> getAll() {
List<IoMessage> all = new ArrayList<>(this.messageArchive);
return all;
}
@Override
public synchronized List<IoMessage> getBy(String connectionId) {
List<IoMessage> all = new ArrayList<>();
for (IoMessage msg : this.messageArchive) {
if (msg.getConnectionId().equals(connectionId))
all.add(msg);
}
return all;
}
@Override
public synchronized List<IoMessage> getBy(String connectionId, CommandKey key) {
List<IoMessage> all = new ArrayList<>();
for (IoMessage msg : this.messageArchive) {
if (msg.getConnectionId().equals(connectionId) && msg.getKey().equals(key))
all.add(msg);
}
return all;
}
@Override
public synchronized void clearArchive() {
this.messageArchive.clear();
}
@Override
public synchronized void archive(IoMessage message) {
this.messageArchive.add(message);
trim();
}
protected void trim() {
if (this.messageArchive.size() <= this.maxSize)
return;
Iterator<IoMessage> iter = this.messageArchive.iterator();
for (int i = 0; i <= this.trimSize; i++) {
if (iter.hasNext()) {
iter.next();
iter.remove();
}
}
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.io.InputStream;
import java.io.OutputStream;
/**
* <p>
* {@link IoMessageVisitor} to read or write using IO Streams.
* </p>
*
* <p>
* Concrete implementations must be thread safe!
* </p>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public abstract class StreamMessageVisitor extends IoMessageVisitor {
public abstract void visit(OutputStream outputStream, IoMessage message) throws Exception;
public abstract IoMessage visit(InputStream inputStream) throws Exception;
}

View File

@ -1,134 +0,0 @@
package li.strolch.communication.chat;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.text.MessageFormat;
import java.util.Enumeration;
import li.strolch.utils.helper.StringHelper;
public class Chat {
public static void main(String[] args) {
if (args.length < 4)
printIllegalArgsAndExit(args);
if (args[0].equals("server")) { //$NON-NLS-1$
startServer(args);
} else if (args[0].equals("client")) { //$NON-NLS-1$
startClient(args);
}
}
private static void startServer(String[] args) {
// server
InetAddress host;
String hostS = args[1];
try {
host = InetAddress.getByName(hostS);
} catch (UnknownHostException e1) {
System.err.println(MessageFormat.format("Illegal server address: {0}", hostS)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
return;
}
boolean isHostAddress = false;
try {
for (Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); interfaces
.hasMoreElements(); ) {
NetworkInterface iface = interfaces.nextElement();
for (Enumeration<InetAddress> addresses = iface.getInetAddresses(); addresses.hasMoreElements(); ) {
InetAddress inetAddress = addresses.nextElement();
if (inetAddress.getHostAddress().equals(host.getHostAddress()))
isHostAddress = true;
}
}
} catch (SocketException e) {
System.err.println("Oops: " + e.getMessage()); //$NON-NLS-1$
}
if (!isHostAddress) {
System.err.println(
MessageFormat.format("The address {0} is not an address of this host!", hostS)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
}
// port
int port;
String portS = args[2];
try {
port = Integer.parseInt(portS);
} catch (NumberFormatException e) {
System.err.println(MessageFormat.format("Illegal port: {0}", portS)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
return;
}
if (port < 1 || port > 65535) {
System.err.println(MessageFormat.format("Illegal port: {0}", port)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
return;
}
// username
String username = args[3];
// start
ChatServer chatServer = new ChatServer(host, port, username);
chatServer.start();
}
private static void startClient(String[] args) {
// server
InetAddress host;
String hostS = args[1];
try {
host = InetAddress.getByName(hostS);
} catch (UnknownHostException e1) {
System.err.println(MessageFormat.format("Illegal server address: {0}", hostS)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
return;
}
// port
int port;
String portS = args[2];
try {
port = Integer.parseInt(portS);
} catch (NumberFormatException e) {
System.err.println(MessageFormat.format("Illegal port: {0}", portS)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
return;
}
if (port < 1 || port > 65535) {
System.err.println(MessageFormat.format("Illegal port: {0}", port)); //$NON-NLS-1$
printIllegalArgsAndExit(args);
return;
}
// username
String username = args[3];
// start
ChatClient chatClient = new ChatClient(host, port, username);
chatClient.start();
}
private static void printIllegalArgsAndExit(String[] args) {
System.err.print("Illegal arguments: "); //$NON-NLS-1$
if (args.length == 0) {
System.err.print("(none)"); //$NON-NLS-1$
} else {
for (String arg : args) {
System.err.print(arg + StringHelper.SPACE);
}
}
System.err.println();
System.err.println("Usage: java ...Chat server <local_address> <port> <username>"); //$NON-NLS-1$
System.err.println("Usage: java ...Chat client <server> <port> <username>"); //$NON-NLS-1$
System.exit(1);
}
}

View File

@ -1,87 +0,0 @@
package li.strolch.communication.chat;
import static li.strolch.communication.chat.ChatMessageVisitor.inboundKey;
import static li.strolch.communication.chat.ChatMessageVisitor.outboundKey;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import li.strolch.communication.*;
import li.strolch.communication.tcpip.ClientSocketEndpoint;
import li.strolch.communication.tcpip.SocketEndpointConstants;
import li.strolch.communication.tcpip.SocketMessageVisitor;
public class ChatClient implements ConnectionObserver, ConnectionStateObserver {
private static final String id = "ChatClient"; //$NON-NLS-1$
private InetAddress host;
private int port;
private String username;
private boolean connected;
public ChatClient(InetAddress host, int port, String username) {
this.host = host;
this.port = port;
this.username = username;
}
public void start() {
ConnectionMode mode = ConnectionMode.ON;
Map<String, String> parameters = new HashMap<>();
parameters.put(SocketEndpointConstants.PARAMETER_RETRY, "10000"); //$NON-NLS-1$
parameters.put(SocketEndpointConstants.PARAMETER_USE_TIMEOUT, Boolean.FALSE.toString());
parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS, this.host.getHostAddress());
parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT, Integer.toString(this.port));
parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.FALSE.toString());
SocketMessageVisitor messageVisitor = new ChatMessageVisitor(id);
ClientSocketEndpoint endpoint = new ClientSocketEndpoint();
CommunicationConnection connection = new CommunicationConnection(id, mode, parameters, endpoint,
messageVisitor);
connection.addConnectionObserver(outboundKey, this);
connection.addConnectionObserver(inboundKey, this);
connection.addConnectionStateObserver(this);
connection.configure();
connection.start();
while (!this.connected) {
synchronized (this) {
try {
this.wait(2000l);
} catch (InterruptedException e) {
System.err.println("oops: " + e.getMessage()); //$NON-NLS-1$
}
}
}
System.out.println("Connected. Send messages to user:"); //$NON-NLS-1$
while (true) {
@SuppressWarnings("resource")
Scanner in = new Scanner(System.in);
//System.out.print(this.username + ": ");
String line = in.nextLine();
connection.send(ChatIoMessage.msg(outboundKey, id, this.username, line));
}
}
@Override
public void notify(CommandKey key, IoMessage message) {
if (key.equals(inboundKey)) {
ChatIoMessage chatIoMessage = (ChatIoMessage) message;
System.out.println(chatIoMessage.getChatMsg());
}
}
@Override
public void notify(ConnectionState oldState, String oldStateMsg, ConnectionState newState, String newStateMsg) {
this.connected = newState == ConnectionState.CONNECTED || newState == ConnectionState.IDLE
|| newState == ConnectionState.WORKING;
synchronized (this) {
notifyAll();
}
}
}

View File

@ -1,31 +0,0 @@
package li.strolch.communication.chat;
import li.strolch.communication.CommandKey;
import li.strolch.communication.IoMessage;
import li.strolch.utils.helper.StringHelper;
public class ChatIoMessage extends IoMessage {
private String chatMsg;
public ChatIoMessage(String id, CommandKey key, String connectionId) {
super(id, key, connectionId);
}
public String getChatMsg() {
return this.chatMsg;
}
public void setChatMsg(String chagMsg) {
this.chatMsg = chagMsg;
}
public static ChatIoMessage msg(CommandKey key, String connId, String username, String msg) {
String line = username + StringHelper.COLON + StringHelper.SPACE + msg;
ChatIoMessage chatIoMessage = new ChatIoMessage(StringHelper.getUniqueId(), key, connId);
chatIoMessage.setChatMsg(line);
return chatIoMessage;
}
}

View File

@ -1,44 +0,0 @@
package li.strolch.communication.chat;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import li.strolch.communication.CommandKey;
import li.strolch.communication.IoMessage;
import li.strolch.communication.tcpip.SocketMessageVisitor;
import li.strolch.utils.helper.StringHelper;
public class ChatMessageVisitor extends SocketMessageVisitor {
public static final CommandKey inboundKey = CommandKey.key("server", "msg"); //$NON-NLS-1$//$NON-NLS-2$
public static final CommandKey outboundKey = CommandKey.key("client", "msg"); //$NON-NLS-1$ //$NON-NLS-2$
public ChatMessageVisitor(String connectionId) {
super(connectionId);
}
@Override
public IoMessage visit(DataInputStream inputStream, DataOutputStream outputStream) throws Exception {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String line = bufferedReader.readLine();
if (line == null) {
bufferedReader.close();
return null;
}
ChatIoMessage chatIoMessage = new ChatIoMessage(StringHelper.getUniqueId(), inboundKey, this.connectionId);
chatIoMessage.setChatMsg(line);
return chatIoMessage;
}
@Override
public void visit(DataInputStream inputStream, DataOutputStream outputStream, IoMessage message) throws Exception {
ChatIoMessage chatIoMessage = (ChatIoMessage) message;
outputStream.writeChars(chatIoMessage.getChatMsg());
outputStream.writeChar('\n');
outputStream.flush();
}
}

View File

@ -1,88 +0,0 @@
package li.strolch.communication.chat;
import static li.strolch.communication.chat.ChatMessageVisitor.inboundKey;
import static li.strolch.communication.chat.ChatMessageVisitor.outboundKey;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import li.strolch.communication.*;
import li.strolch.communication.tcpip.ServerSocketEndpoint;
import li.strolch.communication.tcpip.SocketEndpointConstants;
import li.strolch.communication.tcpip.SocketMessageVisitor;
public class ChatServer implements ConnectionObserver, ConnectionStateObserver {
private static final String id = "ChatServer"; //$NON-NLS-1$
private InetAddress address;
private int port;
private String username;
private boolean connected;
public ChatServer(InetAddress address, int port, String username) {
this.address = address;
this.port = port;
this.username = username;
}
public void start() {
ConnectionMode mode = ConnectionMode.ON;
Map<String, String> parameters = new HashMap<>();
parameters.put(SocketEndpointConstants.PARAMETER_RETRY, "10000"); //$NON-NLS-1$
parameters.put(SocketEndpointConstants.PARAMETER_USE_TIMEOUT, Boolean.FALSE.toString());
parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS, this.address.getHostAddress());
parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT, Integer.toString(this.port));
parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.FALSE.toString());
parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.FALSE.toString());
SocketMessageVisitor messageVisitor = new ChatMessageVisitor(id);
ServerSocketEndpoint endpoint = new ServerSocketEndpoint();
CommunicationConnection connection = new CommunicationConnection(id, mode, parameters, endpoint,
messageVisitor);
connection.addConnectionObserver(outboundKey, this);
connection.addConnectionObserver(inboundKey, this);
connection.addConnectionStateObserver(this);
connection.configure();
connection.start();
while (!this.connected) {
synchronized (this) {
try {
this.wait(2000l);
} catch (InterruptedException e) {
System.err.println("oops: " + e.getMessage()); //$NON-NLS-1$
}
}
}
System.out.println("Connected. Send messages to user:"); //$NON-NLS-1$
while (true) {
@SuppressWarnings("resource")
Scanner in = new Scanner(System.in);
//System.out.print(this.username + ": ");
String line = in.nextLine();
connection.send(ChatIoMessage.msg(outboundKey, id, this.username, line));
}
}
@Override
public void notify(CommandKey key, IoMessage message) {
if (key.equals(inboundKey)) {
ChatIoMessage chatIoMessage = (ChatIoMessage) message;
System.out.println(chatIoMessage.getChatMsg());
}
}
@Override
public void notify(ConnectionState oldState, String oldStateMsg, ConnectionState newState, String newStateMsg) {
this.connected = newState == ConnectionState.CONNECTED || newState == ConnectionState.IDLE
|| newState == ConnectionState.WORKING;
synchronized (this) {
notifyAll();
}
}
}

View File

@ -1,77 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.console;
import li.strolch.communication.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class ConsoleEndpoint implements CommunicationEndpoint {
private static final Logger logger = LoggerFactory.getLogger(ConsoleEndpoint.class);
private CommunicationConnection connection;
private ConsoleMessageVisitor messageVisitor;
@Override
public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) {
this.connection = connection;
ConnectionMessages.assertLegalMessageVisitor(this.getClass(), ConsoleMessageVisitor.class, messageVisitor);
this.messageVisitor = (ConsoleMessageVisitor) messageVisitor;
}
@Override
public void start() {
this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString());
}
@Override
public void stop() {
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString());
}
@Override
public void reset() {
this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString());
}
@Override
public String getLocalUri() {
return "console"; //$NON-NLS-1$
}
@Override
public String getRemoteUri() {
return "console"; //$NON-NLS-1$
}
@Override
public void send(IoMessage message) throws Exception {
this.connection.notifyStateChange(ConnectionState.WORKING, ConnectionState.WORKING.toString());
try {
this.messageVisitor.visit(logger, message);
} finally {
this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString());
}
}
@Override
public void simulate(IoMessage message) throws Exception {
send(message);
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.console;
import li.strolch.communication.IoMessage;
import li.strolch.communication.IoMessageVisitor;
import org.slf4j.Logger;
public abstract class ConsoleMessageVisitor extends IoMessageVisitor {
public abstract void visit(Logger logger, IoMessage message) throws Exception;
}

View File

@ -1,245 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.file;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.util.Map;
import li.strolch.communication.*;
import li.strolch.utils.helper.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An {@link CommunicationEndpoint} which writes and/or reads from a designated file
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class FileEndpoint implements CommunicationEndpoint, Runnable {
public static final String ENDPOINT_MODE = "endpointMode"; //$NON-NLS-1$
public static final String INBOUND_FILENAME = "inboundFilename"; //$NON-NLS-1$
public static final String OUTBOUND_FILENAME = "outboundFilename"; //$NON-NLS-1$
public static final long POLL_TIME = 1000l;
private static final Logger logger = LoggerFactory.getLogger(FileEndpoint.class);
private CommunicationConnection connection;
private FileEndpointMode endpointMode;
private String inboundFilename;
private String outboundFilename;
private Thread thread;
private boolean run = false;
private StreamMessageVisitor messageVisitor;
/**
* {@link FileEndpoint} needs the following parameters on the configuration to be initialized
* <ul>
* <li>outboundFilename: the file name where the {@link IoMessage} contents are written to. The value may contain
* {@link System#getProperty(String)} place holders which will be evaluated</li>
* </ul>
*/
@Override
public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) {
this.connection = connection;
ConnectionMessages.assertLegalMessageVisitor(this.getClass(), StreamMessageVisitor.class, messageVisitor);
this.messageVisitor = (StreamMessageVisitor) messageVisitor;
configure();
}
private void configure() {
Map<String, String> parameters = this.connection.getParameters();
String endpointModeS = parameters.get(ENDPOINT_MODE);
if (StringHelper.isEmpty(endpointModeS)) {
throw ConnectionMessages.throwInvalidParameter(FileEndpoint.class, ENDPOINT_MODE, endpointModeS);
}
try {
this.endpointMode = FileEndpointMode.valueOf(endpointModeS);
} catch (Exception e) {
throw ConnectionMessages.throwInvalidParameter(FileEndpoint.class, ENDPOINT_MODE, endpointModeS);
}
if (this.endpointMode.isRead()) {
this.inboundFilename = parameters.get(INBOUND_FILENAME);
if (StringHelper.isEmpty(this.inboundFilename)) {
throw ConnectionMessages
.throwInvalidParameter(FileEndpoint.class, INBOUND_FILENAME, this.inboundFilename);
}
}
if (this.endpointMode.isWrite()) {
this.outboundFilename = parameters.get(OUTBOUND_FILENAME);
if (StringHelper.isEmpty(this.outboundFilename)) {
throw ConnectionMessages
.throwInvalidParameter(FileEndpoint.class, OUTBOUND_FILENAME, this.outboundFilename);
}
}
}
@Override
public String getLocalUri() {
return new File(this.inboundFilename).getAbsolutePath();
}
@Override
public String getRemoteUri() {
return new File(this.outboundFilename).getAbsolutePath();
}
@Override
public void start() {
if (this.endpointMode.isRead()) {
this.thread = new Thread(this, new File(this.inboundFilename).getName());
this.run = true;
this.thread.start();
}
this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString());
}
@Override
public void stop() {
stopThread();
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString());
}
@Override
public void reset() {
stopThread();
configure();
this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString());
}
private void stopThread() {
this.run = false;
if (this.thread != null) {
try {
this.thread.interrupt();
this.thread.join(2000l);
} catch (Exception e) {
logger.error(MessageFormat
.format("Error while interrupting thread: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
}
this.thread = null;
}
}
@Override
public void send(IoMessage message) throws Exception {
if (!this.endpointMode.isWrite()) {
String msg = "FileEnpoint mode is {0} and thus write is not allowed!"; //$NON-NLS-1$
msg = MessageFormat.format(msg, this.endpointMode);
throw new ConnectionException(msg);
}
this.connection.notifyStateChange(ConnectionState.WORKING, ConnectionState.WORKING.toString());
// open the stream
try (OutputStream outputStream = Files.newOutputStream(Paths.get(this.outboundFilename))) {
// write the message using the visitor
this.messageVisitor.visit(outputStream, message);
} finally {
this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString());
}
}
@Override
public void simulate(IoMessage message) throws Exception {
this.messageVisitor.simulate(message);
}
@Override
public void run() {
File file = new File(this.inboundFilename);
long lastModified = 0l;
logger.info("Starting..."); //$NON-NLS-1$
while (this.run) {
try {
if (file.canRead()) {
long tmpModified = file.lastModified();
if (tmpModified > lastModified) {
logger.info(MessageFormat.format("Handling file {0}", file.getAbsolutePath())); //$NON-NLS-1$
this.connection.notifyStateChange(ConnectionState.WORKING, ConnectionState.WORKING.toString());
// file is changed
lastModified = tmpModified;
// read the file
handleFile(file);
this.connection.notifyStateChange(ConnectionState.IDLE, ConnectionState.IDLE.toString());
}
}
if (this.run) {
this.connection.notifyStateChange(ConnectionState.WAITING, ConnectionState.WAITING.toString());
try {
synchronized (this) {
this.wait(POLL_TIME);
}
} catch (InterruptedException e) {
this.run = false;
logger.info("Interrupted!"); //$NON-NLS-1$
}
}
} catch (Exception e) {
logger.error(MessageFormat.format("Error reading file: {0}", file.getAbsolutePath())); //$NON-NLS-1$
logger.error(e.getMessage(), e);
this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage());
}
}
}
/**
* Reads the file and handle using {@link StreamMessageVisitor}
*
* @param file
* the {@link File} to read
*/
protected void handleFile(File file) throws Exception {
try (InputStream inputStream = Files.newInputStream(file.toPath())) {
// convert the object to an integration message
IoMessage message = this.messageVisitor.visit(inputStream);
// and forward to the connection
if (message != null) {
this.connection.handleNewMessage(message);
}
}
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.file;
public enum FileEndpointMode {
READ(true, false),
WRITE(false, true),
READ_WRITE(true, true);
private boolean read;
private boolean write;
private FileEndpointMode(boolean read, boolean write) {
this.read = read;
this.write = write;
}
public boolean isRead() {
return this.read;
}
public boolean isWrite() {
return this.write;
}
}

View File

@ -1,544 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.tcpip;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.text.MessageFormat;
import java.util.Map;
import li.strolch.communication.*;
import li.strolch.communication.IoMessage.State;
import li.strolch.utils.helper.ExceptionHelper;
import li.strolch.utils.helper.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* This {@link CommunicationEndpoint} is an abstract implementation with everything needed to connect through a {@link
* Socket} to a remote server which is listening for incoming {@link Socket} connections. This {@link Socket} endpoint
* can send messages to the remote side, as well as receive messages from the remote side
* </p>
* <p>
* This endpoint is maintained as a client connection. This means that this endpoint opens the {@link Socket} to the
* remote server
* </p>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class ClientSocketEndpoint implements CommunicationEndpoint {
protected static final Logger logger = LoggerFactory.getLogger(ClientSocketEndpoint.class);
// state variables
private boolean connected;
private boolean closed;
private long lastConnect;
private boolean useTimeout;
private int timeout;
private long retry;
private boolean clearOnConnect;
private boolean connectOnStart;
private boolean closeAfterSend;
// remote address
private String remoteInputAddressS;
private int remoteInputPort;
private String localOutputAddressS;
private int localOutputPort;
private InetAddress remoteInputAddress;
private InetAddress localOutputAddress;
// connection
private Socket socket;
protected DataOutputStream outputStream;
protected DataInputStream inputStream;
protected CommunicationConnection connection;
protected SocketMessageVisitor messageVisitor;
/**
* Default constructor
*/
public ClientSocketEndpoint() {
this.connected = false;
this.closed = true;
}
/**
* Checks the state of the connection and returns true if {@link Socket} is connected and ready for transmission,
* false otherwise
*
* @return true if {@link Socket} is connected and ready for transmission, false otherwise
*/
protected boolean checkConnection() {
return !this.closed && this.connected && (this.socket != null && !this.socket.isClosed() && this.socket
.isBound() && this.socket.isConnected() && !this.socket.isInputShutdown() && !this.socket
.isOutputShutdown());
}
/**
* Establishes a {@link Socket} connection to the remote server. This method blocks till a connection is
* established. In the event of a connection failure, the method waits a configured time before retrying
*/
protected void openConnection() {
ConnectionState state = this.connection.getState();
// do not allow connecting if state is
// - CREATED
// - CONNECTING
// - WAITING
// - CLOSED
if (state == ConnectionState.CREATED || state == ConnectionState.CONNECTING || state == ConnectionState.WAITING
|| state == ConnectionState.DISCONNECTED) {
ConnectionMessages.throwIllegalConnectionState(state, ConnectionState.CONNECTING);
}
// first close the connection
closeConnection();
while (!this.connected && !this.closed) {
try {
this.connection.notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString());
// only try in proper intervals
long currentTime = System.currentTimeMillis();
long timeDifference = currentTime - this.lastConnect;
if (timeDifference < this.retry) {
long wait = (this.retry - timeDifference);
logger.info(MessageFormat.format("Waiting: {0}ms", wait)); //$NON-NLS-1$
this.connection.notifyStateChange(ConnectionState.WAITING, ConnectionState.WAITING.toString());
Thread.sleep(wait);
this.connection
.notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString());
}
// don't try and connect if we are closed!
if (this.closed) {
logger.error("The connection has been closed and can not be connected"); //$NON-NLS-1$
closeConnection();
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null);
return;
}
// keep track of the time of this connection attempt
this.lastConnect = System.currentTimeMillis();
// open the socket
if (this.localOutputAddress != null) {
String msg = "Opening connection to {0}:{1} from {2}:{3}..."; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.remoteInputAddress.getHostAddress(),
Integer.toString(this.remoteInputPort), this.localOutputAddress.getHostAddress(),
Integer.toString(this.localOutputPort)));
this.socket = new Socket(this.remoteInputAddress, this.remoteInputPort, this.localOutputAddress,
this.localOutputPort);
} else {
String msg = "Opening connection to {0}:{1}..."; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.remoteInputAddress.getHostAddress(),
Integer.toString(this.remoteInputPort)));
this.socket = new Socket(this.remoteInputAddress, this.remoteInputPort);
}
// configure the socket
if (logger.isDebugEnabled()) {
String msg = "BufferSize (send/read): {0} / {1} SoLinger: {2} TcpNoDelay: {3}"; //$NON-NLS-1$
logger.info(MessageFormat
.format(msg, this.socket.getSendBufferSize(), this.socket.getReceiveBufferSize(),
this.socket.getSoLinger(), this.socket.getTcpNoDelay()));
}
//outputSocket.setSendBufferSize(1);
//outputSocket.setSoLinger(true, 0);
//outputSocket.setTcpNoDelay(true);
// activate connection timeout
if (this.useTimeout) {
this.socket.setSoTimeout(this.timeout);
}
// get the streams
this.outputStream = new DataOutputStream(this.socket.getOutputStream());
this.inputStream = new DataInputStream(this.socket.getInputStream());
if (this.clearOnConnect) {
// clear the input stream
int available = this.inputStream.available();
logger.info(MessageFormat.format("clearOnConnect: skipping {0} bytes.", available)); //$NON-NLS-1$
this.inputStream.skip(available);
}
String msg = "Connected {0}: {1}:{2} with local side {3}:{4}"; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.connection.getId(), this.remoteInputAddressS,
Integer.toString(this.remoteInputPort), this.socket.getLocalAddress().getHostAddress(),
Integer.toString(this.socket.getLocalPort())));
// we are connected!
this.connection.notifyStateChange(ConnectionState.CONNECTED, ConnectionState.CONNECTED.toString());
this.connected = true;
} catch (InterruptedException e) {
logger.info("Interrupted!"); //$NON-NLS-1$
this.closed = true;
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null);
} catch (Exception e) {
String msg = "Error while connecting to {0}:{1}: {2}"; //$NON-NLS-1$
logger.error(MessageFormat.format(msg, this.remoteInputAddressS, Integer.toString(this.remoteInputPort),
ExceptionHelper.formatExceptionMessage(e)));
this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage());
}
}
}
/**
* closes the connection HARD by calling close() on the streams and socket. All Exceptions are caught to make sure
* that the connections are cleaned up
*/
protected void closeConnection() {
this.connected = false;
this.connection.notifyStateChange(ConnectionState.BROKEN, null);
if (this.outputStream != null) {
try {
this.outputStream.close();
} catch (IOException e) {
logger.error(
MessageFormat.format("Error closing OutputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
} finally {
this.outputStream = null;
}
}
if (this.inputStream != null) {
try {
this.inputStream.close();
} catch (IOException e) {
logger.error(
MessageFormat.format("Error closing InputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
} finally {
this.inputStream = null;
}
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e) {
logger.error(
MessageFormat.format("Error closing OutputSocket: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
} finally {
this.socket = null;
}
String msg = "Socket closed for connection {0} at remote input address {1}:{2}"; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.connection.getId(), this.remoteInputAddressS,
Integer.toString(this.remoteInputPort)));
}
}
/**
* <p>
* Configures this {@link ClientSocketEndpoint}
* </p>
* gets the parameter map from the connection and reads the following parameters from the map:
* <ul>
* <li>remoteInputAddress - the IP or Hostname of the remote server</li>
* <li>remoteInputPort - the port to which the socket should be established</li>
* <li>localOutputAddress - the IP or Hostname of the local server (if null, then the network layer will
* decide)</li>
* <li>localOutputPort - the local port from which the socket should go out of (if null, then the network layer
* will
* decide)</li>
* <li>retry - a configured retry wait time. Default is {@link SocketEndpointConstants#RETRY}</li>
* <li>timeout - the timeout after which an idle socket is deemed dead. Default is
* {@link SocketEndpointConstants#TIMEOUT}</li>
* <li>useTimeout - if true, then the timeout is activated, otherwise it is. default is
* {@link SocketEndpointConstants#USE_TIMEOUT}</li>
* <li>clearOnConnect - if true, then the after a successful connect the input is cleared by discarding all
* available bytes. This can be useful in cases where the channel is clogged with stale data. default is {@link
* SocketEndpointConstants#CLEAR_ON_CONNECT}</li>
* <li>connectOnStart - if true, then when the connection is started, the connection to the remote address is
* attempted. default is {@link SocketEndpointConstants#CONNECT_ON_START}
* </ul>
*
* @see CommunicationEndpoint#configure(CommunicationConnection, IoMessageVisitor)
*/
@Override
public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) {
if (this.connection != null && connection.getState().compareTo(ConnectionState.INITIALIZED) > 0) {
String msg = "{0}:{1} already configured."; //$NON-NLS-1$
logger.warn(MessageFormat.format(msg, this.getClass().getSimpleName(), connection.getId()));
return;
}
ConnectionMessages.assertLegalMessageVisitor(this.getClass(), SocketMessageVisitor.class, messageVisitor);
this.messageVisitor = (SocketMessageVisitor) messageVisitor;
this.connection = connection;
configure();
}
private void configure() {
Map<String, String> parameters = this.connection.getParameters();
this.remoteInputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS);
String remoteInputPortS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT);
this.localOutputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_OUTPUT_ADDRESS);
String localOutputPortS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_OUTPUT_PORT);
// parse remote input Address to InetAddress object
try {
this.remoteInputAddress = InetAddress.getByName(this.remoteInputAddressS);
} catch (UnknownHostException e) {
throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS, this.remoteInputAddressS);
}
// parse remote input address port to integer
try {
this.remoteInputPort = Integer.parseInt(remoteInputPortS);
} catch (NumberFormatException e) {
throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT, remoteInputPortS);
}
// if local output address is not set, then we will use the localhost InetAddress
if (this.localOutputAddressS == null || this.localOutputAddressS.length() == 0) {
logger.debug("No localOutputAddress set. Using localhost"); //$NON-NLS-1$
} else {
// parse local output address name to InetAddress object
try {
this.localOutputAddress = InetAddress.getByName(this.localOutputAddressS);
} catch (UnknownHostException e) {
String msg = "The host name ''{0}'' can not be evaluated to an internet address"; //$NON-NLS-1$
msg = MessageFormat.format(msg, this.localOutputAddressS);
throw new ConnectionException(msg, e);
}
// parse local output address port to integer
try {
this.localOutputPort = Integer.parseInt(localOutputPortS);
} catch (NumberFormatException e) {
throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_LOCAL_OUTPUT_PORT, localOutputPortS);
}
}
// configure retry wait time
String retryS = parameters.get(SocketEndpointConstants.PARAMETER_RETRY);
if (retryS == null || retryS.length() == 0) {
ConnectionMessages.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY,
String.valueOf(SocketEndpointConstants.RETRY));
this.retry = SocketEndpointConstants.RETRY;
} else {
try {
this.retry = Long.parseLong(retryS);
} catch (NumberFormatException e) {
throw ConnectionMessages
.throwInvalidParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY,
retryS);
}
}
// configure connect on start
String connectOnStartS = parameters.get(SocketEndpointConstants.PARAMETER_CONNECT_ON_START);
if (StringHelper.isNotEmpty(connectOnStartS)) {
this.connectOnStart = StringHelper.parseBoolean(connectOnStartS);
} else {
ConnectionMessages
.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CONNECT_ON_START,
String.valueOf(SocketEndpointConstants.CONNECT_ON_START));
this.connectOnStart = SocketEndpointConstants.CONNECT_ON_START;
}
// configure closeAfterSend
String closeAfterSendS = parameters.get(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND);
if (StringHelper.isNotEmpty(closeAfterSendS)) {
this.closeAfterSend = StringHelper.parseBoolean(closeAfterSendS);
} else {
ConnectionMessages
.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND,
String.valueOf(SocketEndpointConstants.CLOSE_AFTER_SEND));
this.closeAfterSend = SocketEndpointConstants.CLOSE_AFTER_SEND;
}
// configure if timeout on connection should be activated
String useTimeoutS = parameters.get(SocketEndpointConstants.PARAMETER_USE_TIMEOUT);
if (useTimeoutS == null || useTimeoutS.length() == 0) {
ConnectionMessages
.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_USE_TIMEOUT,
String.valueOf(SocketEndpointConstants.USE_TIMEOUT));
this.useTimeout = SocketEndpointConstants.USE_TIMEOUT;
} else {
this.useTimeout = Boolean.parseBoolean(useTimeoutS);
}
if (this.useTimeout) {
// configure timeout on connection
String timeoutS = parameters.get(SocketEndpointConstants.PARAMETER_TIMEOUT);
if (timeoutS == null || timeoutS.length() == 0) {
ConnectionMessages
.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_TIMEOUT,
String.valueOf(SocketEndpointConstants.TIMEOUT));
this.timeout = SocketEndpointConstants.TIMEOUT;
} else {
try {
this.timeout = Integer.parseInt(timeoutS);
} catch (NumberFormatException e) {
throw ConnectionMessages.throwInvalidParameter(ClientSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_TIMEOUT, timeoutS);
}
}
}
// configure if the connection should be cleared on connect
String clearOnConnectS = parameters.get(SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT);
if (clearOnConnectS == null || clearOnConnectS.length() == 0) {
ConnectionMessages
.warnUnsetParameter(ClientSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT,
String.valueOf(SocketEndpointConstants.CLEAR_ON_CONNECT));
this.clearOnConnect = SocketEndpointConstants.CLEAR_ON_CONNECT;
} else {
this.clearOnConnect = Boolean.parseBoolean(clearOnConnectS);
}
}
/**
* @return the uri as String to which this {@link ClientSocketEndpoint} is locally bound to
*/
@Override
public String getLocalUri() {
if (this.socket != null) {
InetAddress localAddress = this.socket.getLocalAddress();
return localAddress.getHostAddress() + StringHelper.COLON + this.socket.getLocalPort();
} else if (this.localOutputAddress != null) {
return this.localOutputAddress.getHostAddress() + StringHelper.COLON + this.localOutputPort;
}
return "0.0.0.0:0"; //$NON-NLS-1$
}
/**
* @return the uri as String to which this {@link ClientSocketEndpoint} is connecting to
*/
@Override
public String getRemoteUri() {
if (this.socket != null) {
InetAddress remoteAddress = this.socket.getInetAddress();
return remoteAddress.getHostAddress() + StringHelper.COLON + this.socket.getPort();
} else if (this.remoteInputAddress != null) {
return this.remoteInputAddress.getHostAddress() + StringHelper.COLON + this.remoteInputPort;
}
return this.remoteInputAddressS + StringHelper.COLON + this.remoteInputPort;
}
/**
* Allows this end point to connect and then opens the connection to the defined remote server
*
* @see CommunicationEndpoint#start()
*/
@Override
public void start() {
if (!this.closed) {
logger.warn(MessageFormat
.format("CommunicationConnection {0} already started.", this.connection.getId())); //$NON-NLS-1$
} else {
// logger.info(MessageFormat.format("Enabling connection {0}...", this.connection.getId())); //$NON-NLS-1$
this.closed = false;
this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString());
if (this.connectOnStart) {
openConnection();
}
}
}
/**
* Closes this connection and disallows this end point to reconnect
*
* @see CommunicationEndpoint#stop()
*/
@Override
public void stop() {
this.closed = true;
closeConnection();
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString());
logger.info(MessageFormat.format("Disabled connection {0}.", this.connection.getId())); //$NON-NLS-1$
}
@Override
public void reset() {
this.closed = true;
closeConnection();
this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString());
}
@Override
public void simulate(IoMessage message) throws Exception {
this.messageVisitor.simulate(message);
}
@Override
public void send(IoMessage message) throws Exception {
while (!this.closed && message.getState() == State.PENDING) {
try {
// open the connection
if (!checkConnection())
openConnection();
// read and write to the client socket
this.messageVisitor.visit(this.inputStream, this.outputStream, message);
message.setState(State.DONE, State.DONE.name());
// since send was ok, allow to reconnect immediately
if (this.closeAfterSend) {
this.lastConnect = System.currentTimeMillis() - this.retry;
}
} catch (Exception e) {
if (this.closed) {
logger.warn("Socket has been closed!"); //$NON-NLS-1$
message.setState(State.FATAL, "Socket has been closed!"); //$NON-NLS-1$
} else {
closeConnection();
logger.error(e.getMessage(), e);
message.setState(State.FATAL, e.getLocalizedMessage());
this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage());
}
} finally {
if (this.closeAfterSend && !this.closed) {
closeConnection();
}
}
}
}
}

View File

@ -1,589 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.tcpip;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.*;
import java.text.MessageFormat;
import java.util.Map;
import li.strolch.communication.*;
import li.strolch.utils.helper.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* This {@link CommunicationEndpoint} is an abstract implementation with everything needed to start a {@link Socket}
* server which waits for a request from a single client
* </p>
* <p>
* This end point only allows a single connection at a time and implements all exception handling for opening and
* closing a {@link Socket} connection
* </p>
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
* @see ServerSocketEndpoint#configure(CommunicationConnection, IoMessageVisitor) for details on configuring the end
* point
*/
public class ServerSocketEndpoint implements CommunicationEndpoint, Runnable {
protected static final Logger logger = LoggerFactory.getLogger(ServerSocketEndpoint.class);
private Thread serverThread;
// state variables
private boolean connected;
private boolean closed;
private boolean fatal;
private long lastConnect;
private boolean useTimeout;
private int timeout;
private long retry;
private boolean clearOnConnect;
// address
private String localInputAddressS;
private int localInputPort;
private String remoteOutputAddressS;
private int remoteOutputPort;
private InetAddress localInputAddress;
private InetAddress remoteOutputAddress;
// connection
private ServerSocket serverSocket;
private Socket socket;
protected DataOutputStream outputStream;
protected DataInputStream inputStream;
protected CommunicationConnection connection;
protected SocketMessageVisitor messageVisitor;
/**
* Default constructor
*/
public ServerSocketEndpoint() {
this.connected = false;
this.closed = true;
this.fatal = false;
}
/**
* Checks the state of the connection and returns true if {@link Socket} is connected and ready for transmission,
* false otherwise
*
* @return true if {@link Socket} is connected and ready for transmission, false otherwise
*/
protected boolean checkConnection() {
return !this.closed && this.connected && (this.socket != null && !this.socket.isClosed() && this.socket
.isBound() && this.socket.isConnected() && !this.socket.isInputShutdown() && !this.socket
.isOutputShutdown());
}
/**
* Listens on the {@link ServerSocket} for an incoming connection. Prepares the connection then for use. If the
* remote address has been defined, then the remote connection is validated to come from this appropriate host.
* CommunicationConnection attempts are always separated by a configured amount of time
*/
protected void openConnection() {
ConnectionState state = this.connection.getState();
// do not open the connection if state is
// - CREATED
// - CONNECTING
// - WAITING
// - CLOSED
if (state == ConnectionState.CREATED || state == ConnectionState.CONNECTING || state == ConnectionState.WAITING
|| state == ConnectionState.DISCONNECTED) {
ConnectionMessages.throwIllegalConnectionState(state, ConnectionState.CONNECTING);
}
// first close the connection
closeConnection();
while (!this.connected && !this.closed) {
try {
this.connection.notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString());
// only try in proper intervals
long currentTime = System.currentTimeMillis();
long timeDifference = currentTime - this.lastConnect;
if (timeDifference < this.retry) {
long wait = this.retry - timeDifference;
logger.info(MessageFormat.format("Waiting: {0}ms", wait)); //$NON-NLS-1$
this.connection.notifyStateChange(ConnectionState.WAITING, ConnectionState.WAITING.toString());
Thread.sleep(wait);
this.connection
.notifyStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTING.toString());
}
// don't try and connect if we are closed!
if (this.closed) {
logger.error("The connection has been closed and can not be connected"); //$NON-NLS-1$
closeConnection();
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null);
return;
}
// keep track of the time of this connection attempt
this.lastConnect = System.currentTimeMillis();
// open the socket
String msg = "Waiting for connections on: {0}:{1}..."; //$NON-NLS-1$
logger.info(MessageFormat
.format(msg, this.localInputAddress.getHostAddress(), Integer.toString(this.localInputPort)));
this.socket = this.serverSocket.accept();
// validate that the remote side of the socket is really the client we want
if (this.remoteOutputAddress != null) {
String remoteAddr = this.socket.getInetAddress().getHostAddress();
if (!remoteAddr.equals(this.remoteOutputAddress.getHostAddress())) {
msg = "Illegal remote client at address {0}. Expected is {1}"; //$NON-NLS-1$
msg = MessageFormat.format(msg, remoteAddr, this.remoteOutputAddress.getHostAddress());
logger.error(msg);
closeConnection();
throw new ConnectionException(msg);
}
}
// configure the socket
if (logger.isDebugEnabled()) {
msg = "BufferSize (send/read): {0} / {1} SoLinger: {2} TcpNoDelay: {3}"; //$NON-NLS-1$
logger.debug(MessageFormat
.format(msg, this.socket.getSendBufferSize(), this.socket.getReceiveBufferSize(),
this.socket.getSoLinger(), this.socket.getTcpNoDelay()));
}
//inputSocket.setSendBufferSize(1);
//inputSocket.setSoLinger(true, 0);
//inputSocket.setTcpNoDelay(true);
// activate connection timeout
if (this.useTimeout) {
this.socket.setSoTimeout(this.timeout);
}
// get the streams
this.outputStream = new DataOutputStream(this.socket.getOutputStream());
this.inputStream = new DataInputStream(this.socket.getInputStream());
if (this.clearOnConnect) {
// clear the input stream
int available = this.inputStream.available();
logger.info(MessageFormat.format("clearOnConnect: skipping {0} bytes.", available)); //$NON-NLS-1$
this.inputStream.skip(available);
}
msg = "Connected {0}{1}: {2}:{3} with local side {4}:{5}"; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.getClass().getSimpleName(), this.connection.getId(),
this.socket.getInetAddress().getHostName(), Integer.toString(this.socket.getPort()),
this.socket.getLocalAddress().getHostAddress(), Integer.toString(this.socket.getLocalPort())));
// we are connected!
this.connection.notifyStateChange(ConnectionState.CONNECTED, ConnectionState.CONNECTED.toString());
this.connected = true;
} catch (InterruptedException e) {
logger.warn("Interrupted!"); //$NON-NLS-1$
this.closed = true;
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null);
} catch (Exception e) {
if (this.closed && e instanceof SocketException) {
logger.warn("Socket closed!"); //$NON-NLS-1$
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null);
} else {
String msg = "Error while opening socket for inbound connection {0}: {1}"; //$NON-NLS-1$
logger.error(MessageFormat.format(msg, this.connection.getId()), e.getMessage());
this.connected = false;
this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage());
}
}
}
}
/**
* closes the connection HARD by calling close() on the streams and socket. All Exceptions are caught to make sure
* that the connections are cleaned up
*/
protected void closeConnection() {
this.connected = false;
this.connection.notifyStateChange(ConnectionState.BROKEN, null);
if (this.outputStream != null) {
try {
this.outputStream.close();
} catch (IOException e) {
logger.error(
MessageFormat.format("Error closing OutputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
} finally {
this.outputStream = null;
}
}
if (this.inputStream != null) {
try {
this.inputStream.close();
} catch (IOException e) {
logger.error(
MessageFormat.format("Error closing InputStream: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
} finally {
this.inputStream = null;
}
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e) {
logger.error(
MessageFormat.format("Error closing InputSocket: {0}", e.getLocalizedMessage())); //$NON-NLS-1$
} finally {
this.socket = null;
}
String msg = "Socket closed for inbound connection {0} at local input address {1}:{2}"; //$NON-NLS-1$
logger.info(MessageFormat.format(msg, this.connection.getId(), this.localInputAddressS,
Integer.toString(this.localInputPort)));
}
}
/**
* <p>
* Configures this {@link ServerSocketEndpoint}
* </p>
* gets the parameter map from the connection and reads the following parameters from the map:
* <ul>
* <li>localInputAddress - the local IP or Hostname to bind to for incoming connections</li>
* <li>localInputPort - the local port on which to listen for incoming connections</li>
* <li>remoteOutputAddress - the IP or Hostname of the remote client. If this value is not null, then it will be
* verified that the connecting client is connecting from this address</li>
* <li>remoteOutputPort - the port from which the remote client must connect. If this value is not null, then it
* will be verified that the connecting client is connecting from this port</li>
* <li>retry - a configured retry wait time. Default is {@link SocketEndpointConstants#RETRY}</li>
* <li>timeout - the timeout after which an idle socket is deemed dead. Default is
* {@link SocketEndpointConstants#TIMEOUT}</li>
* <li>useTimeout - if true, then the timeout is activated. default is {@link SocketEndpointConstants#USE_TIMEOUT}</li>
* <li>clearOnConnect - if true, then the after a successful connect the input is cleared by discarding all
* available bytes. This can be useful in cases where the channel is clogged with stale data. default is {@link
* SocketEndpointConstants#CLEAR_ON_CONNECT}</li>
* </ul>
*
* @see CommunicationEndpoint#configure(CommunicationConnection, IoMessageVisitor)
*/
@Override
public void configure(CommunicationConnection connection, IoMessageVisitor messageVisitor) {
if (this.connection != null && connection.getState().compareTo(ConnectionState.INITIALIZED) > 0) {
logger.warn(MessageFormat
.format("Inbound connection {0} already configured.", connection.getId())); //$NON-NLS-1$
return;
}
ConnectionMessages.assertLegalMessageVisitor(this.getClass(), SocketMessageVisitor.class, messageVisitor);
this.messageVisitor = (SocketMessageVisitor) messageVisitor;
this.connection = connection;
configure();
}
private void configure() {
Map<String, String> parameters = this.connection.getParameters();
this.localInputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS);
String localInputPortS = parameters.get(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT);
this.remoteOutputAddressS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_ADDRESS);
String remoteOutputPortS = parameters.get(SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_PORT);
// parse local Address to InetAddress object
try {
this.localInputAddress = InetAddress.getByName(this.localInputAddressS);
} catch (UnknownHostException e) {
throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS, this.localInputAddressS);
}
// parse local address port to integer
try {
this.localInputPort = Integer.parseInt(localInputPortS);
} catch (NumberFormatException e) {
throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT, localInputPortS);
}
// if remote address is not set, then we will use the localhost InetAddress
if (this.remoteOutputAddressS == null || this.remoteOutputAddressS.length() == 0) {
logger.debug("No remoteOutputAddress set. Allowing connection from any remote address"); //$NON-NLS-1$
} else {
// parse remote output address name to InetAddress object
try {
this.remoteOutputAddress = InetAddress.getByName(this.remoteOutputAddressS);
} catch (UnknownHostException e) {
throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_ADDRESS, this.remoteOutputAddressS);
}
// parse remote output address port to integer
try {
this.remoteOutputPort = Integer.parseInt(remoteOutputPortS);
} catch (NumberFormatException e) {
throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_REMOTE_OUTPUT_PORT, remoteOutputPortS);
}
}
// configure retry wait time
String retryS = parameters.get(SocketEndpointConstants.PARAMETER_RETRY);
if (retryS == null || retryS.length() == 0) {
ConnectionMessages.warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY,
String.valueOf(SocketEndpointConstants.RETRY));
this.retry = SocketEndpointConstants.RETRY;
} else {
try {
this.retry = Long.parseLong(retryS);
} catch (NumberFormatException e) {
throw ConnectionMessages
.throwInvalidParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_RETRY,
retryS);
}
}
// configure if timeout on connection should be activated
String useTimeoutS = parameters.get(SocketEndpointConstants.PARAMETER_USE_TIMEOUT);
if (useTimeoutS == null || useTimeoutS.length() == 0) {
ConnectionMessages
.warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_USE_TIMEOUT,
String.valueOf(SocketEndpointConstants.USE_TIMEOUT));
this.useTimeout = SocketEndpointConstants.USE_TIMEOUT;
} else {
this.useTimeout = Boolean.parseBoolean(useTimeoutS);
}
if (this.useTimeout) {
// configure timeout on connection
String timeoutS = parameters.get(SocketEndpointConstants.PARAMETER_TIMEOUT);
if (timeoutS == null || timeoutS.length() == 0) {
ConnectionMessages
.warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_TIMEOUT,
String.valueOf(SocketEndpointConstants.TIMEOUT));
this.timeout = SocketEndpointConstants.TIMEOUT;
} else {
try {
this.timeout = Integer.parseInt(timeoutS);
} catch (NumberFormatException e) {
throw ConnectionMessages.throwInvalidParameter(ServerSocketEndpoint.class,
SocketEndpointConstants.PARAMETER_TIMEOUT, timeoutS);
}
}
}
// configure if the connection should be cleared on connect
String clearOnConnectS = parameters.get(SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT);
if (clearOnConnectS == null || clearOnConnectS.length() == 0) {
ConnectionMessages
.warnUnsetParameter(ServerSocketEndpoint.class, SocketEndpointConstants.PARAMETER_CLEAR_ON_CONNECT,
String.valueOf(SocketEndpointConstants.CLEAR_ON_CONNECT));
this.clearOnConnect = SocketEndpointConstants.CLEAR_ON_CONNECT;
} else {
this.clearOnConnect = Boolean.parseBoolean(clearOnConnectS);
}
}
/**
* @return the uri as String to which this {@link ServerSocketEndpoint} is locally bound to
*/
@Override
public String getLocalUri() {
if (this.socket != null) {
InetAddress localAddress = this.socket.getLocalAddress();
return localAddress.getHostAddress() + StringHelper.COLON + this.socket.getLocalPort();
} else if (this.localInputAddress != null) {
return this.localInputAddress.getHostAddress() + StringHelper.COLON + this.localInputPort;
}
return "0.0.0.0:0"; //$NON-NLS-1$
}
/**
* @return the uri as String from which this {@link ServerSocketEndpoint} is receiving data from
*/
@Override
public String getRemoteUri() {
if (this.socket != null) {
InetAddress remoteAddress = this.socket.getInetAddress();
return remoteAddress.getHostAddress() + StringHelper.COLON + this.socket.getPort();
} else if (this.remoteOutputAddressS != null) {
return this.remoteOutputAddress.getHostAddress() + StringHelper.COLON + this.remoteOutputPort;
}
return "0.0.0.0:0"; //$NON-NLS-1$
}
/**
* Starts the {@link Thread} to allow incoming connections
*
* @see CommunicationEndpoint#start()
*/
@Override
public void start() {
if (this.fatal) {
String msg = "CommunicationConnection had a fatal exception and can not yet be started. Please check log file for further information!"; //$NON-NLS-1$
throw new ConnectionException(msg);
}
if (this.serverThread != null) {
logger.warn(MessageFormat
.format("CommunicationConnection {0} already started.", this.connection.getId())); //$NON-NLS-1$
} else {
// logger.info(MessageFormat.format("Enabling connection {0}...", this.connection.getId())); //$NON-NLS-1$
this.closed = false;
this.serverThread = new Thread(this, this.connection.getId());
this.serverThread.start();
}
}
/**
* Closes any open connection and then stops the {@link Thread} disallowing incoming connections
*
* @see CommunicationEndpoint#stop()
*/
@Override
public void stop() {
closeThread();
closeConnection();
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, ConnectionState.DISCONNECTED.toString());
logger.info(MessageFormat.format("Disabled connection {0}.", this.connection.getId())); //$NON-NLS-1$
}
@Override
public void reset() {
closeThread();
closeConnection();
configure();
this.connection.notifyStateChange(ConnectionState.INITIALIZED, ConnectionState.INITIALIZED.toString());
}
private void closeThread() {
this.closed = true;
this.fatal = false;
if (this.serverThread != null) {
try {
this.serverThread.interrupt();
if (this.serverSocket != null)
this.serverSocket.close();
this.serverThread.join(2000l);
} catch (Exception e) {
logger.error(MessageFormat.format("Exception while interrupting server thread: {0}",
e.getLocalizedMessage())); //$NON-NLS-1$
}
this.serverThread = null;
}
}
/**
* Thread is listening on the ServerSocket and opens a new connection if necessary
*/
@Override
public void run() {
while (!this.closed) {
// bomb-proof, catches all exceptions!
try {
// if serverSocket is null or closed, open a new server socket
if (this.serverSocket == null || this.serverSocket.isClosed()) {
try {
// String msg = "Opening socket on {0}:{1}..."; //$NON-NLS-1$
// logger.info(MessageFormat.format(msg, this.localInputAddress.getHostAddress(),
// Integer.toString(this.localInputPort)));
this.serverSocket = new ServerSocket(this.localInputPort, 1, this.localInputAddress);
this.serverSocket.setReuseAddress(true);
} catch (BindException e) {
logger.error(
"Fatal BindException occurred! Port is already in use, or address is illegal!"); //$NON-NLS-1$
logger.error(e.getMessage(), e);
this.closed = true;
this.fatal = true;
String msg = "Fatal error while binding to server socket. ServerSocket endpoint is dead"; //$NON-NLS-1$
throw new ConnectionException(msg);
}
}
// open the connection
openConnection();
// as long as connection is connected
while (checkConnection()) {
// read and write from the connected server socket
IoMessage message = this.messageVisitor.visit(this.inputStream, this.outputStream);
if (message != null) {
this.connection.handleNewMessage(message);
}
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
logger.error("Interrupted!"); //$NON-NLS-1$
} else {
logger.error(e.getMessage(), e);
}
this.connection.notifyStateChange(ConnectionState.BROKEN, e.getLocalizedMessage());
} finally {
closeConnection();
}
}
if (!this.fatal) {
logger.warn(MessageFormat.format("CommunicationConnection {0} is not running anymore!",
this.connection.getId())); //$NON-NLS-1$
this.connection.notifyStateChange(ConnectionState.BROKEN, null);
} else {
String msg = "CommunicationConnection {0} is broken due to a fatal exception!"; //$NON-NLS-1$
logger.error(MessageFormat.format(msg, this.connection.getId()));
this.connection.notifyStateChange(ConnectionState.DISCONNECTED, null);
}
}
@Override
public void simulate(IoMessage message) throws Exception {
send(message);
}
@Override
public void send(IoMessage message) throws Exception {
String msg = "The Server Socket can not send messages, use the {0} implementation instead!"; //$NON-NLS-1$
throw new UnsupportedOperationException(MessageFormat.format(msg, ClientSocketEndpoint.class.getName()));
}
}

View File

@ -1,83 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.tcpip;
/**
* Constants used in the communication classes
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class SocketEndpointConstants {
public static final String PARAMETER_USE_TIMEOUT = "useTimeout"; //$NON-NLS-1$
public static final String PARAMETER_TIMEOUT = "timeout"; //$NON-NLS-1$
public static final String PARAMETER_RETRY = "retry"; //$NON-NLS-1$
public static final String PARAMETER_CLEAR_ON_CONNECT = "clearOnConnect"; //$NON-NLS-1$
public static final String PARAMETER_CONNECT_ON_START = "connectOnStart"; //$NON-NLS-1$
public static final String PARAMETER_CLOSE_AFTER_SEND = "closeAfterSend"; //$NON-NLS-1$
public static final String PARAMETER_REMOTE_OUTPUT_PORT = "remoteOutputPort"; //$NON-NLS-1$
public static final String PARAMETER_REMOTE_OUTPUT_ADDRESS = "remoteOutputAddress"; //$NON-NLS-1$
public static final String PARAMETER_LOCAL_INPUT_PORT = "localInputPort"; //$NON-NLS-1$
public static final String PARAMETER_LOCAL_INPUT_ADDRESS = "localInputAddress"; //$NON-NLS-1$
public static final String PARAMETER_LOCAL_OUTPUT_ADDRESS = "localOutputAddress"; //$NON-NLS-1$
public static final String PARAMETER_LOCAL_OUTPUT_PORT = "localOutputPort"; //$NON-NLS-1$
public static final String PARAMETER_REMOTE_INPUT_ADDRESS = "remoteInputAddress"; //$NON-NLS-1$
public static final String PARAMETER_REMOTE_INPUT_PORT = "remoteInputPort"; //$NON-NLS-1$
/**
* Time to wait in milliseconds before reestablishing a connection. Default is 60000ms
*/
public static final long RETRY = 60000l;
/**
* The time after which a connection is deemed dead. Value is 60000ms
*/
public static final int TIMEOUT = 60000;
/**
* Default is to use a timeout on socket connections, thus this value is true
*/
public static final boolean USE_TIMEOUT = true;
/**
* Default is to not clear the input socket on connect, thus this value is false
*/
public static final boolean CLEAR_ON_CONNECT = false;
/**
* Default is to connect on start of the connection
*/
public static final boolean CONNECT_ON_START = true;
/**
* Default is to not close after sending
*/
public static final boolean CLOSE_AFTER_SEND = false;
/**
* Default is to disconnect after a null message is received when reading from a TCP socket, thus this value is
* true
*/
public static final boolean DISCONNECT_ON_NULL_MSG = true;
/**
* If {@link #DISCONNECT_ON_NULL_MSG} is activated, then this is the default time used to wait before reading again,
* which is 10000ms
*/
public static final long WAIT_TIME_ON_NULL_MSG = 10000l;
}

View File

@ -1,72 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication.tcpip;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import li.strolch.communication.IoMessage;
import li.strolch.communication.IoMessageVisitor;
/**
* This {@link IoMessageVisitor} implements and endpoint connecting to a {@link Socket}.
*
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public abstract class SocketMessageVisitor extends IoMessageVisitor {
protected final String connectionId;
public SocketMessageVisitor(String connectionId) {
this.connectionId = connectionId;
}
public String getConnectionId() {
return this.connectionId;
}
/**
* This method is called when a message is read from the underlying {@link Socket}
*
* @param inputStream
* the input stream to read data from
* @param outputStream
* the output stream to write data to
*
* @return the parsed {@link IoMessage}
*
* @throws Exception
* if something goes wrong
*/
public abstract IoMessage visit(DataInputStream inputStream, DataOutputStream outputStream) throws Exception;
/**
* This method is called when a message is to be sent to the underlying connected endpoint
*
* @param inputStream
* the input stream to read data from
* @param outputStream
* the output stream to write data to
* @param message
* the message to parse
*
* @throws Exception
* of something goes wrong
*/
public abstract void visit(DataInputStream inputStream, DataOutputStream outputStream, IoMessage message)
throws Exception;
}

View File

@ -1,70 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class AbstractEndpointTest {
static final Logger logger = LoggerFactory.getLogger(FileEndpointTest.class);
public static TestIoMessage createTestMessage(String key1, String key2, String connectionId) {
return createTestMessage(CommandKey.key(key1, key2), connectionId);
}
@SuppressWarnings("nls")
public static TestIoMessage createTestMessage(CommandKey key, String connectionId) {
TestIoMessage msg = new TestIoMessage(UUID.randomUUID().toString(), key, connectionId);
List<String> lines = new ArrayList<>();
lines.add("bla");
lines.add("foo");
lines.add("bar");
lines.add("bla");
msg.setContents(lines);
return msg;
}
protected void waitForMessage(TestConnectionObserver observer) throws InterruptedException {
long start = System.currentTimeMillis();
while (observer.getMessage() == null) {
if (System.currentTimeMillis() - start > 2000)
fail("Connection didn't send message in 2s!"); //$NON-NLS-1$
Thread.sleep(50);
}
}
}

View File

@ -1,78 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import li.strolch.communication.console.ConsoleEndpoint;
import li.strolch.communication.console.ConsoleMessageVisitor;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class ConsoleEndpointTest extends AbstractEndpointTest {
private static final String CONNECTION_ID = "Console"; //$NON-NLS-1$
private CommunicationConnection connection;
@Before
public void before() {
Map<String, String> parameters = new HashMap<>();
CommunicationEndpoint endpoint = new ConsoleEndpoint();
ConsoleMessageVisitor messageVisitor = new ConsoleMessageVisitorExtension();
this.connection = new CommunicationConnection(CONNECTION_ID, ConnectionMode.ON, parameters, endpoint,
messageVisitor);
this.connection.configure();
}
@Test
public void testConsoleEndpoint() throws InterruptedException {
this.connection.start();
CommandKey key = CommandKey.key(CONNECTION_ID, "logger"); //$NON-NLS-1$
TestIoMessage msg = createTestMessage(key, CONNECTION_ID);
TestConnectionObserver observer = new TestConnectionObserver();
this.connection.addConnectionObserver(key, observer);
this.connection.send(msg);
waitForMessage(observer);
assertEquals(msg.getKey(), observer.getMessage().getKey());
}
private final class ConsoleMessageVisitorExtension extends ConsoleMessageVisitor {
public ConsoleMessageVisitorExtension() {
// no-op
}
@Override
public void visit(Logger logger, IoMessage message) throws Exception {
TestIoMessage msg = (TestIoMessage) message;
for (String line : msg.getContents()) {
logger.info(line);
}
}
}
}

View File

@ -1,129 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import static org.junit.Assert.assertEquals;
import java.io.*;
import java.util.*;
import li.strolch.communication.file.FileEndpoint;
import li.strolch.communication.file.FileEndpointMode;
import li.strolch.utils.helper.FileHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class FileEndpointTest extends AbstractEndpointTest {
public static final String INBOUND_FILENAME = "target/test_in.txt"; //$NON-NLS-1$
public static final String OUTBOUND_FILENAME = "target/test_out.txt"; //$NON-NLS-1$
public static final String CONNECTION_ID = "FileTestEndpoint"; //$NON-NLS-1$
private CommunicationConnection connection;
@Before
public void before() {
new File(OUTBOUND_FILENAME).delete();
new File(INBOUND_FILENAME).delete();
Map<String, String> parameters = new HashMap<>();
parameters.put(FileEndpoint.ENDPOINT_MODE, FileEndpointMode.READ_WRITE.name());
parameters.put(FileEndpoint.INBOUND_FILENAME, INBOUND_FILENAME);
parameters.put(FileEndpoint.OUTBOUND_FILENAME, OUTBOUND_FILENAME);
ConnectionMode mode = ConnectionMode.ON;
CommunicationEndpoint endpoint = new FileEndpoint();
StreamMessageVisitor messageVisitor = new StreamMessageVisitorExtension();
this.connection = new CommunicationConnection(CONNECTION_ID, mode, parameters, endpoint, messageVisitor);
this.connection.configure();
}
@After
public void after() {
if (this.connection != null)
this.connection.stop();
}
@Test
public void testFileEndpoint() throws InterruptedException {
String inboundFilename = new File(INBOUND_FILENAME).getName();
String outboundFilename = new File(OUTBOUND_FILENAME).getName();
// send a message
this.connection.start();
TestConnectionObserver outboundObserver = new TestConnectionObserver();
TestIoMessage message = createTestMessage(outboundFilename, FileEndpointMode.WRITE.name(), CONNECTION_ID);
this.connection.addConnectionObserver(message.getKey(), outboundObserver);
this.connection.send(message);
// wait till the message has been sent
waitForMessage(outboundObserver);
this.connection.stop();
assertEquals(message.getKey(), outboundObserver.getMessage().getKey());
// now test reading a file
this.connection.start();
CommandKey inboundKey = CommandKey.key(inboundFilename, FileEndpointMode.READ.name());
TestConnectionObserver inboundObserver = new TestConnectionObserver();
this.connection.addConnectionObserver(inboundKey, inboundObserver);
FileHelper.writeStringToFile("Hello\nWorld!", new File(INBOUND_FILENAME)); //$NON-NLS-1$
// wait for thread to pick up the file
waitForMessage(inboundObserver);
assertEquals(inboundKey, inboundObserver.getMessage().getKey());
}
public static final class StreamMessageVisitorExtension extends StreamMessageVisitor {
private String inboundFilename;
@Override
public void configure(CommunicationConnection connection) {
super.configure(connection);
Map<String, String> parameters = connection.getParameters();
String filePath = parameters.get(FileEndpoint.INBOUND_FILENAME);
this.inboundFilename = new File(filePath).getName();
}
@Override
public IoMessage visit(InputStream inputStream) throws Exception {
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
List<String> lines = new ArrayList<>();
String line;
while ((line = reader.readLine()) != null) {
lines.add(line);
}
return new TestIoMessage(UUID.randomUUID().toString(),
CommandKey.key(this.inboundFilename, FileEndpointMode.READ.name()), CONNECTION_ID, lines);
}
@Override
public void visit(OutputStream outputStream, IoMessage message) throws Exception {
TestIoMessage msg = (TestIoMessage) message;
for (String line : msg.getContents()) {
outputStream.write(line.getBytes());
outputStream.write('\n');
}
}
}
}

View File

@ -1,66 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import static org.junit.Assert.assertEquals;
import java.util.*;
import org.junit.Test;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class SimpleMessageArchiveTest extends AbstractEndpointTest {
@Test
public void testArchive() throws InterruptedException {
IoMessageArchive archive = new SimpleMessageArchive(20, 5);
CommandKey key = CommandKey.key("key1", "key2"); //$NON-NLS-1$//$NON-NLS-2$
String connectionId = "connection1"; //$NON-NLS-1$
int i = 0;
for (; i < 20; i++) {
TestIoMessage msg = new TestIoMessage(UUID.randomUUID().toString(), key, connectionId);
// update the time by plus 1, otherwise the tree set does not add it
msg.setUpdated(new Date(i + 1));
archive.archive(msg);
}
assertEquals(20, archive.size());
// add one more
TestIoMessage msg = new TestIoMessage(UUID.randomUUID().toString(), key, connectionId);
msg.setUpdated(new Date(i + 1));
archive.archive(msg);
// validate the trimming works
assertEquals(15, archive.size());
// Now make sure our last element is still in the list
List<IoMessage> all = archive.getAll();
Collections.sort(all, new Comparator<IoMessage>() {
@Override
public int compare(IoMessage o1, IoMessage o2) {
return o1.getUpdated().compareTo(o2.getUpdated());
}
});
IoMessage message = all.get(all.size() - 1);
assertEquals(msg.getId(), message.getId());
}
}

View File

@ -1,149 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.text.MessageFormat;
import java.util.*;
import li.strolch.communication.tcpip.ClientSocketEndpoint;
import li.strolch.communication.tcpip.ServerSocketEndpoint;
import li.strolch.communication.tcpip.SocketEndpointConstants;
import li.strolch.communication.tcpip.SocketMessageVisitor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class SocketEndpointTest extends AbstractEndpointTest {
private static final String PORT = "45678"; //$NON-NLS-1$
private static final String HOST = "localhost"; //$NON-NLS-1$
private static final String CLIENT_CONNECTION_ID = "ClientSocket"; //$NON-NLS-1$
private static final String SERVER_CONNECTION_ID = "ServerSocket"; //$NON-NLS-1$
private CommunicationConnection clientConnection;
private CommunicationConnection serverConnection;
@Before
public void before() {
{
Map<String, String> parameters = new HashMap<>();
parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_ADDRESS, HOST);
parameters.put(SocketEndpointConstants.PARAMETER_REMOTE_INPUT_PORT, PORT);
// we close after send, so that the server can read whole lines, as that is what we are sending
parameters.put(SocketEndpointConstants.PARAMETER_CLOSE_AFTER_SEND, Boolean.TRUE.toString());
CommunicationEndpoint endpoint = new ClientSocketEndpoint();
SocketMessageVisitor messageVisitor = new SocketMessageVisitorExtension(CLIENT_CONNECTION_ID);
this.clientConnection = new CommunicationConnection(CLIENT_CONNECTION_ID, ConnectionMode.ON, parameters,
endpoint, messageVisitor);
this.clientConnection.configure();
}
{
Map<String, String> parameters = new HashMap<>();
parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_ADDRESS, HOST);
parameters.put(SocketEndpointConstants.PARAMETER_LOCAL_INPUT_PORT, PORT);
CommunicationEndpoint endpoint = new ServerSocketEndpoint();
SocketMessageVisitor messageVisitor = new SocketMessageVisitorExtension(SERVER_CONNECTION_ID);
this.serverConnection = new CommunicationConnection(SERVER_CONNECTION_ID, ConnectionMode.ON, parameters,
endpoint, messageVisitor);
this.serverConnection.configure();
}
}
@After
public void after() {
if (this.clientConnection != null)
this.clientConnection.stop();
if (this.serverConnection != null)
this.serverConnection.stop();
}
@Test
public void testSocketEndpoints() throws Exception {
this.serverConnection.start();
Thread.sleep(100);
this.clientConnection.start();
TestConnectionObserver serverObserver = new TestConnectionObserver();
CommandKey inboundKey = CommandKey.key(SERVER_CONNECTION_ID, "lines"); //$NON-NLS-1$
this.serverConnection.addConnectionObserver(inboundKey, serverObserver);
TestConnectionObserver clientObserver = new TestConnectionObserver();
CommandKey outboundKey = CommandKey.key(CLIENT_CONNECTION_ID, "lines"); //$NON-NLS-1$
this.clientConnection.addConnectionObserver(outboundKey, clientObserver);
TestIoMessage outboundMsg = createTestMessage(outboundKey, CLIENT_CONNECTION_ID);
this.clientConnection.send(outboundMsg);
waitForMessage(clientObserver);
assertEquals(outboundMsg.getKey(), clientObserver.getMessage().getKey());
waitForMessage(serverObserver);
assertEquals(inboundKey, serverObserver.getMessage().getKey());
assertEquals(outboundMsg.getContents(), ((TestIoMessage) serverObserver.getMessage()).getContents());
}
private final class SocketMessageVisitorExtension extends SocketMessageVisitor {
public SocketMessageVisitorExtension(String connectionId) {
super(connectionId);
}
@Override
public void visit(DataInputStream inputStream, DataOutputStream outputStream, IoMessage message)
throws Exception {
TestIoMessage msg = (TestIoMessage) message;
logger.info(MessageFormat
.format("Writing {0} lines for message {1}", msg.getContents().size(), msg.getId())); //$NON-NLS-1$
for (String line : msg.getContents()) {
outputStream.writeBytes(line);
outputStream.write('\n');
}
outputStream.flush();
}
@Override
public IoMessage visit(DataInputStream inputStream, DataOutputStream outputStream) throws Exception {
List<String> lines = new ArrayList<>();
// since we are reading whole lines, we must close the stream when we read null i.e. EOF
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
logger.info("Reading from stream..."); //$NON-NLS-1$
while ((line = reader.readLine()) != null) {
lines.add(line);
}
}
logger.info(MessageFormat.format("Read {0} lines from stream.", lines.size())); //$NON-NLS-1$
return new TestIoMessage(UUID.randomUUID().toString(), CommandKey.key(SERVER_CONNECTION_ID, "lines"),
SERVER_CONNECTION_ID, lines); //$NON-NLS-1$
}
}
}

View File

@ -1,41 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class TestConnectionObserver implements ConnectionObserver {
private static final Logger logger = LoggerFactory.getLogger(FileEndpointTest.class);
private IoMessage message;
public IoMessage getMessage() {
return this.message;
}
@Override
public void notify(CommandKey key, IoMessage message) {
this.message = message;
logger.info(MessageFormat.format("Received message with key {0} and message {1}", key, message)); //$NON-NLS-1$
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright 2014 Robert von Burg <eitch@eitchnet.ch>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package li.strolch.communication;
import java.util.List;
/**
* @author Robert von Burg &lt;eitch@eitchnet.ch&gt;
*/
public class TestIoMessage extends IoMessage {
private List<String> contents;
public TestIoMessage(String id, CommandKey key, String connectionId) {
super(id, key, connectionId);
}
public TestIoMessage(String id, CommandKey key, String connectionId, List<String> contents) {
super(id, key, connectionId);
this.contents = contents;
}
public List<String> getContents() {
return this.contents;
}
public void setContents(List<String> contents) {
this.contents = contents;
}
}