[Major] Added simulated mode, and async message sending with queue for start-up

This commit is contained in:
Robert von Burg 2020-05-26 17:39:18 +02:00
parent e0119b5270
commit befa810fe4
17 changed files with 430 additions and 105 deletions

View File

@ -10,9 +10,13 @@ import static li.strolch.utils.helper.StringHelper.formatNanoDuration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.model.Locator;
import li.strolch.model.Resource;
import li.strolch.model.StrolchValueType;
import li.strolch.model.parameter.Parameter;
@ -20,11 +24,12 @@ import li.strolch.model.parameter.StringParameter;
import li.strolch.model.visitor.SetParameterValueVisitor;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.core.hw.*;
import li.strolch.plc.model.*;
import li.strolch.plc.model.PlcAddress;
import li.strolch.plc.model.PlcAddressType;
import li.strolch.plc.model.PlcState;
import li.strolch.privilege.model.Certificate;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.utils.I18nMessage;
import li.strolch.utils.collections.MapOfMaps;
import li.strolch.utils.dbc.DBC;
@ -32,13 +37,21 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
public static final int SILENT_THRESHOLD = 60;
private PrivilegeContext ctx;
private String plcId;
private Plc plc;
private PlcState plcState;
private String plcStateMsg;
private MapOfMaps<String, String, PlcAddress> plcAddresses;
private MapOfMaps<String, String, PlcAddress> plcTelegrams;
private Map<PlcAddress, String> addressesToResourceId;
private GlobalPlcListener globalListener;
private LinkedBlockingDeque<MessageTask> messageQueue;
private int maxMessageQueue;
private boolean run;
private Future<?> messageSenderTask;
private boolean verbose;
public DefaultPlcHandler(ComponentContainer container, String componentName) {
@ -50,6 +63,11 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
return super.getContainer();
}
@Override
public String getPlcId() {
return this.plcId;
}
@Override
public Plc getPlc() {
return this.plc;
@ -90,6 +108,8 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
@Override
public void initialize(ComponentConfiguration configuration) throws Exception {
this.plcId = configuration.getString("plcId", null);
// validate Plc class name
String plcClassName = configuration.getString("plcClass", DefaultPlc.class.getName());
Class.forName(plcClassName);
@ -101,12 +121,19 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
this.addressesToResourceId = new HashMap<>();
this.verbose = configuration.getBoolean("verbose", false);
this.maxMessageQueue = configuration.getInt("maxMessageQueue", 100);
this.messageQueue = new LinkedBlockingDeque<>();
super.initialize(configuration);
}
@Override
public void start() throws Exception {
this.ctx = getContainer().getPrivilegeHandler().openAgentSystemUserContext();
this.run = true;
this.messageSenderTask = getExecutorService("LogSender").submit(this::sendMessages);
if (reconfigurePlc())
startPlc();
super.start();
@ -117,6 +144,10 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
stopPlc();
if (this.ctx != null)
getContainer().getPrivilegeHandler().invalidate(this.ctx.getCertificate());
this.run = false;
this.messageSenderTask.cancel(false);
super.stop();
}
@ -311,8 +342,35 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
}
@Override
public void sendMsg(I18nMessage msg, MessageState state) {
this.globalListener.sendMsg(msg, state);
public void sendMsg(LogMessage message) {
addMsg(new LogMessageTask(message));
}
@Override
public void disableMsg(Locator locator) {
addMsg(new DisableMessageTask(locator));
}
private synchronized void addMsg(MessageTask task) {
if (this.messageQueue.size() > this.maxMessageQueue)
this.messageQueue.removeFirst();
this.messageQueue.addLast(task);
}
private void sendMessages() {
while (this.run) {
try {
if (this.globalListener == null) {
Thread.sleep(100L);
continue;
}
this.messageQueue.takeFirst().accept(this.globalListener);
} catch (Exception e) {
logger.error("Failed to send message", e);
}
}
}
@Override
@ -358,4 +416,34 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
public void notifyStateChange(PlcConnection connection) {
asyncUpdateState(connection);
}
private interface MessageTask {
void accept(GlobalPlcListener listener);
}
private static class LogMessageTask implements MessageTask {
private LogMessage message;
private LogMessageTask(LogMessage message) {
this.message = message;
}
@Override
public void accept(GlobalPlcListener listener) {
listener.sendMsg(message);
}
}
private static class DisableMessageTask implements MessageTask {
private Locator locator;
private DisableMessageTask(Locator locator) {
this.locator = locator;
}
@Override
public void accept(GlobalPlcListener listener) {
listener.disableMsg(locator);
}
}
}

