[Minor] OperationsLog start is now async
This commit is contained in:
parent
6a8f2abe8d
commit
0aa38c4e5b
|
@ -2,7 +2,10 @@ package li.strolch.handler.operationslog;
|
|||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.ResourceBundle.getBundle;
|
||||
import static li.strolch.agent.api.StrolchAgent.getUniqueId;
|
||||
import static li.strolch.model.Tags.AGENT;
|
||||
import static li.strolch.model.log.LogMessageState.Information;
|
||||
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -10,7 +13,6 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import li.strolch.agent.api.ComponentContainer;
|
||||
import li.strolch.agent.api.StrolchAgent;
|
||||
import li.strolch.agent.api.StrolchComponent;
|
||||
import li.strolch.agent.api.StrolchRealm;
|
||||
import li.strolch.model.Locator;
|
||||
|
@ -47,13 +49,20 @@ public class OperationsLog extends StrolchComponent {
|
|||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
runAsAgent(ctx -> {
|
||||
|
||||
Set<String> realmNames = getContainer().getRealmNames();
|
||||
for (String realmName : realmNames) {
|
||||
StrolchRealm realm = getContainer().getRealm(realmName);
|
||||
if (!realm.getMode().isTransient())
|
||||
this.executorService.submit(() -> loadMessages(realmName));
|
||||
}
|
||||
|
||||
// ignore for transient realms
|
||||
if (getContainer().getRealm(realmName).getMode().isTransient())
|
||||
continue;
|
||||
super.start();
|
||||
}
|
||||
|
||||
private synchronized void loadMessages(String realmName) {
|
||||
try {
|
||||
runAsAgent(ctx -> {
|
||||
|
||||
logger.info("Loading OperationsLog for realm " + realmName + "...");
|
||||
|
||||
|
@ -61,14 +70,22 @@ public class OperationsLog extends StrolchComponent {
|
|||
LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx);
|
||||
List<LogMessage> messages = logMessageDao.queryLatest(realmName, this.maxMessages);
|
||||
logger.info("Loaded " + messages.size() + " messages for OperationsLog for realm " + realmName);
|
||||
this.logMessagesByRealmAndId.put(realmName, new LinkedHashSet<>(messages));
|
||||
this.logMessagesByRealmAndId.computeIfAbsent(realmName, OperationsLog::newHashSet).addAll(messages);
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("Failed to load operations log for realm " + realmName, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
super.start();
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to load operations logs!", e);
|
||||
synchronized (this) {
|
||||
this.logMessagesByRealmAndId.computeIfAbsent(realmName, OperationsLog::newHashSet)
|
||||
.add(new LogMessage(realmName, SYSTEM_USER_AGENT,
|
||||
Locator.valueOf(AGENT, "strolch-agent", getUniqueId()), LogSeverity.Exception,
|
||||
Information, getBundle("strolch-agent"), "operationsLog.load.failed") //
|
||||
.value("reason", e.getMessage()) //
|
||||
.withException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxMessages(int maxMessages) {
|
||||
|
@ -82,14 +99,14 @@ public class OperationsLog extends StrolchComponent {
|
|||
// store in global list
|
||||
String realmName = logMessage.getRealm();
|
||||
LinkedHashSet<LogMessage> logMessages = this.logMessagesByRealmAndId.computeIfAbsent(realmName,
|
||||
r -> new LinkedHashSet<>());
|
||||
OperationsLog::newHashSet);
|
||||
logMessages.add(logMessage);
|
||||
|
||||
// store under locator
|
||||
LinkedHashMap<Locator, LinkedHashSet<LogMessage>> logMessagesLocator = this.logMessagesByLocator.computeIfAbsent(
|
||||
realmName, this::newBoundedLocatorMap);
|
||||
LinkedHashSet<LogMessage> messages = logMessagesLocator.computeIfAbsent(logMessage.getLocator(),
|
||||
(l) -> new LinkedHashSet<>());
|
||||
OperationsLog::newHashSet);
|
||||
messages.add(logMessage);
|
||||
|
||||
// prune if necessary
|
||||
|
@ -101,7 +118,7 @@ public class OperationsLog extends StrolchComponent {
|
|||
this.executorService.submit(() -> persist(realm, logMessage, messagesToRemove));
|
||||
}
|
||||
|
||||
public void removeMessage(LogMessage message) {
|
||||
public synchronized void removeMessage(LogMessage message) {
|
||||
|
||||
String realmName = message.getRealm();
|
||||
LinkedHashMap<Locator, LinkedHashSet<LogMessage>> byLocator = this.logMessagesByLocator.get(realmName);
|
||||
|
@ -169,6 +186,9 @@ public class OperationsLog extends StrolchComponent {
|
|||
|
||||
public synchronized void updateState(String realmName, String id, LogMessageState state) {
|
||||
LinkedHashSet<LogMessage> logMessages = this.logMessagesByRealmAndId.get(realmName);
|
||||
if (logMessages == null)
|
||||
return;
|
||||
|
||||
for (LogMessage logMessage : logMessages) {
|
||||
if (logMessage.getId().equals(id)) {
|
||||
logMessage.setState(state);
|
||||
|
@ -188,9 +208,9 @@ public class OperationsLog extends StrolchComponent {
|
|||
List<LogMessage> messagesToRemove = new ArrayList<>();
|
||||
|
||||
int maxDelete = Math.max(1, (int) (this.maxMessages * 0.1));
|
||||
int nrOfExcessMsgs = logMessages.size() - this.maxMessages;
|
||||
if (nrOfExcessMsgs > 0)
|
||||
maxDelete += nrOfExcessMsgs;
|
||||
int nrOfExcessMessages = logMessages.size() - this.maxMessages;
|
||||
if (nrOfExcessMessages > 0)
|
||||
maxDelete += nrOfExcessMessages;
|
||||
|
||||
logger.info("Pruning " + maxDelete + " messages from OperationsLog...");
|
||||
Iterator<LogMessage> iterator = logMessages.iterator();
|
||||
|
@ -217,16 +237,7 @@ public class OperationsLog extends StrolchComponent {
|
|||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to persist operations logs!", e);
|
||||
synchronized (this) {
|
||||
this.logMessagesByRealmAndId.computeIfAbsent(realm.getRealm(), r -> new LinkedHashSet<>())
|
||||
.add(new LogMessage(realm.getRealm(), SYSTEM_USER_AGENT,
|
||||
Locator.valueOf(AGENT, "strolch-agent", StrolchAgent.getUniqueId()), LogSeverity.Info,
|
||||
LogMessageState.Information, ResourceBundle.getBundle("strolch-agent"),
|
||||
"operationsLog.persist.failed") //
|
||||
.value("reason", e.getMessage()) //
|
||||
.withException(e));
|
||||
}
|
||||
handleFailedPersist(realm, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -240,16 +251,7 @@ public class OperationsLog extends StrolchComponent {
|
|||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to persist operations logs!", e);
|
||||
synchronized (this) {
|
||||
this.logMessagesByRealmAndId.computeIfAbsent(realm.getRealm(), r -> new LinkedHashSet<>())
|
||||
.add(new LogMessage(realm.getRealm(), SYSTEM_USER_AGENT,
|
||||
Locator.valueOf(AGENT, "strolch-agent", StrolchAgent.getUniqueId()), LogSeverity.Info,
|
||||
LogMessageState.Information, ResourceBundle.getBundle("strolch-agent"),
|
||||
"operationsLog.persist.failed") //
|
||||
.value("reason", e.getMessage()) //
|
||||
.withException(e));
|
||||
}
|
||||
handleFailedPersist(realm, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,6 +279,18 @@ public class OperationsLog extends StrolchComponent {
|
|||
return new ArrayList<>(logMessages);
|
||||
}
|
||||
|
||||
private void handleFailedPersist(StrolchRealm realm, Exception e) {
|
||||
logger.error("Failed to persist operations logs!", e);
|
||||
synchronized (this) {
|
||||
this.logMessagesByRealmAndId.computeIfAbsent(realm.getRealm(), OperationsLog::newHashSet)
|
||||
.add(new LogMessage(realm.getRealm(), SYSTEM_USER_AGENT,
|
||||
Locator.valueOf(AGENT, "strolch-agent", getUniqueId()), LogSeverity.Exception, Information,
|
||||
getBundle("strolch-agent"), "operationsLog.persist.failed") //
|
||||
.value("reason", e.getMessage()) //
|
||||
.withException(e));
|
||||
}
|
||||
}
|
||||
|
||||
private LinkedHashMap<Locator, LinkedHashSet<LogMessage>> newBoundedLocatorMap(String realm) {
|
||||
return new LinkedHashMap<>() {
|
||||
@Override
|
||||
|
@ -285,4 +299,8 @@ public class OperationsLog extends StrolchComponent {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static LinkedHashSet<LogMessage> newHashSet(Object o) {
|
||||
return new LinkedHashSet<>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ agent.observers.update.failed=Observer handler action '{type}' failed due to: {r
|
|||
agent.service.failed=Service {service} has failed due to {reason}
|
||||
agent.service.failed.ex=Service {service} has failed due to {reason} due to: {exception}
|
||||
strolchjob.failed=Execution of Job {jobName} has failed due to {reason}
|
||||
operationsLog.load.failed=Failed to load OperationsLog due to: {reason}
|
||||
operationsLog.persist.failed=Failed to persist OperationsLog due to: {reason}
|
||||
agent.query.failed.access.denied=User {user} may not perform query {query}
|
||||
agent.search.failed.access.denied=User {user} may not perform search {search}
|
||||
|
|
|
@ -21,9 +21,13 @@ import li.strolch.persistence.api.LogMessageDao;
|
|||
import li.strolch.persistence.api.StrolchTransaction;
|
||||
import li.strolch.privilege.model.Certificate;
|
||||
import li.strolch.runtime.privilege.PrivilegeHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class LogMessagesTestRunner {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LogMessagesTestRunner.class);
|
||||
|
||||
private static final int MAX_MESSAGES = 100;
|
||||
|
||||
private final ComponentContainer container;
|
||||
|
@ -79,13 +83,16 @@ public class LogMessagesTestRunner {
|
|||
}
|
||||
|
||||
// initialize with the existing message IDs
|
||||
List<String> ids = this.operationsLog.getMessages(this.realmName).stream().map(LogMessage::getId)
|
||||
List<String> ids = this.operationsLog.getMessages(this.realmName)
|
||||
.stream()
|
||||
.map(LogMessage::getId)
|
||||
.collect(toList());
|
||||
|
||||
for (int i = 0; i < MAX_MESSAGES * 2; i++) {
|
||||
LogMessage m = new LogMessage(this.realmName, SYSTEM_USER_AGENT,
|
||||
Locator.valueOf(AGENT, "li.strolch.testbase", StrolchAgent.getUniqueId()),
|
||||
LogSeverity.Exception, LogMessageState.Information, bundle, "test-message");
|
||||
LogSeverity.Exception, LogMessageState.Information, bundle, "test-message").value("reason",
|
||||
"" + i);
|
||||
this.operationsLog.addMessage(m);
|
||||
ids.add(m.getId());
|
||||
}
|
||||
|
@ -103,10 +110,14 @@ public class LogMessagesTestRunner {
|
|||
|
||||
try (StrolchTransaction tx = realm.openTx(this.certificate, "test", true)) {
|
||||
LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx);
|
||||
List<String> logMessageIds = logMessageDao.queryLatest(this.realmName, Integer.MAX_VALUE).stream()
|
||||
.map(LogMessage::getId).sorted().collect(toList());
|
||||
assertEquals(expectedSize, logMessageIds.size());
|
||||
assertEquals(ids.subList(ids.size() - expectedSize, ids.size()), logMessageIds);
|
||||
List<String> actualIds = logMessageDao.queryLatest(this.realmName, Integer.MAX_VALUE)
|
||||
.stream()
|
||||
.map(LogMessage::getId)
|
||||
.sorted()
|
||||
.collect(toList());
|
||||
assertEquals(expectedSize, actualIds.size());
|
||||
List<String> expectedIds = ids.subList(ids.size() - expectedSize, ids.size());
|
||||
assertEquals(expectedIds, actualIds);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,8 +169,11 @@ public class LogMessagesTestRunner {
|
|||
} else {
|
||||
try (StrolchTransaction tx = realm.openTx(this.certificate, "test", true)) {
|
||||
LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx);
|
||||
List<String> logMessageIds = logMessageDao.queryLatest(this.realmName, Integer.MAX_VALUE).stream()
|
||||
.map(LogMessage::getId).sorted().collect(toList());
|
||||
List<String> logMessageIds = logMessageDao.queryLatest(this.realmName, Integer.MAX_VALUE)
|
||||
.stream()
|
||||
.map(LogMessage::getId)
|
||||
.sorted()
|
||||
.toList();
|
||||
assertFalse(logMessageIds.contains(logMessage1.getId()));
|
||||
assertFalse(logMessageIds.contains(logMessage2.getId()));
|
||||
assertFalse(logMessageIds.contains(logMessage3.getId()));
|
||||
|
|
Loading…
Reference in New Issue