[Major] Refactored state management of PLCs

Now one can register for connection states using PlcConnectionStateListener and perform appropriate work when a PLC connects or disconnects
This commit is contained in:
Robert von Burg 2022-10-06 17:42:24 +02:00
parent 110a7ffe3f
commit 20bd342651
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
4 changed files with 226 additions and 55 deletions

View File

@ -0,0 +1,9 @@
package li.strolch.plc.gw.server;
import li.strolch.plc.model.ConnectionState;
import li.strolch.plc.model.PlcAddressKey;
public interface PlcConnectionStateListener {
void handleConnectionState(String plcId, ConnectionState connectionState);
}

View File

@ -1,10 +1,10 @@
package li.strolch.plc.gw.server;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static li.strolch.plc.model.PlcConstants.*;
import static li.strolch.utils.collections.SynchronizedCollections.synchronizedMapOfLists;
import static li.strolch.utils.helper.ExceptionHelper.getExceptionMessageWithCauses;
import static li.strolch.websocket.WebSocketRemoteIp.*;
import static li.strolch.websocket.WebSocketRemoteIp.get;
import javax.websocket.CloseReason;
import javax.websocket.PongMessage;
@ -14,6 +14,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.gson.JsonObject;
@ -21,6 +22,7 @@ import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.exception.StrolchNotAuthenticatedException;
import li.strolch.handler.operationslog.OperationsLog;
import li.strolch.model.Locator;
import li.strolch.model.log.LogMessage;
@ -37,7 +39,6 @@ import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
import li.strolch.utils.collections.MapOfLists;
import li.strolch.utils.dbc.DBC;
import li.strolch.websocket.WebSocketRemoteIp;
public class PlcGwServerHandler extends StrolchComponent {
@ -45,19 +46,26 @@ public class PlcGwServerHandler extends StrolchComponent {
public static final String THREAD_POOL = "PlcRequests";
private String runAsUser;
private String realm;
private Set<String> plcIds;
private PlcStateHandler plcStateHandler;
private Map<String, PlcSession> plcSessionsBySessionId;
private Map<String, PlcSession> plcSessionsByPlcId;
private MapOfLists<String, PlcConnectionStateListener> plcConnectionStateListeners;
private Map<String, MapOfLists<PlcAddressKey, PlcNotificationListener>> plcAddressListenersByPlcId;
private Map<Long, PlcResponse> plcResponses;
private ScheduledFuture<?> clearDeadConnectionsTask;
public PlcGwServerHandler(ComponentContainer container, String componentName) {
super(container, componentName);
}
public String getRealm() {
return this.realm;
}
public Set<String> getPlcIds() {
return this.plcIds;
}
@ -66,26 +74,52 @@ public class PlcGwServerHandler extends StrolchComponent {
public void initialize(ComponentConfiguration configuration) throws Exception {
this.runAsUser = configuration.getString("runAsUser", "plc-server");
this.realm = getContainer().getRealmNames().iterator().next();
this.plcIds = runAsAgentWithResult(
ctx -> getContainer().getPrivilegeHandler().getPrivilegeHandler().getUsers(ctx.getCertificate())
.stream() //
.filter(user -> user.hasRole(ROLE_PLC)).map(UserRep::getUsername) //
.collect(toSet()));
this.plcIds = runAsAgentWithResult(ctx -> getContainer().getPrivilegeHandler()
.getPrivilegeHandler()
.getUsers(ctx.getCertificate())
.stream() //
.filter(user -> user.hasRole(ROLE_PLC))
.map(UserRep::getUsername) //
.collect(toSet()));
this.plcStateHandler = new PlcStateHandler(getContainer());
this.plcStateHandler = getPlcStateHandler();
this.plcSessionsBySessionId = new ConcurrentHashMap<>();
this.plcSessionsByPlcId = new ConcurrentHashMap<>();
this.plcConnectionStateListeners = synchronizedMapOfLists(new MapOfLists<>());
this.plcAddressListenersByPlcId = new ConcurrentHashMap<>();
this.plcResponses = new ConcurrentHashMap<>();
super.initialize(configuration);
}
@Override
public void start() throws Exception {
this.clearDeadConnectionsTask = getAgent().getScheduledExecutor(getName())
.scheduleWithFixedDelay(this::clearDeadConnections, 10, 10, TimeUnit.SECONDS);
super.start();
}
@Override
public void stop() throws Exception {
if (this.clearDeadConnectionsTask != null)
this.clearDeadConnectionsTask.cancel(true);
super.stop();
}
protected PlcStateHandler getPlcStateHandler() {
return new PlcStateHandler(this);
}
public boolean isPlcConnected(String plcId) {
DBC.PRE.assertNotEmpty("plcId must not be empty", plcId);
return this.plcSessionsByPlcId.containsKey(plcId);
}
public void register(String plcId, PlcConnectionStateListener listener) {
this.plcConnectionStateListeners.addElement(plcId, listener);
}
public void register(String plcId, PlcAddressKey addressKey, PlcNotificationListener listener) {
DBC.PRE.assertNotNull("addressKey must not be null", addressKey);
DBC.PRE.assertNotEmpty("plcId must not be empty", plcId);
@ -379,6 +413,11 @@ public class PlcGwServerHandler extends StrolchComponent {
JsonObject msgJ = jsonObject.get(PARAM_MESSAGE).getAsJsonObject();
LogMessage logMessage = LogMessage.fromJson(msgJ);
logger.info("Received message " + logMessage.getLocator());
if (!logMessage.getRealm().equals(this.realm))
throw new IllegalStateException(
"Unexpected realm in message " + logMessage.getId() + " " + logMessage.getLocator() + " "
+ logMessage.getMessage());
OperationsLog log = getComponent(OperationsLog.class);
log.updateState(logMessage.getRealm(), logMessage.getLocator(), LogMessageState.Inactive);
log.addMessage(logMessage);
@ -387,8 +426,10 @@ public class PlcGwServerHandler extends StrolchComponent {
private void handleDisableMessage(JsonObject jsonObject) {
String realm = jsonObject.get(PARAM_REALM).getAsString();
Locator locator = Locator.valueOf(jsonObject.get(PARAM_LOCATOR).getAsString());
logger.info("Received disable for messages with locator " + locator);
if (!realm.equals(this.realm))
throw new IllegalStateException("Unexpected realm in disable message action for message " + locator);
logger.info("Received disable for messages with locator " + locator);
OperationsLog operationsLog = getComponent(OperationsLog.class);
operationsLog.updateState(realm, locator, LogMessageState.Inactive);
}
@ -465,12 +506,7 @@ public class PlcGwServerHandler extends StrolchComponent {
String plcId = new String(message.getApplicationData().array());
PlcSession plcSession = this.plcSessionsBySessionId.get(session.getId());
if (plcSession != null) {
plcSession.lastUpdate = System.currentTimeMillis();
logger.info(
"PLC " + plcId + " with SessionId " + plcSession.session.getId() + " is still alive on certificate "
+ (plcSession.certificate == null ? null : plcSession.certificate.getSessionId()));
} else {
if (plcSession == null) {
plcSession = new PlcSession(plcId, session);
plcSession.lastUpdate = System.currentTimeMillis();
@ -495,17 +531,43 @@ public class PlcGwServerHandler extends StrolchComponent {
}
if (plcSession.certificate != null) {
StrolchSessionHandler sessionHandler = getContainer().getComponent(StrolchSessionHandler.class);
sessionHandler.validate(plcSession.certificate);
try {
StrolchSessionHandler sessionHandler = getContainer().getComponent(StrolchSessionHandler.class);
sessionHandler.validate(plcSession.certificate);
this.plcStateHandler.handleStillConnected(plcSession);
plcSession.lastUpdate = System.currentTimeMillis();
logger.info("PLC " + plcId + " with SessionId " + session.getId() + " is still alive on certificate "
+ plcSession.certificate.getSessionId());
this.plcStateHandler.handleStillConnected(plcSession);
} catch (StrolchNotAuthenticatedException e) {
logger.error("PLC session " + session.getId() + " is not authenticated anymore for plc " + plcId
+ ". Closing session.");
this.plcSessionsBySessionId.remove(plcId);
PlcSession registeredSession = this.plcSessionsByPlcId.get(plcId);
if (registeredSession != null && registeredSession.session.getId().equals(session.getId())) {
this.plcSessionsByPlcId.remove(plcId);
}
try {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (session) {
session.close(new CloseReason(CloseReason.CloseCodes.NOT_CONSISTENT, "Stale session"));
}
} catch (Exception e1) {
logger.error("Failed to close session " + session.getId(), e1);
}
}
}
}
private void clearDeadConnections() {
// find all sessions which are timed out
List<PlcSession> expiredSessions = this.plcSessionsBySessionId.values().stream().filter(this::hasExpired)
List<PlcSession> expiredSessions = this.plcSessionsBySessionId.values()
.stream()
.filter(this::hasExpired)
.toList();
for (PlcSession plcSession : expiredSessions) {
@ -531,7 +593,17 @@ public class PlcGwServerHandler extends StrolchComponent {
}
this.plcSessionsBySessionId.remove(plcSession.session.getId());
this.plcSessionsByPlcId.remove(plcSession.plcId);
// see if this session is also still the registered session
// it might already have been overwritten by another session
PlcSession registeredSession = this.plcSessionsByPlcId.get(plcSession.plcId);
if (registeredSession != null && registeredSession.session.getId().equals(plcSession.session.getId())) {
this.plcSessionsByPlcId.remove(plcSession.plcId);
// handle state change
this.plcStateHandler.handlePlcState(plcSession, ConnectionState.Disconnected, "dead connection", null);
notifyObserversOfConnectionLost(plcSession.plcId);
}
}
}
@ -584,7 +656,9 @@ public class PlcGwServerHandler extends StrolchComponent {
logger.warn("Notifying PlcResponse listener " + plcResponse + " of connection lost!");
plcResponse.getListener().run();
} catch (Exception e) {
logger.error("Failed to notify PlcResponse listener " + plcResponse);
logger.error(
"Failed to notify PlcResponse listener " + plcResponse + " of connection lost to PLC " + plcId,
e);
}
}
@ -604,7 +678,26 @@ public class PlcGwServerHandler extends StrolchComponent {
for (PlcNotificationListener listener : listenersCopy) {
logger.warn("Notifying PlcNotificationListener " + addressKey + " with " + listener
+ " of connection lost!");
listener.handleConnectionLost();
try {
listener.handleConnectionLost();
} catch (Exception e) {
logger.error("Failed to notify listener " + listener + " of connection lost for PLC " + plcId, e);
}
}
}
}
public void notifyConnectionState(String plcId, ConnectionState connectionState) {
List<PlcConnectionStateListener> listeners = this.plcConnectionStateListeners.getList(plcId);
if (listeners == null)
return;
listeners = new ArrayList<>(listeners);
for (PlcConnectionStateListener listener : listeners) {
try {
listener.handleConnectionState(plcId, connectionState);
} catch (Exception e) {
logger.error("Failed to notify listener " + listener + " of new connection state " + connectionState
+ " for PLC " + plcId, e);
}
}
}

View File

@ -2,6 +2,7 @@ package li.strolch.plc.gw.server;
import static li.strolch.plc.model.PlcConstants.TYPE_PLC;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static li.strolch.utils.helper.ExceptionHelper.getCallerMethod;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -20,6 +21,7 @@ import li.strolch.plc.model.PlcServiceState;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.runtime.privilege.PrivilegedRunnable;
import li.strolch.runtime.privilege.PrivilegedRunnableWithResult;
import li.strolch.utils.CheckedBiFunction;
import li.strolch.utils.dbc.DBC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,6 +44,10 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
this.state = PlcServiceState.Unregistered;
}
public ComponentContainer getContainer() {
return this.container;
}
public PlcServiceState getState() {
return this.state;
}
@ -115,6 +121,10 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
throw new UnsupportedOperationException("Not implemented!");
}
protected boolean hasExecutionHandler() {
return this.container.hasComponent(ExecutionHandler.class);
}
protected ExecutionHandler getExecutionHandler() {
return this.container.getComponent(ExecutionHandler.class);
}
@ -135,17 +145,70 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
try {
this.plcHandler.run(runnable);
} catch (Exception e) {
logger.error("Runnable " + runnable + " failed!", e);
if (hasOperationsLogs()) {
getOperationsLogs().addMessage(
new LogMessage(this.container.getRealmNames().iterator().next(), SYSTEM_USER_AGENT,
Resource.locatorFor(TYPE_PLC, this.plcId), LogSeverity.Exception,
LogMessageState.Information, PlcGwSrvI18n.bundle, "systemAction.failed")
.withException(e).value("action", runnable).value("reason", e));
}
handleFailedRunnable(runnable.toString(), e);
}
}
protected <T> T run(PrivilegedRunnableWithResult<T> runnable) {
try {
return this.plcHandler.runWithResult(runnable);
} catch (Exception e) {
handleFailedRunnable(runnable.toString(), e);
throw new IllegalStateException("Failed to execute runnable " + runnable, e);
}
}
private void handleFailedRunnable(String runnable, Exception e) {
logger.error("Runnable " + runnable + " failed!", e);
if (hasOperationsLogs()) {
getOperationsLogs().addMessage(
new LogMessage(this.container.getRealmNames().iterator().next(), SYSTEM_USER_AGENT,
Resource.locatorFor(TYPE_PLC, this.plcId), LogSeverity.Exception,
LogMessageState.Information, PlcGwSrvI18n.bundle, "systemAction.failed").withException(e)
.value("action", runnable));
}
}
/**
* Executes the given consumer in a read-only transaction
*
* @param consumer
* the consumer to run in a read-only transaction
*/
protected <T> T runReadOnlyTx(CheckedBiFunction<PrivilegeContext, StrolchTransaction, T> consumer) {
return run(ctx -> {
try (StrolchTransaction tx = openTx(ctx, getCallerMethod(), true)) {
return consumer.apply(ctx, tx);
}
});
}
/**
* <p>Executes the given consumer in a writeable transaction</p>
*
* <p><b>Note:</b> The transaction is automatically committed by calling {@link StrolchTransaction#commitOnClose()}
* when the runnable is completed and the TX is dirty, i.e. {@link StrolchTransaction#needsCommit()} returns
* true</p>
*
* @param consumer
* the consumer to run in a writeable transaction
*/
protected <T> T runWritableTx(CheckedBiFunction<PrivilegeContext, StrolchTransaction, T> consumer) {
return run(ctx -> {
try (StrolchTransaction tx = openTx(ctx, getCallerMethod(), false)) {
try {
T t = consumer.apply(ctx, tx);
if (tx.needsCommit())
tx.commitOnClose();
return t;
} catch (Exception e) {
tx.rollbackOnClose();
throw e;
}
}
});
}
protected <T> T runWithResult(PrivilegedRunnableWithResult<T> runnable) throws Exception {
return this.plcHandler.runWithResult(runnable);
}
@ -166,7 +229,8 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
protected ScheduledFuture<?> scheduleAtFixedRate(PrivilegedRunnable runnable, long initialDelay, long period,
TimeUnit delayUnit) {
return this.container.getAgent().getScheduledExecutor(PlcGwService.class.getSimpleName())
return this.container.getAgent()
.getScheduledExecutor(PlcGwService.class.getSimpleName())
.scheduleAtFixedRate(() -> {
try {
this.container.getPrivilegeHandler().runAsAgent(runnable);
@ -178,7 +242,8 @@ public abstract class PlcGwService implements PlcNotificationListener, PlcAddres
protected ScheduledFuture<?> scheduleWithFixedDelay(PrivilegedRunnable runnable, long initialDelay, long period,
TimeUnit delayUnit) {
return this.container.getAgent().getScheduledExecutor(PlcGwService.class.getSimpleName())
return this.container.getAgent()
.getScheduledExecutor(PlcGwService.class.getSimpleName())
.scheduleWithFixedDelay(() -> {
try {
this.container.getPrivilegeHandler().runAsAgent(runnable);

View File

@ -39,10 +39,12 @@ public class PlcStateHandler {
private static final Logger logger = LoggerFactory.getLogger(PlcStateHandler.class);
private final PlcGwServerHandler gwServerHandler;
private final ComponentContainer container;
public PlcStateHandler(ComponentContainer container) {
this.container = container;
public PlcStateHandler(PlcGwServerHandler gwServerHandler) {
this.gwServerHandler = gwServerHandler;
this.container = gwServerHandler.getContainer();
}
protected void runAsAgent(PrivilegedRunnable runnable) throws Exception {
@ -60,15 +62,15 @@ public class PlcStateHandler {
// get the gateway and set the state
Resource plc = tx.getResourceBy(TYPE_PLC, plcSession.plcId, false);
if (plc == null)
if (plc == null) {
plc = buildNewPlc(plcSession, tx);
tx.add(plc);
}
StringParameter stateP = plc.getParameter(PARAM_CONNECTION_STATE, true);
StringParameter stateP = plc.getStringP(PARAM_CONNECTION_STATE);
if (!stateP.getValue().equals(ConnectionState.Connected.name())) {
stateP.setValue(ConnectionState.Connected.name());
StringParameter stateMsgP = plc.getParameter(PARAM_CONNECTION_STATE_MSG, true);
stateMsgP.setValue("");
plc.getStringP(PARAM_CONNECTION_STATE_MSG).clear();
tx.update(plc);
tx.commitOnClose();
}
@ -90,15 +92,16 @@ public class PlcStateHandler {
// get the gateway and set the state
Resource plc = tx.getResourceBy(TYPE_PLC, plcSession.plcId, false);
if (plc == null)
if (plc == null) {
plc = buildNewPlc(plcSession, tx);
tx.add(plc);
}
StringParameter stateP = plc.getParameter(PARAM_CONNECTION_STATE, true);
StringParameter stateP = plc.getStringP(PARAM_CONNECTION_STATE);
existingState = ConnectionState.valueOf(stateP.getValue());
if (existingState != connectionState) {
stateP.setValue(connectionState.name());
StringParameter stateMsgP = plc.getParameter(PARAM_CONNECTION_STATE_MSG, true);
stateMsgP.setValue(connectionStateMsg);
plc.setString(PARAM_CONNECTION_STATE_MSG, connectionStateMsg);
tx.update(plc);
logger.info(
@ -131,11 +134,7 @@ public class PlcStateHandler {
}
}
// trigger execution handler that we are connected
if (existingState != connectionState && connectionState == ConnectionState.Connected
&& this.container.hasComponent(ExecutionHandler.class))
this.container.getComponent(ExecutionHandler.class).triggerExecution(realm);
this.gwServerHandler.notifyConnectionState(plcSession.plcId, connectionState);
});
} catch (Exception e) {
logger.error("Failed to handle gateway connection state notification!", e);
@ -223,16 +222,21 @@ public class PlcStateHandler {
}
private Resource buildNewPlc(PlcGwServerHandler.PlcSession plcSession, StrolchTransaction tx) {
Resource plc = new ResourceBuilder(plcSession.plcId, plcSession.plcId, TYPE_PLC) //
return new ResourceBuilder(plcSession.plcId, plcSession.plcId, TYPE_PLC) //
.defaultBag() //
.string(PARAM_CONNECTION_STATE, buildParamName(PARAM_CONNECTION_STATE))
.enumeration(ConnectionState.Disconnected).end() //
.string(PARAM_CONNECTION_STATE_MSG, buildParamName(PARAM_CONNECTION_STATE_MSG)).end() //
.string(PARAM_LOCAL_IP, buildParamName(PARAM_LOCAL_IP)).end() //
.enumeration(ConnectionState.Disconnected)
.end() //
.string(PARAM_CONNECTION_STATE_MSG, buildParamName(PARAM_CONNECTION_STATE_MSG))
.end() //
.stringList(PARAM_LOCAL_IP, buildParamName(PARAM_LOCAL_IP))
.end() //
.endBag() //
.build();
tx.add(plc);
return plc;
}
private void setSystemState(JsonObject systemState, Resource gateway) {