[Major] Refactored DefaultPlcHandler

- queueing state updates, etc.
- don't do inversion of boolean values (already done before in Plc)
This commit is contained in:
Robert von Burg 2022-05-05 13:17:02 +02:00
parent b3679caf58
commit 5dfbb59967
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
1 changed files with 119 additions and 131 deletions

View File

@ -10,22 +10,22 @@ import static li.strolch.utils.helper.StringHelper.formatNanoDuration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.model.log.LogMessage;
import li.strolch.model.Locator;
import li.strolch.model.Resource;
import li.strolch.model.StrolchValueType;
import li.strolch.model.log.LogMessage;
import li.strolch.model.parameter.Parameter;
import li.strolch.model.parameter.StringParameter;
import li.strolch.model.visitor.SetParameterValueVisitor;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.plc.core.hw.*;
import li.strolch.plc.core.hw.gpio.PlcGpioController;
import li.strolch.plc.model.ConnectionState;
import li.strolch.plc.model.PlcAddress;
import li.strolch.plc.model.PlcAddressType;
import li.strolch.plc.model.PlcState;
@ -38,6 +38,8 @@ import li.strolch.utils.dbc.DBC;
public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, PlcConnectionStateChangeListener {
public static final int SILENT_THRESHOLD = 60;
private static final int MAX_MESSAGE_QUEUE = 200;
private PrivilegeContext ctx;
private String plcId;
private Plc plc;
@ -49,12 +51,13 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
private GlobalPlcListener globalListener;
private LinkedBlockingDeque<MessageTask> messageQueue;
private int maxMessageQueue;
private LinkedBlockingDeque<Runnable> updateStateQueue;
private LinkedBlockingDeque<Consumer<GlobalPlcListener>> messageQueue;
private boolean run;
private Future<?> messageSenderTask;
private Future<?> updateStateTask;
private boolean asyncAddressUpdate;
private boolean verbose;
public DefaultPlcHandler(ComponentContainer container, String componentName) {
@ -122,11 +125,10 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
this.plcAddresses = new MapOfMaps<>();
this.plcTelegrams = new MapOfMaps<>();
this.addressesToResourceId = new HashMap<>();
this.asyncAddressUpdate = configuration.getBoolean("asyncAddressUpdate", false);
this.verbose = configuration.getBoolean("verbose", false);
this.maxMessageQueue = configuration.getInt("maxMessageQueue", 100);
this.messageQueue = new LinkedBlockingDeque<>();
this.updateStateQueue = new LinkedBlockingDeque<>();
super.initialize(configuration);
}
@ -136,7 +138,8 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
this.ctx = getContainer().getPrivilegeHandler().openAgentSystemUserContext();
this.run = true;
this.messageSenderTask = getExecutorService("LogSender").submit(this::sendMessages);
this.messageSenderTask = getSingleThreadExecutor("LogSender").submit(this::sendMessages);
this.updateStateTask = getSingleThreadExecutor("UpdateState").submit(this::updateStates);
if (reconfigurePlc())
startPlc();
@ -152,6 +155,8 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
this.run = false;
if (this.messageSenderTask != null)
this.messageSenderTask.cancel(true);
if (this.updateStateTask != null)
this.updateStateTask.cancel(true);
super.stop();
}
@ -218,18 +223,6 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
}
}
private void updateConnectionState(StrolchTransaction tx, Resource connection, PlcConnection plcConnection) {
StringParameter stateP = connection.getParameter(BAG_PARAMETERS, PARAM_STATE, true);
StringParameter stateMsgP = connection.getParameter(BAG_PARAMETERS, PARAM_STATE_MSG, true);
logger.info("State for PlcConnection {} has changed from {} to {}", connection.getId(), stateP.getValue(),
plcConnection.getState().name());
stateP.setValue(plcConnection.getState().name());
stateMsgP.setValue(plcConnection.getStateMsg());
tx.update(connection);
}
private Plc configure(PrivilegeContext ctx, MapOfMaps<String, String, PlcAddress> plcAddresses,
MapOfMaps<String, String, PlcAddress> plcTelegrams, Map<PlcAddress, String> addressesToResourceId)
throws Exception {
@ -241,7 +234,13 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
plc = PlcConfigurator.configurePlc(tx, plcClassName, plcAddresses, plcTelegrams, addressesToResourceId);
plc.setConnectionStateChangeListener(this);
plcAddresses.values().forEach(plcAddress -> plc.register(plcAddress, this::updateState));
plcAddresses.values().forEach(plcAddress -> plc.register(plcAddress, this::queueUpdateState));
if (tx.getConfiguration().hasParameter(PARAM_VERBOSE)) {
boolean verboseOverride = tx.getConfiguration().getBoolean(PARAM_VERBOSE);
logger.info("Overriding XML verbose property from configuration resource to " + verboseOverride);
this.verbose = verboseOverride;
}
if (tx.needsCommit())
tx.commitOnClose();
@ -250,77 +249,6 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
return plc;
}
private void updateState(PlcAddress address, Object value) {
if (this.asyncAddressUpdate) {
ExecutorService service = getExecutorService("PlcAddressUpdater");
if (!service.isShutdown())
service.submit(() -> updatePlcAddress(address, value));
} else
updatePlcAddress(address, value);
}
private void asyncUpdateState(PlcConnection connection) {
ExecutorService service = getExecutorService("PlcConnectionUpdater");
if (!service.isShutdown())
service.submit(() -> updateConnectionState(connection));
}
private void updatePlcAddress(PlcAddress address, Object value) {
long s = 0L;
if (this.verbose)
s = nanoTime();
String addressId = this.addressesToResourceId.get(address);
if (addressId == null) {
logger.error("No PlcAddress mapping for " + address);
return;
}
try (StrolchTransaction tx = openTx(validateCtx().getCertificate(), getCallerMethod(), false).silentThreshold(
SILENT_THRESHOLD, MILLISECONDS)) {
tx.lock(Resource.locatorFor(TYPE_PLC_ADDRESS, addressId));
Resource addressRes = tx.getResourceBy(TYPE_PLC_ADDRESS, addressId, true);
// see if we need to invert a boolean flag
if (address.valueType == StrolchValueType.BOOLEAN && address.inverted)
value = !((boolean) value);
Parameter<?> valueP = addressRes.getParameter(PARAM_VALUE, true);
if (this.verbose)
logger.info("PlcAddress {}-{} has changed from {} to {}", address.resource, address.action,
valueP.getValue(), value);
valueP.accept(new SetParameterValueVisitor(value));
tx.update(addressRes);
tx.commitOnClose();
} catch (Exception e) {
logger.error("Failed to update PlcAddress " + addressId + " with new value " + value, e);
}
if (this.verbose)
logger.info("async update " + address.address + " took " + (formatNanoDuration(nanoTime() - s)));
}
private void updateConnectionState(PlcConnection plcConnection) {
long s = 0L;
if (this.verbose)
s = nanoTime();
try (StrolchTransaction tx = openTx(validateCtx().getCertificate(), getCallerMethod(), false).silentThreshold(
SILENT_THRESHOLD, MILLISECONDS)) {
tx.lock(Resource.locatorFor(TYPE_PLC_CONNECTION, plcConnection.getId()));
Resource connection = tx.getResourceBy(TYPE_PLC_CONNECTION, plcConnection.getId());
updateConnectionState(tx, connection, plcConnection);
tx.update(connection);
tx.commitOnClose();
} catch (Exception e) {
logger.error("Failed to update state for connection " + plcConnection.getId(), e);
}
if (this.verbose)
logger.info("updateConnectionState took " + (formatNanoDuration(nanoTime() - s)));
}
private PrivilegeContext validateCtx() {
if (this.ctx == null) {
this.ctx = getContainer().getPrivilegeHandler().openAgentSystemUserContext();
@ -361,29 +289,37 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
}
}
private void queueUpdateState(PlcAddress plcAddress, Object o) {
this.updateStateQueue.add(() -> updatePlcAddress(plcAddress, o));
}
private void queueUpdateState(PlcConnection connection) {
this.updateStateQueue.add(
() -> updateConnectionState(connection.getId(), connection.getState(), connection.getStateMsg()));
}
@Override
public void sendMsg(LogMessage message) {
addMsg(new LogMessageTask(message));
addMsg(listener -> listener.sendMsg(message));
}
@Override
public void disableMsg(Locator locator) {
addMsg(new DisableMessageTask(locator));
addMsg(listener -> listener.disableMsg(locator));
}
private synchronized void addMsg(MessageTask task) {
if (this.messageQueue.size() > this.maxMessageQueue)
private synchronized void addMsg(Consumer<GlobalPlcListener> consumer) {
if (this.messageQueue.size() > MAX_MESSAGE_QUEUE)
this.messageQueue.removeFirst();
this.messageQueue.addLast(task);
this.messageQueue.addLast(consumer);
}
private void sendMessages() {
while (this.run) {
try {
if (this.globalListener == null) {
while (this.globalListener == null) {
Thread.sleep(100L);
continue;
}
this.messageQueue.takeFirst().accept(this.globalListener);
@ -395,6 +331,86 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
}
}
private void updateStates() {
logger.info("Update State Handler running...");
while (this.run) {
try {
this.updateStateQueue.take().run();
} catch (InterruptedException e) {
logger.error("Interrupted!");
} catch (Exception e) {
logger.error("Failed to perform state update", e);
}
}
logger.info("Update State Handler stopped.");
}
private void updatePlcAddress(PlcAddress address, Object value) {
long s = 0L;
if (this.verbose)
s = nanoTime();
String addressId = this.addressesToResourceId.get(address);
if (addressId == null) {
logger.error("No PlcAddress mapping for " + address);
return;
}
Certificate cert = validateCtx().getCertificate();
try (StrolchTransaction tx = openTx(cert, getCallerMethod(), false).silentThreshold(SILENT_THRESHOLD,
MILLISECONDS)) {
tx.lock(Resource.locatorFor(TYPE_PLC_ADDRESS, addressId));
Resource addressRes = tx.getResourceBy(TYPE_PLC_ADDRESS, addressId, true);
Parameter<?> valueP = addressRes.getParameter(PARAM_VALUE, true);
if (valueP.getValue().equals(value)) {
if (this.verbose)
logger.info("Ignoring PlcAddress {} unchanged value {}", address.toKey(), value);
return;
}
if (this.verbose)
logger.info("PlcAddress {} has changed from {} to {}", address.toKey(), valueP.getValue(), value);
valueP.accept(new SetParameterValueVisitor(value));
tx.update(addressRes);
tx.commitOnClose();
} catch (Exception e) {
logger.error("Failed to update PlcAddress " + addressId + " with new value " + value, e);
}
if (this.verbose)
logger.info("async update " + address.toKey() + " took " + (formatNanoDuration(nanoTime() - s)));
}
private void updateConnectionState(String id, ConnectionState state, String stateMsg) {
long s = 0L;
if (this.verbose)
s = nanoTime();
try (StrolchTransaction tx = openTx(validateCtx().getCertificate(), getCallerMethod(), false).silentThreshold(
SILENT_THRESHOLD, MILLISECONDS)) {
tx.lock(Resource.locatorFor(TYPE_PLC_CONNECTION, id));
Resource connection = tx.getResourceBy(TYPE_PLC_CONNECTION, id);
StringParameter stateP = connection.getParameter(BAG_PARAMETERS, PARAM_STATE, true);
StringParameter stateMsgP = connection.getParameter(BAG_PARAMETERS, PARAM_STATE_MSG, true);
logger.info("State for PlcConnection {} has changed from {} to {}", connection.getId(), stateP.getValue(),
state.name());
stateP.setValue(state.name());
stateMsgP.setValue(stateMsg);
tx.update(connection);
tx.commitOnClose();
} catch (Exception e) {
logger.error("Failed to update state for connection " + id, e);
}
if (this.verbose)
logger.info("updateConnectionState took " + (formatNanoDuration(nanoTime() - s)));
}
@Override
public void send(String resource, String action) {
send(resource, action, true, true);
@ -405,6 +421,7 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
send(resource, action, value, true, true);
}
@Override
public void send(String resource, String action, boolean catchExceptions, boolean notifyGlobalListener) {
PlcAddress plcAddress = this.plcTelegrams.getElement(resource, action);
if (plcAddress == null)
@ -416,6 +433,7 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
this.plc.send(plcAddress, catchExceptions, notifyGlobalListener);
}
@Override
public void send(String resource, String action, Object value, boolean catchExceptions,
boolean notifyGlobalListener) {
PlcAddress plcAddress = this.plcTelegrams.getElement(resource, action);
@ -445,36 +463,6 @@ public class DefaultPlcHandler extends StrolchComponent implements PlcHandler, P
@Override
public void notifyStateChange(PlcConnection connection) {
asyncUpdateState(connection);
}
private interface MessageTask {
void accept(GlobalPlcListener listener);
}
private static class LogMessageTask implements MessageTask {
private LogMessage message;
private LogMessageTask(LogMessage message) {
this.message = message;
}
@Override
public void accept(GlobalPlcListener listener) {
listener.sendMsg(message);
}
}
private static class DisableMessageTask implements MessageTask {
private Locator locator;
private DisableMessageTask(Locator locator) {
this.locator = locator;
}
@Override
public void accept(GlobalPlcListener listener) {
listener.disableMsg(locator);
}
queueUpdateState(connection);
}
}