Merge branch 'develop' into release/1.2

This commit is contained in:
Robert von Burg 2022-10-14 10:17:56 +02:00
commit 95fc5351b5
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
17 changed files with 582 additions and 243 deletions

View File

@ -62,9 +62,6 @@
<Privilege name="li.strolch.report.ReportSearch" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.model.query.StrolchQuery" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.job.StrolchJob" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
@ -182,9 +179,6 @@
<Privilege name="PrivilegeGetUser" policy="UserAccessPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.model.query.StrolchQuery" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.service.api.Service" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>

View File

@ -56,6 +56,8 @@
<buildTimestamp>${maven.build.timestamp}</buildTimestamp>
<jdk.version>17</jdk.version>
<maven.compiler.source>${jdk.version}</maven.compiler.source>
<maven.compiler.target>${jdk.version}</maven.compiler.target>
<!-- compile time dependencies -->
<slf4j.version>1.7.30</slf4j.version>

View File

@ -149,8 +149,6 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
@Override
public void stop() throws Exception {
stopPlc();
if (this.ctx != null)
getContainer().getPrivilegeHandler().invalidate(this.ctx.getCertificate());
this.run = false;
if (this.messageSenderTask != null)
@ -158,6 +156,9 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
if (this.updateStateTask != null)
this.updateStateTask.cancel(true);
if (this.ctx != null)
getContainer().getPrivilegeHandler().invalidate(this.ctx.getCertificate());
super.stop();
}

View File

@ -8,9 +8,6 @@
<Privilege name="li.strolch.service.api.Service" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.model.query.StrolchQuery" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.search.StrolchSearch" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
@ -61,9 +58,6 @@
<Privilege name="li.strolch.service.api.Service" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.model.query.StrolchQuery" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>
<Privilege name="li.strolch.search.StrolchSearch" policy="DefaultPrivilege">
<AllAllowed>true</AllAllowed>
</Privilege>

View File

