[Fix] Fixed pruning of messages in OperationsLog

This commit is contained in:
Robert von Burg 2018-10-15 14:37:27 +02:00
parent cee1ac4f82
commit 2286752c57
4 changed files with 55 additions and 44 deletions

View File

@ -5,6 +5,7 @@ import java.util.concurrent.ExecutorService;
import li.strolch.agent.api.ComponentContainer;
import li.strolch.agent.api.StrolchComponent;
import li.strolch.agent.api.StrolchRealm;
import li.strolch.model.Locator;
import li.strolch.persistence.api.LogMessageDao;
import li.strolch.persistence.api.StrolchTransaction;
@ -29,7 +30,7 @@ public class OperationsLog extends StrolchComponent {
this.logMessagesByRealmAndId = new HashMap<>();
this.logMessagesByLocator = new HashMap<>();
this.executorService = getExecutorService("OperationsLog");
this.executorService = getSingleThreadExecutor("OperationsLog");
super.initialize(configuration);
}
@ -65,48 +66,57 @@ public class OperationsLog extends StrolchComponent {
public synchronized void addMessage(LogMessage logMessage) {
// store in global list
List<LogMessage> logMessages = this.logMessagesByRealmAndId
.computeIfAbsent(logMessage.getRealm(), r -> new ArrayList<>());
String realmName = logMessage.getRealm();
List<LogMessage> logMessages = this.logMessagesByRealmAndId.computeIfAbsent(realmName, r -> new ArrayList<>());
logMessages.add(logMessage);
// store under locator
LinkedHashMap<Locator, LinkedHashSet<LogMessage>> logMessagesLocator = this.logMessagesByLocator
.computeIfAbsent(logMessage.getRealm(), this::newBoundedLocatorMap);
.computeIfAbsent(realmName, this::newBoundedLocatorMap);
LinkedHashSet<LogMessage> messages = logMessagesLocator
.computeIfAbsent(logMessage.getLocator(), (l) -> new LinkedHashSet<>());
messages.add(logMessage);
this.executorService.submit(() -> this.persistAndPrune(logMessages, logMessage));
// prune if necessary
List<LogMessage> messagesToRemove = pruneMessages(logMessages);
// persist changes for non-transient realms
StrolchRealm realm = getContainer().getRealm(realmName);
if (!realm.getMode().isTransient())
this.executorService.submit(() -> this.persist(realm, logMessage, messagesToRemove));
}
private void persistAndPrune(List<LogMessage> logMessages, LogMessage logMessage) {
private List<LogMessage> pruneMessages(List<LogMessage> logMessages) {
if (logMessages.size() < this.maxMessages)
return Collections.emptyList();
List<LogMessage> messagesToRemove = new ArrayList<>();
int maxDelete = Math.max(1, (int) (this.maxMessages * 0.1));
int nrOfExcessMsgs = logMessages.size() - this.maxMessages;
if (nrOfExcessMsgs > 0)
maxDelete += nrOfExcessMsgs;
logger.info("Pruning " + maxDelete + " messages from OperationsLog...");
Iterator<LogMessage> iterator = logMessages.iterator();
while (maxDelete > 0 && iterator.hasNext()) {
LogMessage messageToRemove = iterator.next();
messagesToRemove.add(messageToRemove);
iterator.remove();
maxDelete--;
}
return messagesToRemove;
}
private void persist(StrolchRealm realm, LogMessage logMessage, List<LogMessage> messagesToRemove) {
runAsAgent(ctx -> {
List<LogMessage> messagesToRemove = null;
synchronized (this) {
if (logMessages.size() > this.maxMessages) {
messagesToRemove = new ArrayList<>();
int maxDelete = Math.max(1, (int) (this.maxMessages * 0.1));
logger.info("Pruning " + maxDelete + " messages from OperationsLog...");
Iterator<LogMessage> iterator = logMessages.iterator();
while (maxDelete > 0 && iterator.hasNext()) {
LogMessage messageToRemove = iterator.next();
messagesToRemove.add(messageToRemove);
iterator.remove();
maxDelete--;
}
}
// only for persisted realms
if (!getContainer().getRealm(logMessage.getRealm()).getMode().isTransient()) {
try (StrolchTransaction tx = openTx(logMessage.getRealm(), ctx.getCertificate())) {
LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx);
if (messagesToRemove != null && !messagesToRemove.isEmpty())
logMessageDao.removeAll(messagesToRemove);
logMessageDao.save(logMessage);
tx.commitOnClose();
}
}
try (StrolchTransaction tx = realm.openTx(ctx.getCertificate(), getClass())) {
LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx);
if (messagesToRemove != null && !messagesToRemove.isEmpty())
logMessageDao.removeAll(messagesToRemove);
logMessageDao.save(logMessage);
tx.commitOnClose();
}
});
}

View File

@ -96,8 +96,8 @@ CREATE TABLE IF NOT EXISTS operations_log (
);
CREATE TABLE IF NOT EXISTS operations_log_values (
id varchar(255) PRIMARY KEY,
key VARCHAR(255),
id varchar(255),
key varchar(255),
value text
);

View File

@ -14,8 +14,8 @@ CREATE TABLE IF NOT EXISTS operations_log (
);
CREATE TABLE IF NOT EXISTS operations_log_values (
id varchar(255) PRIMARY KEY,
key VARCHAR(255),
id varchar(255),
key varchar(255),
value text
);

View File

@ -5,7 +5,6 @@ import static li.strolch.model.Tags.AGENT;
import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.ResourceBundle;
@ -56,12 +55,12 @@ public class LogMessagesTestRunner {
if (realm.getMode().isTransient()) {
List<LogMessage> messages = this.operationsLog.getMessages(realmName);
assertEquals(1, messages.size());
assertEquals(2, messages.size());
} else {
try (StrolchTransaction tx = realm.openTx(this.certificate, "test");) {
LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx);
List<LogMessage> logMessages = logMessageDao.queryLatest(this.realmName, Integer.MAX_VALUE);
assertEquals(1, logMessages.size());
assertEquals(2, logMessages.size());
LogMessage m = logMessages.get(0);
assertEquals(logMessage.getId(), m.getId());
@ -77,8 +76,10 @@ public class LogMessagesTestRunner {
}
}
List<String> ids = new ArrayList<>();
ids.add(logMessage.getId());
// initialize with the existing message IDs
List<String> ids = this.operationsLog.getMessages(this.realmName).stream().map(LogMessage::getId)
.collect(toList());
for (int i = 0; i < MAX_MESSAGES * 2; i++) {
LogMessage m = new LogMessage(this.realmName, SYSTEM_USER_AGENT,
Locator.valueOf(AGENT, "li.strolch.testbase", StrolchAgent.getUniqueId()),
@ -88,13 +89,13 @@ public class LogMessagesTestRunner {
}
// default is async persisting...
Thread.sleep(100L);
Thread.sleep(1000L);
int trimSize = (int) (MAX_MESSAGES * 0.1);
int expectedSize = MAX_MESSAGES - trimSize + 1;
int expectedSize = MAX_MESSAGES - trimSize + 2; // +2 => startup and first message
if (realm.getMode().isTransient()) {
List<LogMessage> messages = this.operationsLog.getMessages(realmName);
List<LogMessage> messages = this.operationsLog.getMessages(this.realmName);
assertEquals(expectedSize, messages.size());
} else {