[Major] New GetAddressState PLC telegram

This commit is contained in:
Robert von Burg 2022-10-07 15:39:10 +02:00
parent 20bd342651
commit 15c9eb731d
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
11 changed files with 350 additions and 173 deletions

View File

@ -2,6 +2,7 @@ package li.strolch.plc.gw.client;
import static java.net.NetworkInterface.getByInetAddress; import static java.net.NetworkInterface.getByInetAddress;
import static li.strolch.model.Tags.Json.*; import static li.strolch.model.Tags.Json.*;
import static li.strolch.plc.model.ModelHelper.valueToJson;
import static li.strolch.plc.model.PlcConstants.*; import static li.strolch.plc.model.PlcConstants.*;
import static li.strolch.runtime.StrolchConstants.DEFAULT_REALM; import static li.strolch.runtime.StrolchConstants.DEFAULT_REALM;
import static li.strolch.utils.helper.ExceptionHelper.*; import static li.strolch.utils.helper.ExceptionHelper.*;
@ -17,12 +18,14 @@ import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import li.strolch.agent.api.*; import li.strolch.agent.api.*;
import li.strolch.model.Locator; import li.strolch.model.Locator;
import li.strolch.model.Resource; import li.strolch.model.Resource;
@ -34,6 +37,7 @@ import li.strolch.plc.core.PlcHandler;
import li.strolch.plc.model.*; import li.strolch.plc.model.*;
import li.strolch.privilege.model.PrivilegeContext; import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.configuration.ComponentConfiguration; import li.strolch.runtime.configuration.ComponentConfiguration;
import li.strolch.utils.CheckedRunnable;
import li.strolch.utils.helper.NetworkHelper; import li.strolch.utils.helper.NetworkHelper;
import org.glassfish.tyrus.client.ClientManager; import org.glassfish.tyrus.client.ClientManager;
@ -64,7 +68,7 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
private ScheduledFuture<?> serverConnectFuture; private ScheduledFuture<?> serverConnectFuture;
private Map<PlcAddress, Object> notConnectedQueue; private Map<PlcAddress, Object> notConnectedQueue;
private LinkedBlockingDeque<Callable<?>> messageQueue; private LinkedBlockingDeque<CheckedRunnable> messageQueue;
private int maxMessageQueue; private int maxMessageQueue;
private boolean run; private boolean run;
private Future<?> messageSenderTask; private Future<?> messageSenderTask;
@ -176,6 +180,10 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
logger.error( logger.error(
"Connection refused to connect to server. Will try to connect again in " + RETRY_DELAY + "s: " "Connection refused to connect to server. Will try to connect again in " + RETRY_DELAY + "s: "
+ getExceptionMessageWithCauses(e)); + getExceptionMessageWithCauses(e));
} else if (rootCause.getMessage() != null && rootCause.getMessage()
.contains("Response code was not 101: 404.")) {
logger.error("Connection failed with 404 error code. Is URL " + this.gwServerUrl + " correct?");
logger.error("Server not yet ready with 404 error. Will try again in " + RETRY_DELAY + "s");
} else { } else {
logger.error("Failed to connect to server! Will try to connect again in " + RETRY_DELAY + "s", e); logger.error("Failed to connect to server! Will try to connect again in " + RETRY_DELAY + "s", e);
} }
@ -300,15 +308,15 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
} }
} }
private void addMsg(Callable<?> callable) { private void async(CheckedRunnable runnable) {
if (this.messageQueue.size() > this.maxMessageQueue) if (this.messageQueue.size() > this.maxMessageQueue)
this.messageQueue.removeFirst(); this.messageQueue.removeFirst();
this.messageQueue.addLast(callable); this.messageQueue.addLast(runnable);
} }
@Override @Override
public void sendMsg(LogMessage message) { public void sendMsg(LogMessage message) {
addMsg(() -> { async(() -> {
JsonObject messageJ = new JsonObject(); JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId); messageJ.addProperty(PARAM_PLC_ID, this.plcId);
@ -318,14 +326,12 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
sendDataToClient(messageJ); sendDataToClient(messageJ);
if (this.verbose) if (this.verbose)
logger.info("Sent msg " + message.getLocator() + " to server"); logger.info("Sent msg " + message.getLocator() + " to server");
return null;
}); });
} }
@Override @Override
public void disableMsg(Locator locator) { public void disableMsg(Locator locator) {
addMsg(() -> { async(() -> {
JsonObject messageJ = new JsonObject(); JsonObject messageJ = new JsonObject();
messageJ.addProperty(PARAM_PLC_ID, this.plcId); messageJ.addProperty(PARAM_PLC_ID, this.plcId);
@ -336,8 +342,6 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
sendDataToClient(messageJ); sendDataToClient(messageJ);
if (this.verbose) if (this.verbose)
logger.info("Sent msg " + locator + " to server"); logger.info("Sent msg " + locator + " to server");
return null;
}); });
} }
@ -345,29 +349,19 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
if (!plcAddress.remote) if (!plcAddress.remote)
return; return;
addMsg(() -> { async(() -> {
JsonObject notificationJ = new JsonObject(); JsonObject notificationJ = new JsonObject();
notificationJ.addProperty(PARAM_PLC_ID, this.plcId); notificationJ.addProperty(PARAM_PLC_ID, this.plcId);
notificationJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_NOTIFICATION); notificationJ.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_NOTIFICATION);
notificationJ.addProperty(PARAM_RESOURCE, plcAddress.resource); notificationJ.addProperty(PARAM_RESOURCE, plcAddress.resource);
notificationJ.addProperty(PARAM_ACTION, plcAddress.action); notificationJ.addProperty(PARAM_ACTION, plcAddress.action);
notificationJ.add(PARAM_VALUE, valueToJson(value));
if (value instanceof Boolean)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Boolean) value));
else if (value instanceof Number)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((Number) value));
else if (value instanceof String)
notificationJ.add(PARAM_VALUE, new JsonPrimitive((String) value));
else
notificationJ.add(PARAM_VALUE, new JsonPrimitive(value.toString()));
sendDataToClient(notificationJ); sendDataToClient(notificationJ);
if (this.verbose) if (this.verbose)
logger.info("Sent notification for " + plcAddress.toKey() + " to server"); logger.info("Sent notification for " + plcAddress.toKey() + " to server");
return null;
}); });
} }
@ -382,11 +376,12 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
try { try {
runAsAgent(ctx -> { runAsAgent(ctx -> {
if (MSG_TYPE_AUTHENTICATION.equals(messageType)) { if (MSG_TYPE_AUTHENTICATION.equals(messageType)) {
handleAuthResponse(ctx, jsonObject); handleAuthResponse(ctx, jsonObject);
} else if (MSG_TYPE_PLC_TELEGRAM.equals(messageType)) { } else if (MSG_TYPE_PLC_TELEGRAM.equals(messageType)) {
handleTelegram(jsonObject); async(() -> handleTelegram(jsonObject));
} else if (MSG_TYPE_PLC_GET_ADDRESS_STATE.equals(messageType)) {
async(() -> handleGetAddressState(ctx, jsonObject));
} else { } else {
logger.error("Unhandled message type " + messageType); logger.error("Unhandled message type " + messageType);
} }
@ -396,54 +391,55 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
} }
} }
private void handleTelegram(JsonObject telegramJ) { private void handleGetAddressState(PrivilegeContext ctx, JsonObject telegramJ) throws Exception {
PlcAddress plcAddress = null;
try (StrolchTransaction tx = openTx(ctx.getCertificate(), true)) {
plcAddress = parsePlcAddress(telegramJ);
String plcAddressId = this.plcHandler.getPlcAddressId(plcAddress.resource, plcAddress.action);
Resource address = tx.getResourceBy(TYPE_PLC_ADDRESS, plcAddressId, true);
Object value = address.getParameter(PARAM_VALUE, true).getValue();
telegramJ.add(PARAM_VALUE, valueToJson(value));
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Done.name());
telegramJ.addProperty(PARAM_STATE_MSG, "");
if (this.verbose)
logger.info("Sent address state for " + plcAddress.toKey() + " = " + value + " to server");
} catch (Exception e) {
handleFailedTelegram(telegramJ, plcAddress, e);
}
sendDataToClient(telegramJ);
}
private void handleTelegram(JsonObject telegramJ) throws Exception {
PlcAddress plcAddress = null; PlcAddress plcAddress = null;
try { try {
if (!telegramJ.has(PARAM_RESOURCE) || !telegramJ.has(PARAM_ACTION)) plcAddress = parsePlcAddress(telegramJ);
throw new IllegalArgumentException("Both " + PARAM_RESOURCE + " and " + PARAM_ACTION + " is required!");
String resource = telegramJ.get(PARAM_RESOURCE).getAsString();
String action = telegramJ.get(PARAM_ACTION).getAsString();
plcAddress = this.plcHandler.getPlcAddress(resource, action);
if (telegramJ.has(PARAM_VALUE)) { if (telegramJ.has(PARAM_VALUE)) {
String valueS = telegramJ.get(PARAM_VALUE).getAsString(); String valueS = telegramJ.get(PARAM_VALUE).getAsString();
Object value = plcAddress.valueType.parseValue(valueS); Object value = plcAddress.valueType.parseValue(valueS);
this.plcHandler.send(resource, action, value, false, false); this.plcHandler.send(plcAddress.resource, plcAddress.action, value, false, false);
} else { } else {
this.plcHandler.send(resource, action, false, false); this.plcHandler.send(plcAddress.resource, plcAddress.action, false, false);
} }
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Done.name()); telegramJ.addProperty(PARAM_STATE, PlcResponseState.Done.name());
telegramJ.addProperty(PARAM_STATE_MSG, ""); telegramJ.addProperty(PARAM_STATE_MSG, "");
} catch (Exception e) { } catch (Exception e) {
handleFailedTelegram(telegramJ, plcAddress, e);
if (plcAddress == null) {
logger.error("Failed to handle telegram: " + telegramJ, e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Could not evaluate PlcAddress: " + getExceptionMessage(getRootCause(e), false));
} else {
logger.error("Failed to execute telegram: " + plcAddress.toKeyAddress(), e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Failed to perform " + plcAddress.toKey() + ": " + getExceptionMessage(getRootCause(e), false));
}
} }
// async sending of response sendDataToClient(telegramJ);
PlcAddress address = plcAddress;
addMsg(() -> { if (this.verbose)
sendDataToClient(telegramJ); logger.info("Sent Telegram response for " + (plcAddress == null ? "unknown" : plcAddress.toKey())
if (this.verbose) + " to server");
logger.info(
"Sent Telegram response for " + (address == null ? "unknown" : address.toKey()) + " to server");
return null;
});
} }
private void handleAuthResponse(PrivilegeContext ctx, JsonObject response) { private void handleAuthResponse(PrivilegeContext ctx, JsonObject response) {
@ -542,15 +538,15 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
continue; continue;
} }
Callable<?> callable = null; CheckedRunnable runnable = null;
try { try {
callable = this.messageQueue.takeFirst(); runnable = this.messageQueue.takeFirst();
callable.call(); runnable.run();
} catch (Exception e) { } catch (Exception e) {
closeBrokenGwSessionUpdateState("Failed to send message", closeBrokenGwSessionUpdateState("Failed to send message",
"Failed to send message, reconnecting in " + RETRY_DELAY + "s."); "Failed to send message, reconnecting in " + RETRY_DELAY + "s.");
if (callable != null) { if (runnable != null) {
this.messageQueue.addFirst(callable); this.messageQueue.addFirst(runnable);
logger.error( logger.error(
"Failed to send message, reconnecting in " + RETRY_DELAY + "s. And then retrying message.", "Failed to send message, reconnecting in " + RETRY_DELAY + "s. And then retrying message.",
e); e);
@ -584,9 +580,10 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
VersionQueryResult versionQueryResult = getContainer().getAgent().getVersion(); VersionQueryResult versionQueryResult = getContainer().getAgent().getVersion();
this.versions.add(AGENT_VERSION, versionQueryResult.getAgentVersion().toJson()); this.versions.add(AGENT_VERSION, versionQueryResult.getAgentVersion().toJson());
this.versions.add(APP_VERSION, versionQueryResult.getAppVersion().toJson()); this.versions.add(APP_VERSION, versionQueryResult.getAppVersion().toJson());
this.versions.add(COMPONENT_VERSIONS, this.versions.add(COMPONENT_VERSIONS, versionQueryResult.getComponentVersions()
versionQueryResult.getComponentVersions().stream().map(ComponentVersion::toJson) .stream()
.collect(JsonArray::new, JsonArray::add, JsonArray::addAll)); .map(ComponentVersion::toJson)
.collect(JsonArray::new, JsonArray::add, JsonArray::addAll));
} }
return this.versions; return this.versions;
@ -635,6 +632,30 @@ public class PlcGwClientHandler extends StrolchComponent implements GlobalPlcLis
notifyServer(address, value); notifyServer(address, value);
} }
private PlcAddress parsePlcAddress(JsonObject telegramJ) {
if (!telegramJ.has(PARAM_RESOURCE) || !telegramJ.has(PARAM_ACTION))
throw new IllegalArgumentException("Both " + PARAM_RESOURCE + " and " + PARAM_ACTION + " is required!");
String resource = telegramJ.get(PARAM_RESOURCE).getAsString();
String action = telegramJ.get(PARAM_ACTION).getAsString();
return this.plcHandler.getPlcAddress(resource, action);
}
private static void handleFailedTelegram(JsonObject telegramJ, PlcAddress plcAddress, Exception e) {
if (plcAddress == null) {
logger.error("Failed to handle telegram: " + telegramJ, e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Could not evaluate PlcAddress: " + getExceptionMessage(getRootCause(e), false));
} else {
logger.error("Failed to execute telegram: " + plcAddress.toKeyAddress(), e);
telegramJ.addProperty(PARAM_STATE, PlcResponseState.Failed.name());
telegramJ.addProperty(PARAM_STATE_MSG,
"Failed to perform " + plcAddress.toKey() + ": " + getExceptionMessage(getRootCause(e), false));
}
}
@ClientEndpoint @ClientEndpoint
public static class PlcGwClientEndpoint { public static class PlcGwClientEndpoint {

View File

@ -0,0 +1,8 @@
package li.strolch.plc.gw.server;
import li.strolch.plc.model.PlcAddressValueResponse;
public interface PlcAddressResponseValueListener {
void handleResponse(PlcAddressValueResponse response) throws Exception;
}

View File

@ -1,6 +1,8 @@
package li.strolch.plc.gw.server; package li.strolch.plc.gw.server;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static li.strolch.plc.model.ModelHelper.jsonToValue;
import static li.strolch.plc.model.ModelHelper.valueToJson;
import static li.strolch.plc.model.PlcConstants.*; import static li.strolch.plc.model.PlcConstants.*;
import static li.strolch.utils.collections.SynchronizedCollections.synchronizedMapOfLists; import static li.strolch.utils.collections.SynchronizedCollections.synchronizedMapOfLists;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses; import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
@ -8,7 +10,6 @@ import static li.strolch.websocket.WebSocketRemoteIp.get;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
import javax.websocket.PongMessage; import javax.websocket.PongMessage;
import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session; import javax.websocket.Session;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
@ -120,6 +121,10 @@ public class PlcGwServerHandler extends StrolchComponent {
this.plcConnectionStateListeners.addElement(plcId, listener); this.plcConnectionStateListeners.addElement(plcId, listener);
} }
public void unregister(String plcId, PlcConnectionStateListener listener) {
this.plcConnectionStateListeners.removeElement(plcId, listener);
}
public void register(String plcId, PlcAddressKey addressKey, PlcNotificationListener listener) { public void register(String plcId, PlcAddressKey addressKey, PlcNotificationListener listener) {
DBC.PRE.assertNotNull("addressKey must not be null", addressKey); DBC.PRE.assertNotNull("addressKey must not be null", addressKey);
DBC.PRE.assertNotEmpty("plcId must not be empty", plcId); DBC.PRE.assertNotEmpty("plcId must not be empty", plcId);
@ -160,52 +165,20 @@ public class PlcGwServerHandler extends StrolchComponent {
return super.runAsWithResult(this.runAsUser, runnable); return super.runAsWithResult(this.runAsUser, runnable);
} }
public void sendMessage(PlcAddressKey addressKey, String plcId, boolean value, public void sendMessage(PlcAddressKey addressKey, String plcId, Object value, PlcAddressResponseListener listener) {
PlcAddressResponseListener listener) { sendMessage(addressKey, plcId, value == null ? null : valueToJson(value), listener);
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, int value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, double value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, String value, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, new JsonPrimitive(value), listener);
} }
public void sendMessage(PlcAddressKey addressKey, String plcId, PlcAddressResponseListener listener) { public void sendMessage(PlcAddressKey addressKey, String plcId, PlcAddressResponseListener listener) {
sendMessage(addressKey, plcId, (JsonPrimitive) null, listener); sendMessage(addressKey, plcId, null, listener);
}
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId, boolean value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId, int value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId, double value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId, String value) {
return sendMessageSync(addressKey, plcId, new JsonPrimitive(value));
}
public PlcAddressResponse sendMessage(PlcAddressKey addressKey, String plcId) {
return sendMessageSync(addressKey, plcId, (JsonPrimitive) null);
} }
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId) { public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId) {
return sendMessageSync(addressKey, plcId, null); return sendMessageSync(addressKey, plcId, null);
} }
public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId, JsonPrimitive valueJ) { public PlcAddressResponse sendMessageSync(PlcAddressKey addressKey, String plcId, Object value) {
JsonPrimitive valueJ = value == null ? null : valueToJson(value);
PlcAddressResponse[] response = new PlcAddressResponse[1]; PlcAddressResponse[] response = new PlcAddressResponse[1];
@ -228,44 +201,30 @@ public class PlcGwServerHandler extends StrolchComponent {
private void sendMessage(PlcAddressKey addressKey, String plcId, JsonPrimitive valueJ, private void sendMessage(PlcAddressKey addressKey, String plcId, JsonPrimitive valueJ,
PlcAddressResponseListener listener) { PlcAddressResponseListener listener) {
PlcSession plcSession = getPlcSession(plcId);
PlcSession plcSession = this.plcSessionsByPlcId.get(plcId);
if (plcSession == null)
throw new IllegalStateException("PLC " + plcId + " is not connected!");
assertPlcAuthed(plcId, plcSession.session.getId());
getExecutorService(THREAD_POOL).submit(() -> send(plcSession, addressKey, valueJ, listener)); getExecutorService(THREAD_POOL).submit(() -> send(plcSession, addressKey, valueJ, listener));
} }
public void asyncGetAddressState(PlcAddressKey addressKey, String plcId, PlcAddressResponseValueListener listener) {
PlcSession plcSession = getPlcSession(plcId);
getExecutorService(THREAD_POOL).submit(() -> asyncGetAddressState(plcSession, addressKey, listener));
}
private void send(PlcSession plcSession, PlcAddressKey plcAddressKey, JsonPrimitive valueJ, private void send(PlcSession plcSession, PlcAddressKey plcAddressKey, JsonPrimitive valueJ,
PlcAddressResponseListener listener) { PlcAddressResponseListener listener) {
if (valueJ == null) if (valueJ == null)
logger.info("Sending " + plcAddressKey + " to " + plcSession.plcId + "..."); logger.info("Sending " + plcAddressKey + " to " + plcSession.plcId + "...");
else else
logger.info("Sending " + plcAddressKey + " with value " + valueJ + " to " + plcSession.plcId + "..."); logger.info("Sending " + plcAddressKey + " = " + valueJ + " to " + plcSession.plcId + "...");
PlcAddressResponse plcResponse = new PlcAddressResponse(plcSession.plcId, plcAddressKey); PlcAddressResponse plcResponse = new PlcAddressResponse(plcSession.plcId, plcAddressKey);
plcResponse.setListener(() -> handleResponse(listener, plcResponse)); plcResponse.setListener(() -> listener.handleResponse(plcResponse));
try { try {
String data = buildJsonTelegram(plcSession.plcId, plcAddressKey, valueJ, plcResponse).toString();
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(PARAM_SEQUENCE_ID, plcResponse.getSequenceId());
jsonObject.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_TELEGRAM);
jsonObject.addProperty(PARAM_PLC_ID, plcSession.plcId);
jsonObject.addProperty(PARAM_RESOURCE, plcAddressKey.resource);
jsonObject.addProperty(PARAM_ACTION, plcAddressKey.action);
if (valueJ != null)
jsonObject.add(PARAM_VALUE, valueJ);
String data = jsonObject.toString();
this.plcResponses.put(plcResponse.getSequenceId(), plcResponse); this.plcResponses.put(plcResponse.getSequenceId(), plcResponse);
sendDataToClient(data, plcSession.session);
synchronized (plcSession.session) {
sendDataToClient(data, plcSession.session.getBasicRemote());
}
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to send " + plcAddressKey + " to PLC " + plcSession.plcId, e); logger.error("Failed to send " + plcAddressKey + " to PLC " + plcSession.plcId, e);
@ -281,14 +240,57 @@ public class PlcGwServerHandler extends StrolchComponent {
} }
} }
private void handleResponse(PlcAddressResponseListener listener, PlcAddressResponse response) { private void asyncGetAddressState(PlcSession plcSession, PlcAddressKey plcAddressKey,
PlcAddressResponseValueListener listener) {
logger.info("Requesting value for address " + plcAddressKey + " from PLC " + plcSession.plcId + "...");
PlcAddressValueResponse plcResponse = new PlcAddressValueResponse(plcSession.plcId, plcAddressKey);
plcResponse.setListener(() -> listener.handleResponse(plcResponse));
try { try {
listener.handleResponse(response); String data = buildJsonGetAddressStateTelegram(plcSession.plcId, plcAddressKey, plcResponse).toString();
this.plcResponses.put(plcResponse.getSequenceId(), plcResponse);
sendDataToClient(data, plcSession.session);
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to notify listener " + listener + " for response of " + response, e); logger.error("Failed to get address state for " + plcAddressKey + " from PLC " + plcSession.plcId, e);
plcResponse.setState(PlcResponseState.Failed);
plcResponse.setStateMsg(
"Failed to get address state for " + plcAddressKey + " from PLC " + plcSession.plcId + ": "
+ getExceptionMessageWithCauses(e));
try {
listener.handleResponse(plcResponse);
} catch (Exception ex) {
logger.error("Failed to notify listener " + listener, ex);
}
} }
} }
private static JsonObject buildJsonTelegram(String plcId, PlcAddressKey plcAddressKey, JsonPrimitive valueJ,
PlcAddressResponse plcResponse) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(PARAM_SEQUENCE_ID, plcResponse.getSequenceId());
jsonObject.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_TELEGRAM);
jsonObject.addProperty(PARAM_PLC_ID, plcId);
jsonObject.addProperty(PARAM_RESOURCE, plcAddressKey.resource);
jsonObject.addProperty(PARAM_ACTION, plcAddressKey.action);
if (valueJ != null)
jsonObject.add(PARAM_VALUE, valueJ);
return jsonObject;
}
private static JsonObject buildJsonGetAddressStateTelegram(String plcId, PlcAddressKey plcAddressKey,
PlcAddressResponse plcResponse) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(PARAM_SEQUENCE_ID, plcResponse.getSequenceId());
jsonObject.addProperty(PARAM_MESSAGE_TYPE, MSG_TYPE_PLC_GET_ADDRESS_STATE);
jsonObject.addProperty(PARAM_PLC_ID, plcId);
jsonObject.addProperty(PARAM_RESOURCE, plcAddressKey.resource);
jsonObject.addProperty(PARAM_ACTION, plcAddressKey.action);
return jsonObject;
}
private PlcSession assertPlcAuthed(String plcId, String sessionId) throws NotAuthenticatedException { private PlcSession assertPlcAuthed(String plcId, String sessionId) throws NotAuthenticatedException {
PlcSession plcSession = this.plcSessionsBySessionId.get(sessionId); PlcSession plcSession = this.plcSessionsBySessionId.get(sessionId);
if (plcSession.certificate == null) if (plcSession.certificate == null)
@ -309,13 +311,16 @@ public class PlcGwServerHandler extends StrolchComponent {
return plcSession; return plcSession;
} }
private void sendDataToClient(String data, Basic basic) throws IOException { private void sendDataToClient(String data, Session session) throws IOException {
int pos = 0; //noinspection SynchronizationOnLocalVariableOrMethodParameter
while (pos + 8192 < data.length()) { synchronized (session) {
basic.sendText(data.substring(pos, pos + 8192), false); int pos = 0;
pos += 8192; while (pos + 8192 < data.length()) {
session.getBasicRemote().sendText(data.substring(pos, pos + 8192), false);
pos += 8192;
}
session.getBasicRemote().sendText(data.substring(pos), true);
} }
basic.sendText(data.substring(pos), true);
} }
public void onWsMessage(String message, Session session) throws IOException { public void onWsMessage(String message, Session session) throws IOException {
@ -333,6 +338,8 @@ public class PlcGwServerHandler extends StrolchComponent {
case MSG_TYPE_AUTHENTICATION -> handleAuth(session, jsonObject); case MSG_TYPE_AUTHENTICATION -> handleAuth(session, jsonObject);
case MSG_TYPE_PLC_NOTIFICATION -> handleNotification(assertPlcAuthed(plcId, session.getId()), jsonObject); case MSG_TYPE_PLC_NOTIFICATION -> handleNotification(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_PLC_TELEGRAM -> handleTelegramResponse(assertPlcAuthed(plcId, session.getId()), jsonObject); case MSG_TYPE_PLC_TELEGRAM -> handleTelegramResponse(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_PLC_GET_ADDRESS_STATE ->
handleGetAddressStateResponse(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_STATE_NOTIFICATION -> handleStateMsg(assertPlcAuthed(plcId, session.getId()), jsonObject); case MSG_TYPE_STATE_NOTIFICATION -> handleStateMsg(assertPlcAuthed(plcId, session.getId()), jsonObject);
case MSG_TYPE_MESSAGE -> { case MSG_TYPE_MESSAGE -> {
assertPlcAuthed(plcId, session.getId()); assertPlcAuthed(plcId, session.getId());
@ -393,7 +400,6 @@ public class PlcGwServerHandler extends StrolchComponent {
} }
private void handleTelegramResponse(PlcSession plcSession, JsonObject responseJ) { private void handleTelegramResponse(PlcSession plcSession, JsonObject responseJ) {
long sequenceId = responseJ.get(PARAM_SEQUENCE_ID).getAsLong(); long sequenceId = responseJ.get(PARAM_SEQUENCE_ID).getAsLong();
PlcResponse plcResponse = this.plcResponses.remove(sequenceId); PlcResponse plcResponse = this.plcResponses.remove(sequenceId);
if (plcResponse == null) { if (plcResponse == null) {
@ -406,7 +412,38 @@ public class PlcGwServerHandler extends StrolchComponent {
plcResponse.setState(PlcResponseState.valueOf(state)); plcResponse.setState(PlcResponseState.valueOf(state));
plcResponse.setStateMsg(stateMsg); plcResponse.setStateMsg(stateMsg);
plcResponse.getListener().run(); try {
plcResponse.getListener().run();
} catch (Exception e) {
logger.error("Failed to notify listener " + plcResponse.getListener() + " for response of " + plcResponse,
e);
}
}
private void handleGetAddressStateResponse(PlcSession plcSession, JsonObject responseJ) {
long sequenceId = responseJ.get(PARAM_SEQUENCE_ID).getAsLong();
PlcResponse response = this.plcResponses.remove(sequenceId);
if (response == null) {
logger.error(plcSession.plcId + ": PlcResponse does not exist for sequenceId " + sequenceId);
return;
}
if (!(response instanceof PlcAddressValueResponse plcResponse))
throw new IllegalStateException(
"Performing a GetAddressState response handling, but listener is wrong: " + response);
String state = responseJ.get(PARAM_STATE).getAsString();
String stateMsg = responseJ.get(PARAM_STATE_MSG).getAsString();
plcResponse.setState(PlcResponseState.valueOf(state));
plcResponse.setStateMsg(stateMsg);
plcResponse.setValue(jsonToValue(responseJ.getAsJsonPrimitive(PARAM_VALUE)));
try {
plcResponse.getListener().run();
} catch (Exception e) {
logger.error("Failed to notify listener " + plcResponse.getListener() + " for response of " + plcResponse,
e);
}
} }
private void handleMessage(JsonObject jsonObject) { private void handleMessage(JsonObject jsonObject) {
@ -481,10 +518,7 @@ public class PlcGwServerHandler extends StrolchComponent {
private void sendAuthResponse(PlcSession plcSession, JsonObject jsonObject) { private void sendAuthResponse(PlcSession plcSession, JsonObject jsonObject) {
try { try {
String data = jsonObject.toString(); sendDataToClient(jsonObject.toString(), plcSession.session);
synchronized (plcSession.session) {
sendDataToClient(data, plcSession.session.getBasicRemote());
}
logger.info(plcSession.plcId + ": Sent " + MSG_TYPE_AUTHENTICATION + " response on Session " logger.info(plcSession.plcId + ": Sent " + MSG_TYPE_AUTHENTICATION + " response on Session "
+ plcSession.session.getId()); + plcSession.session.getId());
} catch (Exception e) { } catch (Exception e) {
@ -706,6 +740,15 @@ public class PlcGwServerHandler extends StrolchComponent {
logger.error(session.getId() + ": Error: " + throwable.getMessage(), throwable); logger.error(session.getId() + ": Error: " + throwable.getMessage(), throwable);
} }
private PlcSession getPlcSession(String plcId) {
PlcSession plcSession = this.plcSessionsByPlcId.get(plcId);
if (plcSession == null)
throw new IllegalStateException("PLC " + plcId + " is not connected!");
assertPlcAuthed(plcId, plcSession.session.getId());
return plcSession;
}
public static class PlcSession { public static class PlcSession {
public final String plcId; public final String plcId;
public final Session session; public final Session session;

View File

@ -1,5 +1,6 @@
package li.strolch.plc.gw.server; package li.strolch.plc.gw.server;
import static li.strolch.plc.model.ModelHelper.valueToJson;
import static li.strolch.plc.model.PlcConstants.TYPE_PLC; import static li.strolch.plc.model.PlcConstants.TYPE_PLC;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT; import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.helper.ExceptionHelper.getCallerMethod; import static li.strolch.utils.helper.ExceptionHelper.getCallerMethod;
@ -68,10 +69,17 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
this.state = PlcServiceState.Unregistered; this.state = PlcServiceState.Unregistered;
} }
protected PlcAddressKey register(String resource, String action) { protected void register(String resource, String action) {
PlcAddressKey addressKey = keyFor(resource, action); PlcAddressKey addressKey = keyFor(resource, action);
this.plcHandler.register(this.plcId, addressKey, this); this.plcHandler.register(this.plcId, addressKey, this);
return addressKey; }
protected void register(PlcConnectionStateListener listener) {
this.plcHandler.register(this.plcId, listener);
}
protected void unregister(PlcConnectionStateListener listener) {
this.plcHandler.unregister(this.plcId, listener);
} }
protected void register(PlcAddressKey key) { protected void register(PlcAddressKey key) {
@ -90,25 +98,20 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
return PlcAddressKey.keyFor(resource, action); return PlcAddressKey.keyFor(resource, action);
} }
public void sendMessage(PlcAddressKey addressKey, String plcId, boolean value, public void sendMessage(String resource, String action, Object value, PlcAddressResponseListener listener) {
PlcAddressResponseListener listener) { this.plcHandler.sendMessage(keyFor(resource, action), this.plcId, valueToJson(value), listener);
this.plcHandler.sendMessage(addressKey, plcId, value, listener);
} }
public void sendMessage(PlcAddressKey addressKey, String plcId, int value, PlcAddressResponseListener listener) { public void sendMessage(PlcAddressKey addressKey, Object value, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener); this.plcHandler.sendMessage(addressKey, this.plcId, value, listener);
} }
public void sendMessage(PlcAddressKey addressKey, String plcId, double value, PlcAddressResponseListener listener) { public void readState(String resource, String action, PlcAddressResponseValueListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener); this.plcHandler.asyncGetAddressState(keyFor(resource, action), this.plcId, listener);
} }
public void sendMessage(PlcAddressKey addressKey, String plcId, String value, PlcAddressResponseListener listener) { public void readState(PlcAddressKey addressKey, PlcAddressResponseValueListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, value, listener); this.plcHandler.asyncGetAddressState(addressKey, this.plcId, listener);
}
public void sendMessage(PlcAddressKey addressKey, String plcId, PlcAddressResponseListener listener) {
this.plcHandler.sendMessage(addressKey, plcId, listener);
} }
@Override @Override

View File

@ -0,0 +1,26 @@
package li.strolch.plc.gw.server;
import li.strolch.plc.model.PlcAddressKey;
import li.strolch.plc.model.PlcAddressValueResponse;
public abstract class ReadStatePlcGwService extends PlcGwService implements PlcConnectionStateListener {
public ReadStatePlcGwService(String plcId, PlcGwServerHandler plcHandler) {
super(plcId, plcHandler);
}
protected void handleGetState(PlcAddressValueResponse response) {
PlcAddressKey addressKey = response.getPlcAddressKey();
if (response.isFailed()) {
logger.error("Failed to read value for address " + addressKey + ": " + response.getStateMsg());
} else {
storeAddressState(addressKey, response.getValue());
}
}
protected void readState(String resource, String action) {
this.plcHandler.asyncGetAddressState(keyFor(resource, action), this.plcId, this::handleGetState);
}
protected abstract void storeAddressState(PlcAddressKey addressKey, Object value);
}

View File

@ -2,7 +2,6 @@ package li.strolch.plc.gw.server.service;
import static li.strolch.plc.model.PlcConstants.*; import static li.strolch.plc.model.PlcConstants.*;
import com.google.gson.JsonPrimitive;
import li.strolch.model.StrolchValueType; import li.strolch.model.StrolchValueType;
import li.strolch.persistence.api.StrolchTransaction; import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.gw.server.PlcGwServerHandler; import li.strolch.plc.gw.server.PlcGwServerHandler;
@ -77,17 +76,6 @@ public class SendPlcTelegramCommand extends Command {
// sending with a value // sending with a value
StrolchValueType valueType = StrolchValueType.parse(this.valueTypeS); StrolchValueType valueType = StrolchValueType.parse(this.valueTypeS);
Object value = valueType.parseValue(valueS); this.response = plcHandler.sendMessageSync(addressKey, this.plcId, valueType.parseValue(valueS));
JsonPrimitive valueJ;
if (value instanceof String)
valueJ = new JsonPrimitive((String) value);
else if (value instanceof Number)
valueJ = new JsonPrimitive((Number) value);
else if (value instanceof Boolean)
valueJ = new JsonPrimitive((Boolean) value);
else
throw new IllegalArgumentException("Unhandled value type " + valueType);
this.response = plcHandler.sendMessageSync(addressKey, this.plcId, valueJ);
} }
} }

View File

@ -4,4 +4,16 @@ public enum ConnectionState {
Connected, Connected,
Disconnected, Disconnected,
Failed; Failed;
public boolean isConnected() {
return this == Connected;
}
public boolean isDisconnected() {
return this == Disconnected;
}
public boolean isFailed() {
return this == Failed;
}
} }

View File

@ -0,0 +1,27 @@
package li.strolch.plc.model;
import com.google.gson.JsonPrimitive;
public class ModelHelper {
public static JsonPrimitive valueToJson(Object value) {
if (value instanceof Boolean)
return new JsonPrimitive((Boolean) value);
else if (value instanceof Number)
return new JsonPrimitive((Number) value);
else if (value instanceof String)
return new JsonPrimitive((String) value);
throw new IllegalArgumentException(
"Unhandled value type " + (value == null ? "(null)" : value.getClass().getName()));
}
public static Object jsonToValue(JsonPrimitive valueJ) {
if (valueJ.isBoolean())
return valueJ.getAsBoolean();
else if (valueJ.isNumber())
return valueJ.getAsNumber();
else if (valueJ.isString())
return valueJ.getAsString();
throw new IllegalArgumentException("Unhandled value type " + valueJ);
}
}

View File

@ -0,0 +1,46 @@
package li.strolch.plc.model;
public class PlcAddressValueResponse extends PlcAddressResponse {
private Object value;
public PlcAddressValueResponse(String plcId, PlcAddressKey plcAddressKey) {
super(plcId, plcAddressKey);
}
public void setValue(Object value) {
this.value = value;
}
public Object getValue() {
return this.value;
}
public boolean getValueAsBoolean() {
return (Boolean) this.value;
}
public int getValueAsInt() {
return ((Number) this.value).intValue();
}
public double getValueAsDouble() {
return ((Number) this.value).doubleValue();
}
public String getValueAsString() {
return ((String) this.value);
}
@Override
public PlcAddressValueResponse state(PlcResponseState state, String stateMsg) {
super.state(state, stateMsg);
return this;
}
@Override
public String toString() {
return "PlcAddressValueResponse{" + "plcId='" + plcId + '\'' + ", sequenceId=" + sequenceId + ", state=" + state
+ ", stateMsg='" + stateMsg + '\'' + ", value=" + value + '}';
}
}

View File

@ -60,6 +60,7 @@ public class PlcConstants {
public static final String MSG_TYPE_AUTHENTICATION = "Authentication"; public static final String MSG_TYPE_AUTHENTICATION = "Authentication";
public static final String MSG_TYPE_PLC_NOTIFICATION = "PlcNotification"; public static final String MSG_TYPE_PLC_NOTIFICATION = "PlcNotification";
public static final String MSG_TYPE_PLC_TELEGRAM = "PlcTelegram"; public static final String MSG_TYPE_PLC_TELEGRAM = "PlcTelegram";
public static final String MSG_TYPE_PLC_GET_ADDRESS_STATE = "PlcGetAddressState";
public static final String MSG_TYPE_MESSAGE = "Message"; public static final String MSG_TYPE_MESSAGE = "Message";
public static final String MSG_TYPE_DISABLE_MESSAGE = "DisableMessage"; public static final String MSG_TYPE_DISABLE_MESSAGE = "DisableMessage";
public static final String MSG_TYPE_STATE_NOTIFICATION = "StateNotification"; public static final String MSG_TYPE_STATE_NOTIFICATION = "StateNotification";

View File

@ -1,5 +1,7 @@
package li.strolch.plc.model; package li.strolch.plc.model;
import li.strolch.utils.CheckedRunnable;
public class PlcResponse { public class PlcResponse {
private static long lastSequenceId = System.currentTimeMillis(); private static long lastSequenceId = System.currentTimeMillis();
@ -9,7 +11,7 @@ public class PlcResponse {
protected PlcResponseState state; protected PlcResponseState state;
protected String stateMsg; protected String stateMsg;
private Runnable listener; private CheckedRunnable listener;
public PlcResponse(String plcId) { public PlcResponse(String plcId) {
this.plcId = plcId; this.plcId = plcId;
@ -60,11 +62,11 @@ public class PlcResponse {
this.stateMsg = stateMsg; this.stateMsg = stateMsg;
} }
public Runnable getListener() { public CheckedRunnable getListener() {
return this.listener; return this.listener;
} }
public void setListener(Runnable listener) { public void setListener(CheckedRunnable listener) {
this.listener = listener; this.listener = listener;
} }
} }