@ -2,6 +2,7 @@ package li.strolch.plc.gw.client;
import static java.net.NetworkInterface.getByInetAddress;
import static li.strolch.model.Tags.Json.*;
import static li.strolch.plc.model.ModelHelper.valueToJson;
import static li.strolch.plc.model.PlcConstants.*;
import static li.strolch.runtime.StrolchConstants.DEFAULT_REALM;
import static li.strolch.utils.helper.ExceptionHelper.*;
@ -17,12 +18,14 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import li.strolch.agent.api.*;
import li.strolch.model.Locator;
import li.strolch.model.Resource;
@ -34,6 +37,7 @@ import li.strolch.plc.core.PlcHandler;
import li.strolch.plc.model.*;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.utils.CheckedRunnable;
import li.strolch.utils.helper.NetworkHelper;
import org.glassfish.tyrus.client.ClientManager;
@ -64,7 +68,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
private ScheduledFuture<?> serverConnectFuture;
private Map<PlcAddress, Object> notConnectedQueue;
private LinkedBlockingDeque<Callable<?>> messageQueue;
private LinkedBlockingDeque<CheckedRunnable> messageQueue;
private int maxMessageQueue;
private boolean run;
private Future<?> messageSenderTask;
@ -129,7 +133,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
this.run = false;
this.authenticated = false;
if (this.messageSenderTask != null)
this.messageSenderTask.cancel(false);
this.messageSenderTask.cancel(true);
notifyPlcConnectionState(ConnectionState.Disconnected);
@ -176,6 +180,10 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
logger.error(
"Connection refused to connect to server. Will try to connect again in " + RETRY_DELAY + "s: "
+ getExceptionMessageWithCauses(e));
} else if (rootCause.getMessage() != null && rootCause.getMessage()
.contains("Response code was not 101: 404.")) {
logger.error("Connection failed with 404 error code. Is URL " + this.gwServerUrl + " correct?");
logger.error("Server not yet ready with 404 error. Will try again in " + RETRY_DELAY + "s");
} else {
logger.error("Failed to connect to server! Will try to connect again in " + RETRY_DELAY + "s", e);
}
@ -300,15 +308,15 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
}
private void addMsg(Callable<?> callable) {
private void async(CheckedRunnable runnable) {
if (this.messageQueue.size() > this.maxMessageQueue)
this.messageQueue.removeFirst();
this.messageQueue.addLast(callable);
this.messageQueue.addLast(runnable);
}
@Override
public void sendMsg(LogMessage message) {
addMsg(() -> {
async(() -> {
JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId);
@ -318,14 +326,12 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
sendDataToClient(messageJ);
if (this.verbose)
logger.info("Sent msg " + message.getLocator() + " to server");
return null;
});
}
@Override
public void disableMsg(Locator locator) {
addMsg(() -> {
async(() -> {
JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId);
@ -336,8 +342,6 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
sendDataToClient(messageJ);
if (this.verbose)
logger.info("Sent msg " + locator + " to server");
return null;
});
}
@ -345,29 +349,19 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
if (!plcAddress.remote)
return;
addMsg(() -> {
async(() -> {
JsonObject notificationJ = new JsonObject();
notificationJ.addProperty(PARAM_PLC_ID, this.plcId);
notificationJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_NOTIFICATION);
notificationJ.addProperty(PARAM_RESOURCE, plcAddress.resource);
notificationJ.addProperty(PARAM_ACTION, plcAddress.action);
if (value instanceof Boolean)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Boolean) value));
else if (value instanceof Number)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Number) value));
else if (value instanceof String)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((String) value));
else
notificationJ.add(PARAM_VALUE, new JsonPrimitive(value.toString()));
notificationJ.add(PARAM_VALUE, valueToJson(value));
sendDataToClient(notificationJ);
if (this.verbose)
logger.info("Sent notification for " + plcAddress.toKey() + " to server");
return null;
});
}
@ -382,11 +376,12 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
try {
runAsAgent(ctx -> {
if (MSG_TYPE_AUTHENTICATION.equals(messageType)) {
handleAuthResponse(ctx, jsonObject);
} else if (MSG_TYPE_PLC_TELEGRAM.equals(messageType)) {
handleTelegram(jsonObject);
async(() -> handleTelegram(jsonObject));
} else if (MSG_TYPE_PLC_GET_ADDRESS_STATE.equals(messageType)) {
async(() -> handleGetAddressState(ctx, jsonObject));
} else {
logger.error("Unhandled message type " + messageType);
}
@ -396,54 +391,55 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
}
}
private void handleTelegram(JsonObject telegramJ) {
private void handleGetAddressState(PrivilegeContext ctx, JsonObject telegramJ) throws Exception {
PlcAddress plcAddress = null;
try (StrolchTransaction tx = openTx(ctx.getCertificate(), true)) {
plcAddress = parsePlcAddress(telegramJ);
String plcAddressId = this.plcHandler.getPlcAddressId(plcAddress.resource, plcAddress.action);
Resource address = tx.getResourceBy(TYPE_PLC_ADDRESS, plcAddressId, true);
Object value = address.getParameter(PARAM_VALUE, true).getValue();
telegramJ.add(PARAM_VALUE, valueToJson(value));
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Done.name());
telegramJ.addProperty(PARAM_STATE_MSG, "");
if (this.verbose)
logger.info("Sent address state for " + plcAddress.toKey() + " = " + value + " to server");
} catch (Exception e) {
handleFailedTelegram(telegramJ, plcAddress, e);
}
sendDataToClient(telegramJ);
}
private void handleTelegram(JsonObject telegramJ) throws Exception {
PlcAddress plcAddress = null;
try {
if (!telegramJ.has(PARAM_RESOURCE) || !telegramJ.has(PARAM_ACTION))
throw new IllegalArgumentException("Both " + PARAM_RESOURCE + " and " + PARAM_ACTION + " is required!");
String resource = telegramJ.get(PARAM_RESOURCE).getAsString();
String action = telegramJ.get(PARAM_ACTION).getAsString();
plcAddress = this.plcHandler.getPlcAddress(resource, action);
plcAddress = parsePlcAddress(telegramJ);
if (telegramJ.has(PARAM_VALUE)) {
String valueS = telegramJ.get(PARAM_VALUE).getAsString();
Object value = plcAddress.valueType.parseValue(valueS);
this.plcHandler.send(resource, action, value, false, false);
this.plcHandler.send(plcAddress.resource, plcAddress.action, value, false, false);
} else {
this.plcHandler.send(resource, action, false, false);
this.plcHandler.send(plcAddress.resource, plcAddress.action, false, false);
}
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Done.name());
telegramJ.addProperty(PARAM_STATE_MSG, "");
} catch (Exception e) {
if (plcAddress == null) {
logger.error("Failed to handle telegram: " + telegramJ, e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Could not evaluate PlcAddress: " + getExceptionMessage(getRootCause(e), false));
} else {
logger.error("Failed to execute telegram: " + plcAddress.toKeyAddress(), e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Failed to perform " + plcAddress.toKey() + ": " + getExceptionMessage(getRootCause(e), false));
}
handleFailedTelegram(telegramJ, plcAddress, e);
}
// async sending of response
PlcAddress address = plcAddress;
addMsg(() -> {
sendDataToClient(telegramJ);
if (this.verbose)
logger.info(
"Sent Telegram response for " + (address == null ? "unknown" : address.toKey()) + " to server");
return null;
});
sendDataToClient(telegramJ);
if (this.verbose)
logger.info("Sent Telegram response for " + (plcAddress == null ? "unknown" : plcAddress.toKey())
+ " to server");
}
private void handleAuthResponse(PrivilegeContext ctx, JsonObject response) {
@ -542,15 +538,15 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
continue;
}
Callable<?> callable = null;
CheckedRunnable runnable = null;
try {
callable = this.messageQueue.takeFirst();
callable.call();
runnable = this.messageQueue.takeFirst();
runnable.run();
} catch (Exception e) {
closeBrokenGwSessionUpdateState("Failed to send message",
"Failed to send message, reconnecting in " + RETRY_DELAY + "s.");
if (callable != null) {
this.messageQueue.addFirst(callable);
if (runnable != null) {
this.messageQueue.addFirst(runnable);
logger.error(
"Failed to send message, reconnecting in " + RETRY_DELAY + "s. And then retrying message.",
e);
@ -584,9 +580,10 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
VersionQueryResult versionQueryResult = getContainer().getAgent().getVersion();
this.versions.add(AGENT_VERSION, versionQueryResult.getAgentVersion().toJson());
this.versions.add(APP_VERSION, versionQueryResult.getAppVersion().toJson());
this.versions.add(COMPONENT_VERSIONS,
versionQueryResult.getComponentVersions().stream().map(ComponentVersion::toJson)
.collect(JsonArray::new, JsonArray::add, JsonArray::addAll));
this.versions.add(COMPONENT_VERSIONS, versionQueryResult.getComponentVersions()
.stream()
.map(ComponentVersion::toJson)
.collect(JsonArray::new, JsonArray::add, JsonArray::addAll));
}
return this.versions;
@ -635,6 +632,30 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
notifyServer(address, value);
}
private PlcAddress parsePlcAddress(JsonObject telegramJ) {
if (!telegramJ.has(PARAM_RESOURCE) || !telegramJ.has(PARAM_ACTION))
throw new IllegalArgumentException("Both " + PARAM_RESOURCE + " and " + PARAM_ACTION + " is required!");
String resource = telegramJ.get(PARAM_RESOURCE).getAsString();
String action = telegramJ.get(PARAM_ACTION).getAsString();
return this.plcHandler.getPlcAddress(resource, action);
}
private static void handleFailedTelegram(JsonObject telegramJ, PlcAddress plcAddress, Exception e) {
if (plcAddress == null) {
logger.error("Failed to handle telegram: " + telegramJ, e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Could not evaluate PlcAddress: " + getExceptionMessage(getRootCause(e), false));
} else {
logger.error("Failed to execute telegram: " + plcAddress.toKeyAddress(), e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Failed to perform " + plcAddress.toKey() + ": " + getExceptionMessage(getRootCause(e), false));
}
}
@ClientEndpoint
public static class PlcGwClientEndpoint {

View File

@ -0,0 +1,8 @@
package li.strolch.plc.gw.server;
import li.strolch.plc.model.PlcAddressValueResponse;
public interface PlcAddressResponseValueListener {
void handleResponse(PlcAddressValueResponse response) throws Exception;
}

View File

@ -0,0 +1,9 @@
package li.strolch.plc.gw.server;
import li.strolch.plc.model.ConnectionState;
import li.strolch.plc.model.PlcAddressKey;
public interface PlcConnectionStateListener {
void handleConnectionState(String plcId, ConnectionState connectionState);
}

View File

@ -1,19 +1,21 @@
package li.strolch.plc.gw.server;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static li.strolch.plc.model.ModelHelper.jsonToValue;
import static li.strolch.plc.model.ModelHelper.valueToJson;
import static li.strolch.plc.model.PlcConstants.*;
import static li.strolch.utils.collections.SynchronizedCollections.synchronizedMapOfLists;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import static li.strolch.websocket.WebSocketRemoteIp.*;
import static li.strolch.websocket.WebSocketRemoteIp.get;
import javax.websocket.CloseReason;
import javax.websocket.PongMessage;
import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.gson.JsonObject;
@ -21,6 +23,7 @@ import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.exception.StrolchNotAuthenticatedException;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.model.log.LogMessage;
@ -37,7 +40,6 @@ import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
import li.strolch.utils.collections.MapOfLists;
import li.strolch.utils.dbc.DBC;
import li.strolch.websocket.WebSocketRemoteIp;
public class PlcGwServerHandler extends StrolchComponent {
@ -45,19 +47,26 @@ public class PlcGwServerHandler extends StrolchComponent {
public static final String THREAD_POOL = "PlcRequests";
private String runAsUser;
private String realm;
private Set<String> plcIds;
private PlcStateHandler plcStateHandler;
private Map<String, PlcSession> plcSessionsBySessionId;
private Map<String, PlcSession> plcSessionsByPlcId;
private MapOfLists<String, PlcConnectionStateListener> plcConnectionStateListeners;
private Map<String, MapOfLists<PlcAddressKey, PlcNotificationListener>> plcAddressListenersByPlcId;
private Map<Long, PlcResponse> plcResponses;
private ScheduledFuture<?> clearDeadConnectionsTask;
public PlcGwServerHandler(ComponentContainer container, String componentName) {
super(container, componentName);
}
public String getRealm() {
return this.realm;
}
public Set<String> getPlcIds() {
return this.plcIds;
}
@ -66,26 +75,56 @@ public class PlcGwServerHandler extends StrolchComponent {
public void initialize(ComponentConfiguration configuration) throws Exception {
this.runAsUser = configuration.getString("runAsUser", "plc-server");
this.realm = getContainer().getRealmNames().iterator().next();
this.plcIds = runAsAgentWithResult(
ctx -> getContainer().getPrivilegeHandler().getPrivilegeHandler().getUsers(ctx.getCertificate())
.stream() //
.filter(user -> user.hasRole(ROLE_PLC)).map(UserRep::getUsername) //
.collect(toSet()));
this.plcIds = runAsAgentWithResult(ctx -> getContainer().getPrivilegeHandler()
.getPrivilegeHandler()
.getUsers(ctx.getCertificate())
.stream() //
.filter(user -> user.hasRole(ROLE_PLC))
.map(UserRep::getUsername) //
.collect(toSet()));
this.plcStateHandler = new PlcStateHandler(getContainer());
this.plcStateHandler = getPlcStateHandler();
this.plcSessionsBySessionId = new ConcurrentHashMap<>();
this.plcSessionsByPlcId = new ConcurrentHashMap<>();
this.plcConnectionStateListeners = synchronizedMapOfLists(new MapOfLists<>());
this.plcAddressListenersByPlcId = new ConcurrentHashMap<>();
this.plcResponses = new ConcurrentHashMap<>();
super.initialize(configuration);
}
@Override
public void start() throws Exception {
this.clearDeadConnectionsTask = getAgent().getScheduledExecutor(getName())
.scheduleWithFixedDelay(this::clearDeadConnections, 10, 10, TimeUnit.SECONDS);
super.start();
}
@Override
public void stop() throws Exception {
if (this.clearDeadConnectionsTask != null)
this.clearDeadConnectionsTask.cancel(true);
super.stop();
}
protected PlcStateHandler getPlcStateHandler() {
return new PlcStateHandler(this);
}
public boolean isPlcConnected(String plcId) {
DBC.PRE.assertNotEmpty("plcId must not be empty", plcId);
return this.plcSessionsByPlcId.containsKey(plcId);
}
public void register(String plcId, PlcConnectionStateListener listener) {
this.plcConnectionStateListeners.addElement(plcId, listener);
}
public void unregister(String plcId, PlcConnectionStateListener listener) {
this.plcConnectionStateListeners.removeElement(plcId, listener);
}
public void register(String plcId, PlcAddressKey addressKey, PlcNotificationListener listener) {
DBC.PRE.assertNotNull("addressKey must not be null", addressKey);
DBC.PRE.assertNotEmpty("plcId must not be empty", plcId);
@ -126,52 +165,20 @@ public class PlcGwServerHandler extends StrolchComponent {
return super.runAsWithResult(this.runAsUser, runnable);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, boolean value,
PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, int value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, double value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, String value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
public void sendMessage(PlcAddressKey addressKey, String plcId, Object value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, value == null ? null : valueToJson(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, (JsonPrimitive) null, listener);
}
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId, boolean value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId, int value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId, double value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId, String value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId) {
return sendMessageSync(addressKey, plcId, (JsonPrimitive) null);
sendMessage(addressKey, plcId, null, listener);
}
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId) {
return sendMessageSync(addressKey, plcId, null);
}
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId, JsonPrimitive valueJ) {
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId, Object value) {
JsonPrimitive valueJ = value == null ? null : valueToJson(value);
PlcAddressResponse[] response = new PlcAddressResponse[1];
@ -194,44 +201,30 @@ public class PlcGwServerHandler extends StrolchComponent {
private void sendMessage(PlcAddressKey addressKey, String plcId, JsonPrimitive valueJ,
PlcAddressResponseListener listener) {
PlcSession plcSession = this.plcSessionsByPlcId.get(plcId);
if (plcSession == null)
throw new IllegalStateException("PLC " + plcId + " is not connected!");
assertPlcAuthed(plcId, plcSession.session.getId());
PlcSession plcSession = getPlcSession(plcId);
getExecutorService(THREAD_POOL).submit(() -> send(plcSession, addressKey, valueJ, listener));
}
public void asyncGetAddressState(PlcAddressKey addressKey, String plcId, PlcAddressResponseValueListener listener) {
PlcSession plcSession = getPlcSession(plcId);
getExecutorService(THREAD_POOL).submit(() -> asyncGetAddressState(plcSession, addressKey, listener));
}
private void send(PlcSession plcSession, PlcAddressKey plcAddressKey, JsonPrimitive valueJ,
PlcAddressResponseListener listener) {
if (valueJ == null)
logger.info("Sending " + plcAddressKey + " to " + plcSession.plcId + "...");
else
logger.info("Sending " + plcAddressKey + " with value " + valueJ + " to " + plcSession.plcId + "...");
logger.info("Sending " + plcAddressKey + " = " + valueJ + " to " + plcSession.plcId + "...");
PlcAddressResponse plcResponse = new PlcAddressResponse(plcSession.plcId, plcAddressKey);
plcResponse.setListener(() -> handleResponse(listener, plcResponse));
plcResponse.setListener(() -> listener.handleResponse(plcResponse));
try {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(PARAM_SEQUENCE_ID, plcResponse.getSequenceId());
jsonObject.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_TELEGRAM);
jsonObject.addProperty(PARAM_PLC_ID, plcSession.plcId);
jsonObject.addProperty(PARAM_RESOURCE, plcAddressKey.resource);
jsonObject.addProperty(PARAM_ACTION, plcAddressKey.action);
if (valueJ != null)
jsonObject.add(PARAM_VALUE, valueJ);
String data = jsonObject.toString();
String data = buildJsonTelegram(plcSession.plcId, plcAddressKey, valueJ, plcResponse).toString();
this.plcResponses.put(plcResponse.getSequenceId(), plcResponse);
synchronized (plcSession.session) {
sendDataToClient(data, plcSession.session.getBasicRemote());
}
sendDataToClient(data, plcSession.session);
} catch (Exception e) {
logger.error("Failed to send " + plcAddressKey + " to PLC " + plcSession.plcId, e);
@ -247,14 +240,57 @@ public class PlcGwServerHandler extends StrolchComponent {
}
}
private void handleResponse(PlcAddressResponseListener listener, PlcAddressResponse response) {
private void asyncGetAddressState(PlcSession plcSession, PlcAddressKey plcAddressKey,
PlcAddressResponseValueListener listener) {
logger.info("Requesting value for address " + plcAddressKey + " from PLC " + plcSession.plcId + "...");
PlcAddressValueResponse plcResponse = new PlcAddressValueResponse(plcSession.plcId, plcAddressKey);
plcResponse.setListener(() -> listener.handleResponse(plcResponse));
try {
listener.handleResponse(response);
String data = buildJsonGetAddressStateTelegram(plcSession.plcId, plcAddressKey, plcResponse).toString();
this.plcResponses.put(plcResponse.getSequenceId(), plcResponse);
sendDataToClient(data, plcSession.session);
} catch (Exception e) {
logger.error("Failed to notify listener " + listener + " for response of " + response, e);
logger.error("Failed to get address state for " + plcAddressKey + " from PLC " + plcSession.plcId, e);
plcResponse.setState(PlcResponseState.Failed);
plcResponse.setStateMsg(
"Failed to get address state for " + plcAddressKey + " from PLC " + plcSession.plcId + ": "
+ getExceptionMessageWithCauses(e));
try {
listener.handleResponse(plcResponse);
} catch (Exception ex) {
logger.error("Failed to notify listener " + listener, ex);
}
}
}
private static JsonObject buildJsonTelegram(String plcId, PlcAddressKey plcAddressKey, JsonPrimitive valueJ,
PlcAddressResponse plcResponse) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(PARAM_SEQUENCE_ID, plcResponse.getSequenceId());
jsonObject.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_TELEGRAM);
jsonObject.addProperty(PARAM_PLC_ID, plcId);
jsonObject.addProperty(PARAM_RESOURCE, plcAddressKey.resource);
jsonObject.addProperty(PARAM_ACTION, plcAddressKey.action);
if (valueJ != null)
jsonObject.add(PARAM_VALUE, valueJ);
return jsonObject;
}
private static JsonObject buildJsonGetAddressStateTelegram(String plcId, PlcAddressKey plcAddressKey,
PlcAddressResponse plcResponse) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(PARAM_SEQUENCE_ID, plcResponse.getSequenceId());
jsonObject.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_GET_ADDRESS_STATE);
jsonObject.addProperty(PARAM_PLC_ID, plcId);
jsonObject.addProperty(PARAM_RESOURCE, plcAddressKey.resource);
jsonObject.addProperty(PARAM_ACTION, plcAddressKey.action);
return jsonObject;
}
private PlcSession assertPlcAuthed(String plcId, String sessionId) throws NotAuthenticatedException {
PlcSession plcSession = this.plcSessionsBySessionId.get(sessionId);
if (plcSession.certificate == null)
@ -275,13 +311,16 @@ public class PlcGwServerHandler extends StrolchComponent {
return plcSession;
}
private void sendDataToClient(String data, Basic basic) throws IOException {
int pos = 0;
while (pos + 8192 < data.length()) {
basic.sendText(data.substring(pos, pos + 8192), false);
pos += 8192;
private void sendDataToClient(String data, Session session) throws IOException {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (session) {
int pos = 0;
while (pos + 8192 < data.length()) {
session.getBasicRemote().sendText(data.substring(pos, pos + 8192), false);
pos += 8192;
}
session.getBasicRemote().sendText(data.substring(pos), true);
}
basic.sendText(data.substring(pos), true);
}
public void onWsMessage(String message, Session session) throws IOException {
@ -299,6 +338,8 @@ public class PlcGwServerHandler extends StrolchComponent {
case MSG_TYPE_AUTHENTICATION -> handleAuth(session, jsonObject);
case MSG_TYPE_PLC_NOTIFICATION -> handleNotification(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_PLC_TELEGRAM -> handleTelegramResponse(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_PLC_GET_ADDRESS_STATE ->
handleGetAddressStateResponse(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_STATE_NOTIFICATION -> handleStateMsg(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_MESSAGE -> {
assertPlcAuthed(plcId, session.getId());
@ -359,7 +400,6 @@ public class PlcGwServerHandler extends StrolchComponent {
}
private void handleTelegramResponse(PlcSession plcSession, JsonObject responseJ) {
long sequenceId = responseJ.get(PARAM_SEQUENCE_ID).getAsLong();
PlcResponse plcResponse = this.plcResponses.remove(sequenceId);
if (plcResponse == null) {
@ -372,13 +412,49 @@ public class PlcGwServerHandler extends StrolchComponent {
plcResponse.setState(PlcResponseState.valueOf(state));
plcResponse.setStateMsg(stateMsg);
plcResponse.getListener().run();
try {
plcResponse.getListener().run();
} catch (Exception e) {
logger.error("Failed to notify listener " + plcResponse.getListener() + " for response of " + plcResponse,
e);
}
}
private void handleGetAddressStateResponse(PlcSession plcSession, JsonObject responseJ) {
long sequenceId = responseJ.get(PARAM_SEQUENCE_ID).getAsLong();
PlcResponse response = this.plcResponses.remove(sequenceId);
if (response == null) {
logger.error(plcSession.plcId + ": PlcResponse does not exist for sequenceId " + sequenceId);
return;
}
if (!(response instanceof PlcAddressValueResponse plcResponse))
throw new IllegalStateException(
"Performing a GetAddressState response handling, but listener is wrong: " + response);
String state = responseJ.get(PARAM_STATE).getAsString();
String stateMsg = responseJ.get(PARAM_STATE_MSG).getAsString();
plcResponse.setState(PlcResponseState.valueOf(state));
plcResponse.setStateMsg(stateMsg);
plcResponse.setValue(jsonToValue(responseJ.getAsJsonPrimitive(PARAM_VALUE)));
try {
plcResponse.getListener().run();
} catch (Exception e) {
logger.error("Failed to notify listener " + plcResponse.getListener() + " for response of " + plcResponse,
e);
}
}
private void handleMessage(JsonObject jsonObject) {
JsonObject msgJ = jsonObject.get(PARAM_MESSAGE).getAsJsonObject();
LogMessage logMessage = LogMessage.fromJson(msgJ);
logger.info("Received message " + logMessage.getLocator());
if (!logMessage.getRealm().equals(this.realm))
throw new IllegalStateException(
"Unexpected realm in message " + logMessage.getId() + " " + logMessage.getLocator() + " "
+ logMessage.getMessage());
OperationsLog log = getComponent(OperationsLog.class);
log.updateState(logMessage.getRealm(), logMessage.getLocator(), LogMessageState.Inactive);
log.addMessage(logMessage);
@ -387,8 +463,10 @@ public class PlcGwServerHandler extends StrolchComponent {
private void handleDisableMessage(JsonObject jsonObject) {
String realm = jsonObject.get(PARAM_REALM).getAsString();
Locator locator = Locator.valueOf(jsonObject.get(PARAM_LOCATOR).getAsString());
logger.info("Received disable for messages with locator " + locator);
if (!realm.equals(this.realm))
throw new IllegalStateException("Unexpected realm in disable message action for message " + locator);
logger.info("Received disable for messages with locator " + locator);
OperationsLog operationsLog = getComponent(OperationsLog.class);
operationsLog.updateState(realm, locator, LogMessageState.Inactive);
}
@ -440,10 +518,7 @@ public class PlcGwServerHandler extends StrolchComponent {
private void sendAuthResponse(PlcSession plcSession, JsonObject jsonObject) {
try {
String data = jsonObject.toString();
synchronized (plcSession.session) {
sendDataToClient(data, plcSession.session.getBasicRemote());
}
sendDataToClient(jsonObject.toString(), plcSession.session);
logger.info(plcSession.plcId + ": Sent " + MSG_TYPE_AUTHENTICATION + " response on Session "
+ plcSession.session.getId());
} catch (Exception e) {
@ -465,12 +540,7 @@ public class PlcGwServerHandler extends StrolchComponent {
String plcId = new String(message.getApplicationData().array());
PlcSession plcSession = this.plcSessionsBySessionId.get(session.getId());
if (plcSession != null) {
plcSession.lastUpdate = System.currentTimeMillis();
logger.info(
"PLC " + plcId + " with SessionId " + plcSession.session.getId() + " is still alive on certificate "
+ (plcSession.certificate == null ? null : plcSession.certificate.getSessionId()));
} else {
if (plcSession == null) {
plcSession = new PlcSession(plcId, session);
plcSession.lastUpdate = System.currentTimeMillis();
@ -495,17 +565,43 @@ public class PlcGwServerHandler extends StrolchComponent {
}
if (plcSession.certificate != null) {
StrolchSessionHandler sessionHandler = getContainer().getComponent(StrolchSessionHandler.class);
sessionHandler.validate(plcSession.certificate);
try {
StrolchSessionHandler sessionHandler = getContainer().getComponent(StrolchSessionHandler.class);
sessionHandler.validate(plcSession.certificate);
this.plcStateHandler.handleStillConnected(plcSession);
plcSession.lastUpdate = System.currentTimeMillis();
logger.info("PLC " + plcId + " with SessionId " + session.getId() + " is still alive on certificate "
+ plcSession.certificate.getSessionId());
this.plcStateHandler.handleStillConnected(plcSession);
} catch (StrolchNotAuthenticatedException e) {
logger.error("PLC session " + session.getId() + " is not authenticated anymore for plc " + plcId
+ ". Closing session.");
this.plcSessionsBySessionId.remove(plcId);
PlcSession registeredSession = this.plcSessionsByPlcId.get(plcId);
if (registeredSession != null && registeredSession.session.getId().equals(session.getId())) {
this.plcSessionsByPlcId.remove(plcId);
}
try {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (session) {
session.close(new CloseReason(CloseReason.CloseCodes.NOT_CONSISTENT, "Stale session"));
}
} catch (Exception e1) {
logger.error("Failed to close session " + session.getId(), e1);
}
}
}
}
private void clearDeadConnections() {
// find all sessions which are timed out
List<PlcSession> expiredSessions = this.plcSessionsBySessionId.values().stream().filter(this::hasExpired)
List<PlcSession> expiredSessions = this.plcSessionsBySessionId.values()
.stream()
.filter(this::hasExpired)
.toList();
for (PlcSession plcSession : expiredSessions) {
@ -531,7 +627,17 @@ public class PlcGwServerHandler extends StrolchComponent {
}
this.plcSessionsBySessionId.remove(plcSession.session.getId());
this.plcSessionsByPlcId.remove(plcSession.plcId);
// see if this session is also still the registered session
// it might already have been overwritten by another session
PlcSession registeredSession = this.plcSessionsByPlcId.get(plcSession.plcId);
if (registeredSession != null && registeredSession.session.getId().equals(plcSession.session.getId())) {
this.plcSessionsByPlcId.remove(plcSession.plcId);
// handle state change
this.plcStateHandler.handlePlcState(plcSession, ConnectionState.Disconnected, "dead connection", null);
notifyObserversOfConnectionLost(plcSession.plcId);
}
}
}
@ -584,7 +690,9 @@ public class PlcGwServerHandler extends StrolchComponent {
logger.warn("Notifying PlcResponse listener " + plcResponse + " of connection lost!");
plcResponse.getListener().run();
} catch (Exception e) {
logger.error("Failed to notify PlcResponse listener " + plcResponse);
logger.error(
"Failed to notify PlcResponse listener " + plcResponse + " of connection lost to PLC " + plcId,
e);
}
}
@ -604,7 +712,26 @@ public class PlcGwServerHandler extends StrolchComponent {
for (PlcNotificationListener listener : listenersCopy) {
logger.warn("Notifying PlcNotificationListener " + addressKey + " with " + listener
+ " of connection lost!");
listener.handleConnectionLost();
try {
listener.handleConnectionLost();
} catch (Exception e) {
logger.error("Failed to notify listener " + listener + " of connection lost for PLC " + plcId, e);
}
}
}
}
public void notifyConnectionState(String plcId, ConnectionState connectionState) {
List<PlcConnectionStateListener> listeners = this.plcConnectionStateListeners.getList(plcId);
if (listeners == null)
return;
listeners = new ArrayList<>(listeners);
for (PlcConnectionStateListener listener : listeners) {
try {
listener.handleConnectionState(plcId, connectionState);
} catch (Exception e) {
logger.error("Failed to notify listener " + listener + " of new connection state " + connectionState
+ " for PLC " + plcId, e);
}
}
}
@ -613,6 +740,15 @@ public class PlcGwServerHandler extends StrolchComponent {
logger.error(session.getId() + ": Error: " + throwable.getMessage(), throwable);
}
private PlcSession getPlcSession(String plcId) {
PlcSession plcSession = this.plcSessionsByPlcId.get(plcId);
if (plcSession == null)
throw new IllegalStateException("PLC " + plcId + " is not connected!");
assertPlcAuthed(plcId, plcSession.session.getId());
return plcSession;
}
public static class PlcSession {
public final String plcId;
public final Session session;

View File

@ -1,7 +1,9 @@
package li.strolch.plc.gw.server;
import static li.strolch.plc.model.ModelHelper.valueToJson;
import static li.strolch.plc.model.PlcConstants.TYPE_PLC;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.helper.ExceptionHelper.getCallerMethod;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -20,6 +22,7 @@ import li.strolch.plc.model.PlcServiceState;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
import li.strolch.utils.CheckedBiFunction;
import li.strolch.utils.dbc.DBC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,6 +45,10 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
this.state = PlcServiceState.Unregistered;
}
public ComponentContainer getContainer() {
return this.container;
}
public PlcServiceState getState() {
return this.state;
}
@ -62,10 +69,17 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
this.state = PlcServiceState.Unregistered;
}
protected PlcAddressKey register(String resource, String action) {
protected void register(String resource, String action) {
PlcAddressKey addressKey = keyFor(resource, action);
this.plcHandler.register(this.plcId, addressKey, this);
return addressKey;
}
protected void register(PlcConnectionStateListener listener) {
this.plcHandler.register(this.plcId, listener);
}
protected void unregister(PlcConnectionStateListener listener) {
this.plcHandler.unregister(this.plcId, listener);
}
protected void register(PlcAddressKey key) {
@ -84,25 +98,20 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
return PlcAddressKey.keyFor(resource, action);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, boolean value,
PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener);
public void sendMessage(String resource, String action, Object value, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(keyFor(resource, action), this.plcId, valueToJson(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, int value, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener);
public void sendMessage(PlcAddressKey addressKey, Object value, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, this.plcId, value, listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, double value, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener);
public void readState(String resource, String action, PlcAddressResponseValueListener listener) {
this.plcHandler.asyncGetAddressState(keyFor(resource, action), this.plcId, listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, String value, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, listener);
public void readState(PlcAddressKey addressKey, PlcAddressResponseValueListener listener) {
this.plcHandler.asyncGetAddressState(addressKey, this.plcId, listener);
}
@Override
@ -115,6 +124,10 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
throw new UnsupportedOperationException("Not implemented!");
}
protected boolean hasExecutionHandler() {
return this.container.hasComponent(ExecutionHandler.class);
}
protected ExecutionHandler getExecutionHandler() {
return this.container.getComponent(ExecutionHandler.class);
}
@ -135,17 +148,70 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
try {
this.plcHandler.run(runnable);
} catch (Exception e) {
logger.error("Runnable " + runnable + " failed!", e);
if (hasOperationsLogs()) {
getOperationsLogs().addMessage(
new LogMessage(this.container.getRealmNames().iterator().next(), SYSTEM_USER_AGENT,
Resource.locatorFor(TYPE_PLC, this.plcId), LogSeverity.Exception,
LogMessageState.Information, PlcGwSrvI18n.bundle, "systemAction.failed")
.withException(e).value("action", runnable).value("reason", e));
}
handleFailedRunnable(runnable.toString(), e);
}
}
protected <T> T run(PrivilegedRunnableWithResult<T> runnable) {
try {
return this.plcHandler.runWithResult(runnable);
} catch (Exception e) {
handleFailedRunnable(runnable.toString(), e);
throw new IllegalStateException("Failed to execute runnable " + runnable, e);
}
}
private void handleFailedRunnable(String runnable, Exception e) {
logger.error("Runnable " + runnable + " failed!", e);
if (hasOperationsLogs()) {
getOperationsLogs().addMessage(
new LogMessage(this.container.getRealmNames().iterator().next(), SYSTEM_USER_AGENT,
Resource.locatorFor(TYPE_PLC, this.plcId), LogSeverity.Exception,
LogMessageState.Information, PlcGwSrvI18n.bundle, "systemAction.failed").withException(e)
.value("action", runnable));
}
}
/**
* Executes the given consumer in a read-only transaction
*
* @param consumer
* the consumer to run in a read-only transaction
*/
protected <T> T runReadOnlyTx(CheckedBiFunction<PrivilegeContext, StrolchTransaction, T> consumer) {
return run(ctx -> {
try (StrolchTransaction tx = openTx(ctx, getCallerMethod(), true)) {
return consumer.apply(ctx, tx);
}
});
}
/**
* <p>Executes the given consumer in a writeable transaction</p>
*
* <p><b>Note:</b> The transaction is automatically committed by calling {@link StrolchTransaction#commitOnClose()}
* when the runnable is completed and the TX is dirty, i.e. {@link StrolchTransaction#needsCommit()} returns
* true</p>
*
* @param consumer
* the consumer to run in a writeable transaction
*/
protected <T> T runWritableTx(CheckedBiFunction<PrivilegeContext, StrolchTransaction, T> consumer) {
return run(ctx -> {
try (StrolchTransaction tx = openTx(ctx, getCallerMethod(), false)) {
try {
T t = consumer.apply(ctx, tx);
if (tx.needsCommit())
tx.commitOnClose();
return t;
} catch (Exception e) {
tx.rollbackOnClose();
throw e;
}
}
});
}
protected <T> T runWithResult(PrivilegedRunnableWithResult<T> runnable) throws Exception {
return this.plcHandler.runWithResult(runnable);
}
@ -166,7 +232,8 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
protected ScheduledFuture<?> scheduleAtFixedRate(PrivilegedRunnable runnable, long initialDelay, long period,
TimeUnit delayUnit) {
return this.container.getAgent().getScheduledExecutor(PlcGwService.class.getSimpleName())
return this.container.getAgent()
.getScheduledExecutor(PlcGwService.class.getSimpleName())
.scheduleAtFixedRate(() -> {
try {
this.container.getPrivilegeHandler().runAsAgent(runnable);
@ -178,7 +245,8 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
protected ScheduledFuture<?> scheduleWithFixedDelay(PrivilegedRunnable runnable, long initialDelay, long period,
TimeUnit delayUnit) {
return this.container.getAgent().getScheduledExecutor(PlcGwService.class.getSimpleName())
return this.container.getAgent()
.getScheduledExecutor(PlcGwService.class.getSimpleName())
.scheduleWithFixedDelay(() -> {
try {
this.container.getPrivilegeHandler().runAsAgent(runnable);

View File

@ -39,10 +39,12 @@ public class PlcStateHandler {
private static final Logger logger = LoggerFactory.getLogger(PlcStateHandler.class);
private final PlcGwServerHandler gwServerHandler;
private final ComponentContainer container;
public PlcStateHandler(ComponentContainer container) {
this.container = container;
public PlcStateHandler(PlcGwServerHandler gwServerHandler) {
this.gwServerHandler = gwServerHandler;
this.container = gwServerHandler.getContainer();
}
protected void runAsAgent(PrivilegedRunnable runnable) throws Exception {
@ -60,15 +62,15 @@ public class PlcStateHandler {
// get the gateway and set the state
Resource plc = tx.getResourceBy(TYPE_PLC, plcSession.plcId, false);
if (plc == null)
if (plc == null) {
plc = buildNewPlc(plcSession, tx);
tx.add(plc);
}
StringParameter stateP = plc.getParameter(PARAM_CONNECTION_STATE, true);
StringParameter stateP = plc.getStringP(PARAM_CONNECTION_STATE);
if (!stateP.getValue().equals(ConnectionState.Connected.name())) {
stateP.setValue(ConnectionState.Connected.name());
StringParameter stateMsgP = plc.getParameter(PARAM_CONNECTION_STATE_MSG, true);
stateMsgP.setValue("");
plc.getStringP(PARAM_CONNECTION_STATE_MSG).clear();
tx.update(plc);
tx.commitOnClose();
}
@ -90,15 +92,16 @@ public class PlcStateHandler {
// get the gateway and set the state
Resource plc = tx.getResourceBy(TYPE_PLC, plcSession.plcId, false);
if (plc == null)
if (plc == null) {
plc = buildNewPlc(plcSession, tx);
tx.add(plc);
}
StringParameter stateP = plc.getParameter(PARAM_CONNECTION_STATE, true);
StringParameter stateP = plc.getStringP(PARAM_CONNECTION_STATE);
existingState = ConnectionState.valueOf(stateP.getValue());
if (existingState != connectionState) {
stateP.setValue(connectionState.name());
StringParameter stateMsgP = plc.getParameter(PARAM_CONNECTION_STATE_MSG, true);
stateMsgP.setValue(connectionStateMsg);
plc.setString(PARAM_CONNECTION_STATE_MSG, connectionStateMsg);
tx.update(plc);
logger.info(
@ -131,11 +134,7 @@ public class PlcStateHandler {
}
}
// trigger execution handler that we are connected
if (existingState != connectionState && connectionState == ConnectionState.Connected
&& this.container.hasComponent(ExecutionHandler.class))
this.container.getComponent(ExecutionHandler.class).triggerExecution(realm);
this.gwServerHandler.notifyConnectionState(plcSession.plcId, connectionState);
});
} catch (Exception e) {
logger.error("Failed to handle gateway connection state notification!", e);
@ -223,16 +222,21 @@ public class PlcStateHandler {
}
private Resource buildNewPlc(PlcGwServerHandler.PlcSession plcSession, StrolchTransaction tx) {
Resource plc = new ResourceBuilder(plcSession.plcId, plcSession.plcId, TYPE_PLC) //
return new ResourceBuilder(plcSession.plcId, plcSession.plcId, TYPE_PLC) //
.defaultBag() //
.string(PARAM_CONNECTION_STATE, buildParamName(PARAM_CONNECTION_STATE))
.enumeration(ConnectionState.Disconnected).end() //
.string(PARAM_CONNECTION_STATE_MSG, buildParamName(PARAM_CONNECTION_STATE_MSG)).end() //
.string(PARAM_LOCAL_IP, buildParamName(PARAM_LOCAL_IP)).end() //
.enumeration(ConnectionState.Disconnected)
.end() //
.string(PARAM_CONNECTION_STATE_MSG, buildParamName(PARAM_CONNECTION_STATE_MSG))
.end() //
.stringList(PARAM_LOCAL_IP, buildParamName(PARAM_LOCAL_IP))
.end() //
.endBag() //
.build();
tx.add(plc);
return plc;
}
private void setSystemState(JsonObject systemState, Resource gateway) {

View File

@ -0,0 +1,26 @@
package li.strolch.plc.gw.server;
import li.strolch.plc.model.PlcAddressKey;
import li.strolch.plc.model.PlcAddressValueResponse;
public abstract class ReadStatePlcGwService extends PlcGwService implements PlcConnectionStateListener {
public ReadStatePlcGwService(String plcId, PlcGwServerHandler plcHandler) {
super(plcId, plcHandler);
}
protected void handleGetState(PlcAddressValueResponse response) {
PlcAddressKey addressKey = response.getPlcAddressKey();
if (response.isFailed()) {
logger.error("Failed to read value for address " + addressKey + ": " + response.getStateMsg());
} else {
storeAddressState(addressKey, response.getValue());
}
}
protected void readState(String resource, String action) {
this.plcHandler.asyncGetAddressState(keyFor(resource, action), this.plcId, this::handleGetState);
}
protected abstract void storeAddressState(PlcAddressKey addressKey, Object value);
}

View File

@ -2,7 +2,6 @@ package li.strolch.plc.gw.server.service;
import static li.strolch.plc.model.PlcConstants.*;
import com.google.gson.JsonPrimitive;
import li.strolch.model.StrolchValueType;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.gw.server.PlcGwServerHandler;
@ -77,17 +76,6 @@ public class SendPlcTelegramCommand extends Command {
// sending with a value
StrolchValueType valueType = StrolchValueType.parse(this.valueTypeS);
Object value = valueType.parseValue(valueS);
JsonPrimitive valueJ;
if (value instanceof String)
valueJ = new JsonPrimitive((String) value);
else if (value instanceof Number)
valueJ = new JsonPrimitive((Number) value);
else if (value instanceof Boolean)
valueJ = new JsonPrimitive((Boolean) value);
else
throw new IllegalArgumentException("Unhandled value type " + valueType);
this.response = plcHandler.sendMessageSync(addressKey, this.plcId, valueJ);
this.response = plcHandler.sendMessageSync(addressKey, this.plcId, valueType.parseValue(valueS));
}
}

View File

@ -4,4 +4,16 @@ public enum ConnectionState {
Connected,
Disconnected,
Failed;
public boolean isConnected() {
return this == Connected;
}
public boolean isDisconnected() {
return this == Disconnected;
}
public boolean isFailed() {
return this == Failed;
}
}

View File

@ -0,0 +1,27 @@
package li.strolch.plc.model;
import com.google.gson.JsonPrimitive;
public class ModelHelper {
public static JsonPrimitive valueToJson(Object value) {
if (value instanceof Boolean)
return new JsonPrimitive((Boolean) value);
else if (value instanceof Number)
return new JsonPrimitive((Number) value);
else if (value instanceof String)
return new JsonPrimitive((String) value);
throw new IllegalArgumentException(
"Unhandled value type " + (value == null ? "(null)" : value.getClass().getName()));
}
public static Object jsonToValue(JsonPrimitive valueJ) {
if (valueJ.isBoolean())
return valueJ.getAsBoolean();
else if (valueJ.isNumber())
return valueJ.getAsNumber();
else if (valueJ.isString())
return valueJ.getAsString();
throw new IllegalArgumentException("Unhandled value type " + valueJ);
}
}

View File

@ -0,0 +1,46 @@
package li.strolch.plc.model;
public class PlcAddressValueResponse extends PlcAddressResponse {
private Object value;
public PlcAddressValueResponse(String plcId, PlcAddressKey plcAddressKey) {
super(plcId, plcAddressKey);
}
public void setValue(Object value) {
this.value = value;
}
public Object getValue() {
return this.value;
}
public boolean getValueAsBoolean() {
return (Boolean) this.value;
}
public int getValueAsInt() {
return ((Number) this.value).intValue();
}
public double getValueAsDouble() {
return ((Number) this.value).doubleValue();
}
public String getValueAsString() {
return ((String) this.value);
}
@Override
public PlcAddressValueResponse state(PlcResponseState state, String stateMsg) {
super.state(state, stateMsg);
return this;
}
@Override
public String toString() {
return "PlcAddressValueResponse{" + "plcId='" + plcId + '\'' + ", sequenceId=" + sequenceId + ", state=" + state
+ ", stateMsg='" + stateMsg + '\'' + ", value=" + value + '}';
}
}

View File

@ -60,6 +60,7 @@ public class PlcConstants {
public static final String MSG_TYPE_AUTHENTICATION = "Authentication";
public static final String MSG_TYPE_PLC_NOTIFICATION = "PlcNotification";
public static final String MSG_TYPE_PLC_TELEGRAM = "PlcTelegram";
public static final String MSG_TYPE_PLC_GET_ADDRESS_STATE = "PlcGetAddressState";
public static final String MSG_TYPE_MESSAGE = "Message";
public static final String MSG_TYPE_DISABLE_MESSAGE = "DisableMessage";
public static final String MSG_TYPE_STATE_NOTIFICATION = "StateNotification";

View File

@ -1,5 +1,7 @@
package li.strolch.plc.model;
import li.strolch.utils.CheckedRunnable;
public class PlcResponse {
private static long lastSequenceId = System.currentTimeMillis();
@ -9,7 +11,7 @@ public class PlcResponse {
protected PlcResponseState state;
protected String stateMsg;
private Runnable listener;
private CheckedRunnable listener;
public PlcResponse(String plcId) {
this.plcId = plcId;
@ -60,11 +62,11 @@ public class PlcResponse {
this.stateMsg = stateMsg;
}
public Runnable getListener() {
public CheckedRunnable getListener() {
return this.listener;
}
public void setListener(Runnable listener) {
public void setListener(CheckedRunnable listener) {
this.listener = listener;
}
}