View File

@ -1,10 +1,12 @@
package li.strolch.plc.core;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.model.Locator;
import li.strolch.plc.core.hw.PlcListener;
import li.strolch.plc.model.MessageState;
import li.strolch.utils.I18nMessage;
public interface GlobalPlcListener extends PlcListener {
void sendMsg(I18nMessage msg, MessageState state);
void sendMsg(LogMessage message);
void disableMsg(Locator locator);
}

View File

@ -1,19 +1,21 @@
package li.strolch.plc.core;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.model.Locator;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.core.hw.Plc;
import li.strolch.plc.core.hw.PlcListener;
import li.strolch.plc.model.MessageState;
import li.strolch.plc.model.PlcAddress;
import li.strolch.plc.model.PlcState;
import li.strolch.privilege.model.Certificate;
import li.strolch.utils.I18nMessage;
public interface PlcHandler {
ComponentContainer getContainer();
String getPlcId();
PlcState getPlcState();
String getPlcStateMsg();
@ -42,7 +44,9 @@ public interface PlcHandler {
void notify(String resource, String action, Object value);
void sendMsg(I18nMessage msg, MessageState state);
void sendMsg(LogMessage message);
void disableMsg(Locator locator);
StrolchTransaction openTx(Certificate cert, boolean readOnly);
}

View File

@ -2,25 +2,30 @@ package li.strolch.plc.core;
import static li.strolch.plc.model.PlcConstants.PARAM_VALUE;
import static li.strolch.plc.model.PlcConstants.TYPE_PLC_ADDRESS;
import static li.strolch.runtime.StrolchConstants.DEFAULT_REALM;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.helper.ExceptionHelper.getCallerMethod;
import java.util.ResourceBundle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.handler.operationslog.LogMessageState;
import li.strolch.handler.operationslog.LogSeverity;
import li.strolch.model.Locator;
import li.strolch.model.Resource;
import li.strolch.model.parameter.Parameter;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.core.hw.PlcListener;
import li.strolch.plc.model.MessageState;
import li.strolch.plc.model.PlcAddress;
import li.strolch.plc.model.PlcAddressKey;
import li.strolch.plc.model.PlcServiceState;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
import li.strolch.utils.I18nMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -81,8 +86,54 @@ public abstract class PlcService implements PlcListener {
return addressParam.getValue();
}
protected void sendMsg(I18nMessage msg, MessageState state) {
this.plcHandler.sendMsg(msg, state);
protected void enableMsg(PlcAddressKey addressKey, ResourceBundle bundle, LogSeverity severity) {
sendMsg(logMessageFor(addressKey, bundle, severity, LogMessageState.Active));
}
protected void disableMsg(PlcAddressKey addressKey) {
disableMsg(Locator.valueOf("Plc", this.plcHandler.getPlcId(), addressKey.resource, addressKey.action));
}
protected void enableMsg(String i18nKey, ResourceBundle bundle, LogSeverity severity) {
sendMsg(logMessageFor(i18nKey, bundle, severity, LogMessageState.Active));
}
protected void disableMsg(String i18nKey, ResourceBundle bundle) {
disableMsg(Locator.valueOf("Plc", this.plcHandler.getPlcId(), bundle.getBaseBundleName(), i18nKey));
}
protected void sendMsg(String i18nKey, ResourceBundle bundle, LogSeverity severity) {
sendMsg(logMessageFor(i18nKey, bundle, severity));
}
protected LogMessage logMessageFor(PlcAddressKey addressKey, ResourceBundle bundle, LogSeverity severity) {
return logMessageFor(addressKey, bundle, severity, LogMessageState.Information);
}
protected LogMessage logMessageFor(PlcAddressKey addressKey, ResourceBundle bundle, LogSeverity severity,
LogMessageState state) {
return new LogMessage(DEFAULT_REALM, SYSTEM_USER_AGENT,
Locator.valueOf("Plc", this.plcHandler.getPlcId(), addressKey.resource, addressKey.action), severity,
state, bundle, addressKey.toKey());
}
protected LogMessage logMessageFor(String i18nKey, ResourceBundle bundle, LogSeverity severity) {
return logMessageFor(i18nKey, bundle, severity, LogMessageState.Information);
}
protected LogMessage logMessageFor(String i18nKey, ResourceBundle bundle, LogSeverity severity,
LogMessageState state) {
return new LogMessage(DEFAULT_REALM, SYSTEM_USER_AGENT,
Locator.valueOf("Plc", this.plcHandler.getPlcId(), bundle.getBaseBundleName(), i18nKey), severity,
state, bundle, i18nKey);
}
protected void sendMsg(LogMessage logMessage) {
this.plcHandler.sendMsg(logMessage);
}
protected void disableMsg(Locator locator) {
this.plcHandler.disableMsg(locator);
}
protected void send(String resource, String action) {

View File

@ -1,5 +1,6 @@
package li.strolch.plc.core.hw.connections;
import static li.strolch.plc.model.PlcConstants.PARAM_SIMULATED;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import java.io.IOException;
@ -51,6 +52,7 @@ public class DataLogicScannerConnection extends SimplePlcConnection {
@Override
public void initialize(Map<String, Object> parameters) throws Exception {
this.simulated = parameters.containsKey(PARAM_SIMULATED) && (boolean) parameters.get(PARAM_SIMULATED);
String address = (String) parameters.get("address");
String[] parts = address.split(":");
@ -67,6 +69,11 @@ public class DataLogicScannerConnection extends SimplePlcConnection {
@Override
public boolean connect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return super.connect();
}
if (isConnected())
return true;
@ -90,6 +97,12 @@ public class DataLogicScannerConnection extends SimplePlcConnection {
@Override
public void disconnect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
super.disconnect();
return;
}
internalDisconnect();
super.disconnect();
}

View File

@ -5,11 +5,14 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import li.strolch.plc.model.ConnectionState;
import li.strolch.plc.core.hw.Plc;
import li.strolch.plc.core.hw.PlcConnection;
import li.strolch.plc.model.ConnectionState;
public abstract class SimplePlcConnection extends PlcConnection {
protected boolean simulated;
public SimplePlcConnection(Plc plc, String id) {
super(plc, id);
}
@ -22,6 +25,8 @@ public abstract class SimplePlcConnection extends PlcConnection {
@Override
public boolean connect() {
logger.info(this.id + ": Is now connected.");
if (this.simulated)
logger.info("Running SIMULATED");
this.connectionState = ConnectionState.Connected;
this.connectionStateMsg = "-";
this.plc.notifyConnectionStateChanged(this);

View File

@ -1,9 +1,13 @@
package li.strolch.plc.core.hw.gpio;
import static java.util.stream.Collectors.joining;
import static li.strolch.plc.model.PlcConstants.PARAM_SIMULATED;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.pi4j.io.gpio.*;
import com.pi4j.io.gpio.event.GpioPinDigitalStateChangeEvent;
@ -26,6 +30,8 @@ public class RaspiBcmGpioInputConnection extends SimplePlcConnection {
@Override
public void initialize(Map<String, Object> parameters) {
this.simulated = parameters.containsKey(PARAM_SIMULATED) && (boolean) parameters.get(PARAM_SIMULATED);
@SuppressWarnings("unchecked")
List<Integer> bcmInputPins = (List<Integer>) parameters.get("bcmInputPins");
this.inputBcmAddresses = bcmInputPins;
@ -54,6 +60,11 @@ public class RaspiBcmGpioInputConnection extends SimplePlcConnection {
@Override
public boolean connect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return super.connect();
}
try {
GpioController gpioController = PlcGpioController.getInstance();
@ -89,6 +100,12 @@ public class RaspiBcmGpioInputConnection extends SimplePlcConnection {
@Override
public void disconnect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
super.disconnect();
return;
}
try {
GpioController gpioController = PlcGpioController.getInstance();
for (GpioPin inputPin : this.addressesByPin.keySet()) {

View File

@ -1,9 +1,13 @@
package li.strolch.plc.core.hw.gpio;
import static java.util.stream.Collectors.joining;
import static li.strolch.plc.model.PlcConstants.PARAM_SIMULATED;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.pi4j.io.gpio.*;
import li.strolch.plc.core.hw.Plc;
@ -22,6 +26,8 @@ public class RaspiBcmGpioOutputConnection extends SimplePlcConnection {
@Override
public void initialize(Map<String, Object> parameters) {
this.simulated = parameters.containsKey(PARAM_SIMULATED) && (boolean) parameters.get(PARAM_SIMULATED);
@SuppressWarnings("unchecked")
List<Integer> bcmOutputPins = (List<Integer>) parameters.get("bcmOutputPins");
this.outputBcmAddresses = bcmOutputPins;
@ -44,6 +50,11 @@ public class RaspiBcmGpioOutputConnection extends SimplePlcConnection {
@Override
public boolean connect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return super.connect();
}
try {
GpioController gpioController = PlcGpioController.getInstance();
@ -65,6 +76,12 @@ public class RaspiBcmGpioOutputConnection extends SimplePlcConnection {
@Override
public void disconnect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
super.disconnect();
return;
}
try {
GpioController gpioController = PlcGpioController.getInstance();
for (GpioPinDigitalOutput outputPin : this.gpioPinsByAddress.values()) {
@ -80,6 +97,10 @@ public class RaspiBcmGpioOutputConnection extends SimplePlcConnection {
@Override
public void send(String address, Object value) {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return;
}
boolean high = (boolean) value;
if (this.inverted)

View File

@ -1,5 +1,6 @@
package li.strolch.plc.core.hw.i2c;
import static li.strolch.plc.model.PlcConstants.PARAM_SIMULATED;
import static li.strolch.utils.helper.ByteHelper.asBinary;
import static li.strolch.utils.helper.ByteHelper.isBitSet;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
@ -46,6 +47,7 @@ public class PCF8574InputConnection extends SimplePlcConnection {
@Override
public void initialize(Map<String, Object> parameters) {
this.simulated = parameters.containsKey(PARAM_SIMULATED) && (boolean) parameters.get(PARAM_SIMULATED);
if (!parameters.containsKey("i2cBus"))
throw new IllegalArgumentException("Missing param i2cBus");
@ -86,6 +88,11 @@ public class PCF8574InputConnection extends SimplePlcConnection {
@Override
public boolean connect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return super.connect();
}
if (isConnected()) {
logger.warn(this.id + ": Already connected");
return true;
@ -148,6 +155,12 @@ public class PCF8574InputConnection extends SimplePlcConnection {
@Override
public void disconnect() {
if (this.simulated) {
super.disconnect();
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return;
}
if (this.interruptGpioPin != null) {
this.interruptGpioPin.removeAllListeners();
PlcGpioController.getInstance().unprovisionPin(this.interruptGpioPin);

View File

@ -1,5 +1,6 @@
package li.strolch.plc.core.hw.i2c;
import static li.strolch.plc.model.PlcConstants.PARAM_SIMULATED;
import static li.strolch.utils.helper.ByteHelper.*;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import static li.strolch.utils.helper.StringHelper.toHexString;
@ -12,7 +13,6 @@ import com.pi4j.io.i2c.I2CDevice;
import com.pi4j.io.i2c.I2CFactory;
import li.strolch.plc.core.hw.Plc;
import li.strolch.plc.core.hw.connections.SimplePlcConnection;
import li.strolch.plc.model.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +36,7 @@ public class PCF8574OutputConnection extends SimplePlcConnection {
@Override
public void initialize(Map<String, Object> parameters) {
this.simulated = parameters.containsKey(PARAM_SIMULATED) && (boolean) parameters.get(PARAM_SIMULATED);
if (!parameters.containsKey("i2cBus"))
throw new IllegalArgumentException("Missing param i2cBus");
@ -65,6 +66,11 @@ public class PCF8574OutputConnection extends SimplePlcConnection {
@Override
public boolean connect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return super.connect();
}
if (isConnected()) {
logger.warn(this.id + ": Already connected");
return true;
@ -121,12 +127,16 @@ public class PCF8574OutputConnection extends SimplePlcConnection {
@Override
public void disconnect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
super.disconnect();
return;
}
this.outputDevices = null;
this.states = null;
this.connectionState = ConnectionState.Disconnected;
this.connectionStateMsg = "-";
this.plc.notifyConnectionStateChanged(this);
super.disconnect();
}
@Override
@ -136,6 +146,11 @@ public class PCF8574OutputConnection extends SimplePlcConnection {
@Override
public void send(String address, Object value) {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return;
}
assertConnected();
int[] pos = this.positionsByAddress.get(address);

View File

@ -1,5 +1,6 @@
package li.strolch.plc.core.hw.i2c;
import static li.strolch.plc.model.PlcConstants.PARAM_SIMULATED;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import static li.strolch.utils.helper.StringHelper.toHexString;
@ -66,6 +67,7 @@ public class RSL366OverHorterI2c extends SimplePlcConnection {
@Override
public void initialize(Map<String, Object> parameters) {
this.simulated = parameters.containsKey(PARAM_SIMULATED) && (boolean) parameters.get(PARAM_SIMULATED);
if (!parameters.containsKey("i2cBus"))
throw new IllegalArgumentException("Missing param i2cBus");
@ -89,6 +91,11 @@ public class RSL366OverHorterI2c extends SimplePlcConnection {
@Override
public boolean connect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return super.connect();
}
if (isConnected()) {
logger.warn(this.id + ": Already connected");
return true;
@ -122,6 +129,19 @@ public class RSL366OverHorterI2c extends SimplePlcConnection {
}
}
@Override
public void disconnect() {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
super.disconnect();
return;
}
this.device = null;
super.disconnect();
}
@Override
public Set<String> getAddresses() {
return new TreeSet<>(this.positionsByAddress.keySet());
@ -129,6 +149,11 @@ public class RSL366OverHorterI2c extends SimplePlcConnection {
@Override
public void send(String address, Object value) {
if (this.simulated) {
logger.warn(this.id + ": Running SIMULATED, NOT CONNECTING!");
return;
}
assertConnected();
byte[] pos = this.positionsByAddress.get(address);

View File

@ -3,6 +3,7 @@ package li.strolch.plc.gw.client;
import static java.net.NetworkInterface.getByInetAddress;
import static li.strolch.model.Tags.Json.*;
import static li.strolch.plc.model.PlcConstants.*;
import static li.strolch.runtime.StrolchConstants.DEFAULT_REALM;
import static li.strolch.utils.helper.ExceptionHelper.*;
import static li.strolch.utils.helper.NetworkHelper.formatMacAddress;
import static li.strolch.utils.helper.StringHelper.isEmpty;
@ -13,24 +14,26 @@ import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import li.strolch.agent.api.*;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.model.Locator;
import li.strolch.model.Resource;
import li.strolch.model.i18n.I18nMessageToJsonVisitor;
import li.strolch.model.parameter.StringParameter;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.core.GlobalPlcListener;
import li.strolch.plc.core.PlcHandler;
import li.strolch.plc.model.*;
import li.strolch.plc.model.ConnectionState;
import li.strolch.plc.model.PlcAddress;
import li.strolch.plc.model.PlcResponseState;
import li.strolch.plc.model.PlcState;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.utils.I18nMessage;
import li.strolch.utils.helper.NetworkHelper;
import org.glassfish.tyrus.client.ClientManager;
@ -57,6 +60,11 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
private ScheduledFuture<?> serverConnectFuture;
private LinkedBlockingDeque<Callable<?>> messageQueue;
private int maxMessageQueue;
private boolean run;
private Future<?> messageSenderTask;
private long lastSystemStateNotification;
private long ipAddressesUpdateTime;
private JsonArray ipAddresses;
@ -70,11 +78,14 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
public void initialize(ComponentConfiguration configuration) throws Exception {
this.verbose = configuration.getBoolean("verbose", false);
this.plcId = configuration.getString("plcId", null);
this.plcId = getComponent(PlcHandler.class).getPlcId();
this.gwUsername = configuration.getString("gwUsername", null);
this.gwPassword = configuration.getString("gwPassword", null);
this.gwServerUrl = configuration.getString("gwServerUrl", null);
this.maxMessageQueue = configuration.getInt("maxMessageQueue", 100);
this.messageQueue = new LinkedBlockingDeque<>();
super.initialize(configuration);
}
@ -89,6 +100,9 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
delayConnect(INITIAL_DELAY, TimeUnit.SECONDS);
this.run = true;
this.messageSenderTask = getExecutorService("MessageSender").submit(this::sendMessages);
super.start();
}
@ -103,6 +117,9 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
@Override
public void stop() throws Exception {
this.run = false;
this.messageSenderTask.cancel(false);
notifyPlcConnectionState(ConnectionState.Disconnected);
if (this.gwSession != null) {
@ -212,7 +229,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
if (this.serverConnectFuture != null)
this.serverConnectFuture.cancel(true);
if (this.gwSession != null) {
if (this.gwSession != null && this.gwSession.isOpen()) {
try {
this.gwSession.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, msg));
} catch (Exception e) {
@ -253,6 +270,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
stateJ.add(PARAM_IP_ADDRESSES, getIpAddresses());
stateJ.add(PARAM_VERSIONS, getVersions());
stateJ.add(PARAM_SYSTEM_STATE, getContainer().getAgent().getSystemState(1, TimeUnit.HOURS));
sendDataToClient(stateJ);
this.lastSystemStateNotification = System.currentTimeMillis();
}
@ -265,61 +283,75 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
}
private void sendMsgToServer(I18nMessage msg, MessageState state) {
if (!this.authenticated) {
logger.warn("Not yet authenticated with server, ignoring update for msg " + msg.getKey() + ": " + msg
.getMessage());
return;
}
private void addMsg(Callable<?> callable) {
if (this.messageQueue.size() > this.maxMessageQueue)
this.messageQueue.removeFirst();
this.messageQueue.addLast(callable);
}
JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId);
messageJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_MESSAGE);
messageJ.addProperty(PARAM_STATE, state.name());
messageJ.add(PARAM_MESSAGE, msg.accept(new I18nMessageToJsonVisitor()));
@Override
public void sendMsg(LogMessage message) {
addMsg(() -> {
JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId);
messageJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_MESSAGE);
messageJ.add(PARAM_MESSAGE, message.toJson());
try {
sendDataToClient(messageJ);
if (this.verbose)
logger.info("Sent msg " + msg.getKey() + " to server");
} catch (IOException e) {
logger.error("Failed to send notification to server", e);
}
logger.info("Sent msg " + message.getLocator() + " to server");
return null;
});
}
@Override
public void disableMsg(Locator locator) {
addMsg(() -> {
JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId);
messageJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_DISABLE_MESSAGE);
messageJ.addProperty(PARAM_REALM, DEFAULT_REALM);
messageJ.addProperty(PARAM_LOCATOR, locator.toString());
sendDataToClient(messageJ);
if (this.verbose)
logger.info("Sent msg " + locator + " to server");
return null;
});
}
private void notifyServer(PlcAddress plcAddress, Object value) {
if (!this.authenticated) {
logger.warn("Not yet authenticated with server, ignoring update for " + plcAddress + ": " + value);
return;
}
addMsg(() -> {
JsonObject notificationJ = new JsonObject();
notificationJ.addProperty(PARAM_PLC_ID, this.plcId);
notificationJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_NOTIFICATION);
notificationJ.addProperty(PARAM_RESOURCE, plcAddress.resource);
notificationJ.addProperty(PARAM_ACTION, plcAddress.action);
JsonObject notificationJ = new JsonObject();
notificationJ.addProperty(PARAM_PLC_ID, this.plcId);
notificationJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_NOTIFICATION);
notificationJ.addProperty(PARAM_RESOURCE, plcAddress.resource);
notificationJ.addProperty(PARAM_ACTION, plcAddress.action);
if (value instanceof Boolean)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Boolean) value));
else if (value instanceof Number)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Number) value));
else if (value instanceof String)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((String) value));
else
notificationJ.add(PARAM_VALUE, new JsonPrimitive(value.toString()));
if (value instanceof Boolean)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Boolean) value));
else if (value instanceof Number)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Number) value));
else if (value instanceof String)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((String) value));
else
notificationJ.add(PARAM_VALUE, new JsonPrimitive(value.toString()));
try {
sendDataToClient(notificationJ);
if (this.verbose)
logger.info("Sent notification for " + plcAddress.toKey() + " to server");
} catch (IOException e) {
logger.error("Failed to send notification to server", e);
}
return null;
});
}
public void onWsMessage(Session session, String message) {
//logger.info(session.getId() + ": Handling message");
public void onWsMessage(String message) {
JsonObject jsonObject = new JsonParser().parse(message).getAsJsonObject();
if (!jsonObject.has(PARAM_MESSAGE_TYPE)) {
logger.error("Received data has no message type!");
@ -334,7 +366,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
if (MSG_TYPE_AUTHENTICATION.equals(messageType)) {
handleAuthResponse(ctx, jsonObject);
} else if (MSG_TYPE_PLC_TELEGRAM.equals(messageType)) {
handleTelegram(ctx, jsonObject);
handleTelegram(jsonObject);
} else {
logger.error("Unhandled message type " + messageType);
}
@ -344,7 +376,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
}
private void handleTelegram(PrivilegeContext ctx, JsonObject telegramJ) {
private void handleTelegram(JsonObject telegramJ) {
PlcAddress plcAddress = null;
@ -383,14 +415,15 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
}
try {
// async sending of response
PlcAddress address = plcAddress;
addMsg(() -> {
sendDataToClient(telegramJ);
if (this.verbose)
logger.info("Sent Telegram response for " + (plcAddress == null ? "unknown" : plcAddress.toKey())
+ " to server");
} catch (IOException e) {
logger.error("Failed to send notification to server", e);
}
logger.info(
"Sent Telegram response for " + (address == null ? "unknown" : address.toKey()) + " to server");
return null;
});
}
private void handleAuthResponse(PrivilegeContext ctx, JsonObject response) {
@ -452,7 +485,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
@SuppressWarnings("SynchronizeOnNonFinalField")
private synchronized void sendDataToClient(JsonObject jsonObject) throws IOException {
private void sendDataToClient(JsonObject jsonObject) throws IOException {
String data = jsonObject.toString();
synchronized (this.gwSession) {
RemoteEndpoint.Basic basic = this.gwSession.getBasicRemote();
@ -465,6 +498,36 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
}
private void sendMessages() {
while (this.run) {
if (!this.authenticated) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
logger.error("Interrupted!");
if (!this.run)
return;
}
continue;
}
Callable<?> callable = null;
try {
callable = this.messageQueue.takeFirst();
callable.call();
} catch (Exception e) {
if (e instanceof IOException) {
closeBrokenGwSessionUpdateState("Failed to send message", "Failed to send message");
this.messageQueue.addFirst(callable);
}
logger.error("Failed to send message", e);
}
}
}
private void saveServerConnectionState(PrivilegeContext ctx, ConnectionState state, String stateMsg) {
StrolchRealm realm = getContainer().getRealm(ctx.getCertificate());
@ -525,11 +588,6 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
return this.ipAddresses;
}
@Override
public void sendMsg(I18nMessage msg, MessageState state) {
sendMsgToServer(msg, state);
}
@Override
public void handleNotification(PlcAddress address, Object value) {
getExecutorService(THREAD_POOL).submit(() -> notifyServer(address, value));
@ -545,8 +603,8 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
@OnMessage
public void onMessage(String message, Session session) {
this.gwHandler.onWsMessage(session, message);
public void onMessage(String message) {
this.gwHandler.onWsMessage(message);
}
@OnMessage

View File

@ -19,6 +19,10 @@ import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.handler.operationslog.LogMessageState;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.plc.model.*;
import li.strolch.privilege.base.NotAuthenticatedException;
import li.strolch.privilege.model.Certificate;
@ -261,7 +265,6 @@ public class PlcGwServerHandler extends StrolchComponent {
}
public void onWsMessage(String message, Session session) {
//logger.info(session.getId() + ": Handling message");
JsonObject jsonObject = new JsonParser().parse(message).getAsJsonObject();
if (!jsonObject.has(PARAM_MESSAGE_TYPE))
@ -280,8 +283,14 @@ public class PlcGwServerHandler extends StrolchComponent {
}
case MSG_TYPE_MESSAGE: {
PlcSession plcSession = assertPlcAuthed(plcId, session.getId());
handleMessage(plcSession, jsonObject);
assertPlcAuthed(plcId, session.getId());
handleMessage(jsonObject);
break;
}
case MSG_TYPE_DISABLE_MESSAGE: {
assertPlcAuthed(plcId, session.getId());
handleDisableMessage(jsonObject);
break;
}
@ -364,15 +373,20 @@ public class PlcGwServerHandler extends StrolchComponent {
plcResponse.getListener().run();
}
private void handleMessage(PlcSession plcSession, JsonObject jsonObject) {
MessageState state = MessageState.valueOf(jsonObject.get(PARAM_STATE).getAsString());
private void handleMessage(JsonObject jsonObject) {
JsonObject msgJ = jsonObject.get(PARAM_MESSAGE).getAsJsonObject();
LogMessage logMessage = LogMessage.fromJson(msgJ);
logger.info("Received message " + logMessage.getLocator());
getComponent(OperationsLog.class).addMessage(logMessage);
}
// I18nMessage i18nMessage = new I18nMessage();
private void handleDisableMessage(JsonObject jsonObject) {
String realm = jsonObject.get(PARAM_REALM).getAsString();
Locator locator = Locator.valueOf(jsonObject.get(PARAM_LOCATOR).getAsString());
logger.info("Received disable for messages with locator " + locator);
// TODO
OperationsLog operationsLog = getComponent(OperationsLog.class);
operationsLog.updateState(realm, locator, LogMessageState.Inactive);
}
private void handleAuth(String sessionId, JsonObject authJ) {
@ -577,7 +591,7 @@ public class PlcGwServerHandler extends StrolchComponent {
}
public void onWsError(Session session, Throwable throwable) {
logger.error(session.getId() + ": Error: " + throwable.getMessage(), true);
logger.error(session.getId() + ": Error: " + throwable.getMessage(), throwable);
}
public static class PlcSession {

View File

@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.execution.ExecutionHandler;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.handler.operationslog.LogMessageState;
import li.strolch.handler.operationslog.LogSeverity;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Resource;
@ -140,7 +141,7 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
getOperationsLogs().addMessage(
new LogMessage(this.container.getRealmNames().iterator().next(), SYSTEM_USER_AGENT,
Resource.locatorFor(TYPE_PLC, this.plcId), LogSeverity.Exception,
ResourceBundle.getBundle("strolch-plc-gw-server"), "systemAction.failed")
LogMessageState.Information, ResourceBundle.getBundle("strolch-plc-gw-server"), "systemAction.failed")
.withException(e).value("action", runnable).value("reason", e));
}
}

View File

@ -8,6 +8,7 @@ import java.util.Set;
import li.strolch.execution.policy.SimpleExecution;
import li.strolch.handler.operationslog.LogMessage;
import li.strolch.handler.operationslog.LogMessageState;
import li.strolch.handler.operationslog.LogSeverity;
import li.strolch.model.activity.Action;
import li.strolch.persistence.api.StrolchTransaction;
@ -115,13 +116,13 @@ public abstract class PlcExecutionPolicy extends SimpleExecution
protected LogMessage msgPlcNotConnected() {
return new LogMessage(this.realm, SYSTEM_USER_AGENT, this.actionLoc, LogSeverity.Error,
BUNDLE_STROLCH_PLC_GW_SERVER, "execution.plc.notConnected").value("plc", getPlcId());
LogMessageState.Information, BUNDLE_STROLCH_PLC_GW_SERVER, "execution.plc.notConnected").value("plc", getPlcId());
}
protected LogMessage msgFailedToSendMessage(PlcAddressResponse response) {
PlcAddressKey key = response.getPlcAddressKey();
return new LogMessage(this.realm, SYSTEM_USER_AGENT, this.actionLoc, LogSeverity.Error,
BUNDLE_STROLCH_PLC_GW_SERVER, "execution.plc.sendMessage.failed") //
LogMessageState.Information, BUNDLE_STROLCH_PLC_GW_SERVER, "execution.plc.sendMessage.failed") //
.value("plc", getPlcId()) //
.value("key", key) //
.value("msg", response.getStateMsg());
@ -129,6 +130,6 @@ public abstract class PlcExecutionPolicy extends SimpleExecution
protected LogMessage msgConnectionLostToPlc() {
return new LogMessage(this.realm, SYSTEM_USER_AGENT, this.actionLoc, LogSeverity.Error,
BUNDLE_STROLCH_PLC_GW_SERVER, "execution.plc.connectionLost").value("plc", getPlcId());
LogMessageState.Information, BUNDLE_STROLCH_PLC_GW_SERVER, "execution.plc.connectionLost").value("plc", getPlcId());
}
}

View File

@ -1,7 +0,0 @@
package li.strolch.plc.model;
public enum MessageState {
Active,
Inactive,
Information
}

View File

@ -48,6 +48,9 @@ public class PlcConstants {
public static final String PARAM_VERSIONS = "versions";
public static final String PARAM_LOCAL_IP = "localIp";
public static final String PARAM_MESSAGE = "message";
public static final String PARAM_LOCATOR = "locator";
public static final String PARAM_REALM = "realm";
public static final String PARAM_SIMULATED = "simulated";
public static final String INTERPRETATION_NOTIFICATION = "Notification";
public static final String INTERPRETATION_TELEGRAM = "Telegram";
@ -56,5 +59,6 @@ public class PlcConstants {
public static final String MSG_TYPE_PLC_NOTIFICATION = "PlcNotification";
public static final String MSG_TYPE_PLC_TELEGRAM = "PlcTelegram";
public static final String MSG_TYPE_MESSAGE = "Message";
public static final String MSG_TYPE_DISABLE_MESSAGE = "DisableMessage";
public static final String MSG_TYPE_STATE_NOTIFICATION = "StateNotification";
}