From 2286752c57244384c9739b84aa0733520af52bd6 Mon Sep 17 00:00:00 2001 From: Robert von Burg Date: Mon, 15 Oct 2018 14:37:27 +0200 Subject: [PATCH] [Fix] Fixed pruning of messages in OperationsLog --- .../handler/operationslog/OperationsLog.java | 74 +++++++++++-------- .../strolch_db_schema_0.7.0_initial.sql | 4 +- .../strolch_db_schema_0.7.0_migration.sql | 4 +- .../runtime/LogMessagesTestRunner.java | 17 +++-- 4 files changed, 55 insertions(+), 44 deletions(-) diff --git a/li.strolch.agent/src/main/java/li/strolch/handler/operationslog/OperationsLog.java b/li.strolch.agent/src/main/java/li/strolch/handler/operationslog/OperationsLog.java index d92814b8a..0b5b314ef 100644 --- a/li.strolch.agent/src/main/java/li/strolch/handler/operationslog/OperationsLog.java +++ b/li.strolch.agent/src/main/java/li/strolch/handler/operationslog/OperationsLog.java @@ -5,6 +5,7 @@ import java.util.concurrent.ExecutorService; import li.strolch.agent.api.ComponentContainer; import li.strolch.agent.api.StrolchComponent; +import li.strolch.agent.api.StrolchRealm; import li.strolch.model.Locator; import li.strolch.persistence.api.LogMessageDao; import li.strolch.persistence.api.StrolchTransaction; @@ -29,7 +30,7 @@ public class OperationsLog extends StrolchComponent { this.logMessagesByRealmAndId = new HashMap<>(); this.logMessagesByLocator = new HashMap<>(); - this.executorService = getExecutorService("OperationsLog"); + this.executorService = getSingleThreadExecutor("OperationsLog"); super.initialize(configuration); } @@ -65,48 +66,57 @@ public class OperationsLog extends StrolchComponent { public synchronized void addMessage(LogMessage logMessage) { // store in global list - List logMessages = this.logMessagesByRealmAndId - .computeIfAbsent(logMessage.getRealm(), r -> new ArrayList<>()); + String realmName = logMessage.getRealm(); + List logMessages = this.logMessagesByRealmAndId.computeIfAbsent(realmName, r -> new ArrayList<>()); logMessages.add(logMessage); // store under locator LinkedHashMap> logMessagesLocator = this.logMessagesByLocator - .computeIfAbsent(logMessage.getRealm(), this::newBoundedLocatorMap); + .computeIfAbsent(realmName, this::newBoundedLocatorMap); LinkedHashSet messages = logMessagesLocator .computeIfAbsent(logMessage.getLocator(), (l) -> new LinkedHashSet<>()); messages.add(logMessage); - this.executorService.submit(() -> this.persistAndPrune(logMessages, logMessage)); + // prune if necessary + List messagesToRemove = pruneMessages(logMessages); + + // persist changes for non-transient realms + StrolchRealm realm = getContainer().getRealm(realmName); + if (!realm.getMode().isTransient()) + this.executorService.submit(() -> this.persist(realm, logMessage, messagesToRemove)); } - private void persistAndPrune(List logMessages, LogMessage logMessage) { + private List pruneMessages(List logMessages) { + if (logMessages.size() < this.maxMessages) + return Collections.emptyList(); + + List messagesToRemove = new ArrayList<>(); + + int maxDelete = Math.max(1, (int) (this.maxMessages * 0.1)); + int nrOfExcessMsgs = logMessages.size() - this.maxMessages; + if (nrOfExcessMsgs > 0) + maxDelete += nrOfExcessMsgs; + + logger.info("Pruning " + maxDelete + " messages from OperationsLog..."); + Iterator iterator = logMessages.iterator(); + while (maxDelete > 0 && iterator.hasNext()) { + LogMessage messageToRemove = iterator.next(); + messagesToRemove.add(messageToRemove); + iterator.remove(); + maxDelete--; + } + + return messagesToRemove; + } + + private void persist(StrolchRealm realm, LogMessage logMessage, List messagesToRemove) { runAsAgent(ctx -> { - - List messagesToRemove = null; - synchronized (this) { - if (logMessages.size() > this.maxMessages) { - messagesToRemove = new ArrayList<>(); - int maxDelete = Math.max(1, (int) (this.maxMessages * 0.1)); - logger.info("Pruning " + maxDelete + " messages from OperationsLog..."); - Iterator iterator = logMessages.iterator(); - while (maxDelete > 0 && iterator.hasNext()) { - LogMessage messageToRemove = iterator.next(); - messagesToRemove.add(messageToRemove); - iterator.remove(); - maxDelete--; - } - } - - // only for persisted realms - if (!getContainer().getRealm(logMessage.getRealm()).getMode().isTransient()) { - try (StrolchTransaction tx = openTx(logMessage.getRealm(), ctx.getCertificate())) { - LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx); - if (messagesToRemove != null && !messagesToRemove.isEmpty()) - logMessageDao.removeAll(messagesToRemove); - logMessageDao.save(logMessage); - tx.commitOnClose(); - } - } + try (StrolchTransaction tx = realm.openTx(ctx.getCertificate(), getClass())) { + LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx); + if (messagesToRemove != null && !messagesToRemove.isEmpty()) + logMessageDao.removeAll(messagesToRemove); + logMessageDao.save(logMessage); + tx.commitOnClose(); } }); } diff --git a/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_initial.sql b/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_initial.sql index 64a266b24..ba9e116e1 100644 --- a/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_initial.sql +++ b/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_initial.sql @@ -96,8 +96,8 @@ CREATE TABLE IF NOT EXISTS operations_log ( ); CREATE TABLE IF NOT EXISTS operations_log_values ( - id varchar(255) PRIMARY KEY, - key VARCHAR(255), + id varchar(255), + key varchar(255), value text ); diff --git a/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_migration.sql b/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_migration.sql index 5ea02a5f0..925edbb67 100644 --- a/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_migration.sql +++ b/li.strolch.persistence.postgresql/src/main/resources/strolch_db_schema_0.7.0_migration.sql @@ -14,8 +14,8 @@ CREATE TABLE IF NOT EXISTS operations_log ( ); CREATE TABLE IF NOT EXISTS operations_log_values ( - id varchar(255) PRIMARY KEY, - key VARCHAR(255), + id varchar(255), + key varchar(255), value text ); diff --git a/li.strolch.testbase/src/main/java/li/strolch/testbase/runtime/LogMessagesTestRunner.java b/li.strolch.testbase/src/main/java/li/strolch/testbase/runtime/LogMessagesTestRunner.java index b3e32af97..c2a4b6198 100644 --- a/li.strolch.testbase/src/main/java/li/strolch/testbase/runtime/LogMessagesTestRunner.java +++ b/li.strolch.testbase/src/main/java/li/strolch/testbase/runtime/LogMessagesTestRunner.java @@ -5,7 +5,6 @@ import static li.strolch.model.Tags.AGENT; import static li.strolch.runtime.StrolchConstants.SYSTEM_USER_AGENT; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; import java.util.List; import java.util.ResourceBundle; @@ -56,12 +55,12 @@ public class LogMessagesTestRunner { if (realm.getMode().isTransient()) { List messages = this.operationsLog.getMessages(realmName); - assertEquals(1, messages.size()); + assertEquals(2, messages.size()); } else { try (StrolchTransaction tx = realm.openTx(this.certificate, "test");) { LogMessageDao logMessageDao = tx.getPersistenceHandler().getLogMessageDao(tx); List logMessages = logMessageDao.queryLatest(this.realmName, Integer.MAX_VALUE); - assertEquals(1, logMessages.size()); + assertEquals(2, logMessages.size()); LogMessage m = logMessages.get(0); assertEquals(logMessage.getId(), m.getId()); @@ -77,8 +76,10 @@ public class LogMessagesTestRunner { } } - List ids = new ArrayList<>(); - ids.add(logMessage.getId()); + // initialize with the existing message IDs + List ids = this.operationsLog.getMessages(this.realmName).stream().map(LogMessage::getId) + .collect(toList()); + for (int i = 0; i < MAX_MESSAGES * 2; i++) { LogMessage m = new LogMessage(this.realmName, SYSTEM_USER_AGENT, Locator.valueOf(AGENT, "li.strolch.testbase", StrolchAgent.getUniqueId()), @@ -88,13 +89,13 @@ public class LogMessagesTestRunner { } // default is async persisting... - Thread.sleep(100L); + Thread.sleep(1000L); int trimSize = (int) (MAX_MESSAGES * 0.1); - int expectedSize = MAX_MESSAGES - trimSize + 1; + int expectedSize = MAX_MESSAGES - trimSize + 2; // +2 => startup and first message if (realm.getMode().isTransient()) { - List messages = this.operationsLog.getMessages(realmName); + List messages = this.operationsLog.getMessages(this.realmName); assertEquals(expectedSize, messages.size()); } else {