[New] Better cache management in TX

- Added items to cache when modified
- added streaming of cached elements
This commit is contained in:
Robert von Burg 2024-02-06 07:59:28 +01:00
parent 56098c96fa
commit e33950b7a1
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
2 changed files with 151 additions and 33 deletions

View File

@ -38,11 +38,11 @@ import li.strolch.utils.objectfilter.ObjectFilterStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static java.text.MessageFormat.format;
import static li.strolch.agent.api.StrolchAgent.getUniqueId;
import static li.strolch.model.StrolchModelConstants.*;
import static li.strolch.model.Tags.*;
@ -515,7 +515,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (locator.getSize() < 3) {
String msg
= "The locator is invalid as it does not have at least three path elements (e.g. Resource/MyType/@id): {0}";
msg = MessageFormat.format(msg, locator.toString());
msg = format(msg, locator.toString());
throw new StrolchModelException(msg);
}
@ -535,7 +535,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (allowNull)
return null;
String msg = "No top level object could be found with locator {0}";
throw new StrolchModelException(MessageFormat.format(msg, locator));
throw new StrolchModelException(format(msg, locator));
}
if (elements.size() == 3)
@ -551,7 +551,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (allowNull)
return null;
String msg = "Could not find ParameterBag for locator {0} on element {1}";
throw new StrolchModelException(MessageFormat.format(msg, locator, rootElement.getLocator()));
throw new StrolchModelException(format(msg, locator, rootElement.getLocator()));
}
if (elements.size() == 5)
@ -563,7 +563,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (allowNull)
return null;
String msg = "Could not find Parameter for locator {0} on element {1}";
throw new StrolchModelException(MessageFormat.format(msg, locator, bag.getLocator()));
throw new StrolchModelException(format(msg, locator, bag.getLocator()));
}
return (T) parameter;
@ -571,7 +571,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (elements.size() != 5) {
String msg = "Missing state Id on locator {0}";
throw new StrolchModelException(MessageFormat.format(msg, locator));
throw new StrolchModelException(format(msg, locator));
}
Resource resource = rootElement.asResource();
@ -589,7 +589,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (!(element instanceof Activity)) {
String msg = "Invalid locator {0} with part {1} as not an Activity but deeper element specified";
throw new StrolchModelException(MessageFormat.format(msg, locator, next));
throw new StrolchModelException(format(msg, locator, next));
}
element = ((Activity) element).getElement(next);
@ -602,7 +602,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
return null;
String msg = "Invalid locator {0} with part {1}";
throw new StrolchModelException(MessageFormat.format(msg, locator, stateOrBagOrActivity));
throw new StrolchModelException(format(msg, locator, stateOrBagOrActivity));
}
@Override
@ -641,8 +641,8 @@ public abstract class AbstractTransaction implements StrolchTransaction {
break;
if (!parents.add(parent))
throw new IllegalStateException(
"circular dependencies from " + element.getLocator() + " to " + parent.getLocator() +
" on relations parameter " + parentParamKey);
format("circular dependencies from {0} to {1} on relations parameter {2}", element.getLocator(),
parent.getLocator(), parentParamKey));
t = parent.getParameter(bagKey, paramKey);
}
@ -787,7 +787,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (assertExists && refP.isEmpty()) {
String msg = "The Order with type \"{0}\" and id \"{1}\" does not exist for param \"{2}\"";
throw new StrolchException(MessageFormat.format(msg, refP.getUom(), refP.getValue(), refP.getLocator()));
throw new StrolchException(format(msg, refP.getUom(), refP.getValue(), refP.getLocator()));
}
if (refP.isEmpty())
@ -901,7 +901,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (assertExists && refP.isEmpty()) {
String msg = "The Resource with type \"{0}\" and id \"{1}\" does not exist for param \"{2}\"";
throw new StrolchException(MessageFormat.format(msg, refP.getUom(), refP.getValue(), refP.getLocator()));
throw new StrolchException(format(msg, refP.getUom(), refP.getValue(), refP.getLocator()));
}
if (refP.isEmpty())
@ -1031,7 +1031,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (assertExists && refP.isEmpty()) {
String msg = "The Activity with type \"{0}\" and id \"{1}\" does not exist for param \"{2}\"";
throw new StrolchException(MessageFormat.format(msg, refP.getUom(), refP.getValue(), refP.getLocator()));
throw new StrolchException(format(msg, refP.getUom(), refP.getValue(), refP.getLocator()));
}
if (refP.isEmpty())
@ -1134,6 +1134,21 @@ public abstract class AbstractTransaction implements StrolchTransaction {
this.objectFilter.removeObjectCache(locator.get(0), locator);
}
@Override
public boolean isResourceCached(String type, String id) {
return this.resourceCache.containsElement(type, id);
}
@Override
public boolean isOrderCached(String type, String id) {
return this.orderCache.containsElement(type, id);
}
@Override
public boolean isActivityCached(String type, String id) {
return this.activityCache.containsElement(type, id);
}
@Override
public Resource getCachedResource(String type, String id) {
return this.resourceCache.getElement(type, id);
@ -1149,6 +1164,66 @@ public abstract class AbstractTransaction implements StrolchTransaction {
return this.activityCache.getElement(type, id);
}
@Override
public Stream<Resource> streamCachedResources(String... types) {
if (types.length == 0)
return this.resourceCache.streamValues();
if (types.length == 1) {
String type = types[0];
if (!this.resourceCache.containsMap(type))
return Stream.empty();
return this.resourceCache.getMap(type).values().stream();
}
return this.resourceCache.streamValues().filter(element -> {
String resType = element.getType();
for (String type : types) {
if (resType.equals(type))
return true;
}
return false;
});
}
@Override
public Stream<Order> streamCachedOrders(String... types) {
if (types.length == 0)
return this.orderCache.streamValues();
if (types.length == 1) {
String type = types[0];
if (!this.orderCache.containsMap(type))
return Stream.empty();
return this.orderCache.getMap(type).values().stream();
}
return this.orderCache.streamValues().filter(element -> {
String resType = element.getType();
for (String type : types) {
if (resType.equals(type))
return true;
}
return false;
});
}
@Override
public Stream<Activity> streamCachedActivities(String... types) {
if (types.length == 0)
return this.activityCache.streamValues();
if (types.length == 1) {
String type = types[0];
if (!this.activityCache.containsMap(type))
return Stream.empty();
return this.activityCache.getMap(type).values().stream();
}
return this.activityCache.streamValues().filter(element -> {
String resType = element.getType();
for (String type : types) {
if (resType.equals(type))
return true;
}
return false;
});
}
@Override
public boolean hasResource(String type, String id) {
boolean inFilter = hasElementInFilter(Tags.RESOURCE, Resource.locatorFor(type, id));
@ -1253,6 +1328,8 @@ public abstract class AbstractTransaction implements StrolchTransaction {
lock(resource);
this.objectFilter.add(Tags.RESOURCE, resource.getLocator(), resource);
}
this.resourceCache.addElement(resource.getType(), resource.getId(), resource);
}
@Override
@ -1271,6 +1348,8 @@ public abstract class AbstractTransaction implements StrolchTransaction {
lock(order);
this.objectFilter.add(Tags.ORDER, order.getLocator(), order);
}
this.orderCache.addElement(order.getType(), order.getId(), order);
}
@Override
@ -1289,6 +1368,8 @@ public abstract class AbstractTransaction implements StrolchTransaction {
lock(activity);
this.objectFilter.add(Tags.ACTIVITY, activity.getLocator(), activity);
}
this.activityCache.addElement(activity.getType(), activity.getId(), activity);
}
@Override
@ -1298,6 +1379,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
resource.assertNotReadonly();
lock(resource);
this.objectFilter.add(Tags.RESOURCE, resource.getLocator(), resource);
this.resourceCache.addElement(resource.getType(), resource.getId(), resource);
}
@Override
@ -1307,6 +1389,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
order.assertNotReadonly();
lock(order);
this.objectFilter.add(Tags.ORDER, order.getLocator(), order);
this.orderCache.addElement(order.getType(), order.getId(), order);
}
@Override
@ -1316,6 +1399,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
activity.assertNotReadonly();
lock(activity);
this.objectFilter.add(Tags.ACTIVITY, activity.getLocator(), activity);
this.activityCache.addElement(activity.getType(), activity.getId(), activity);
}
@Override
@ -1324,6 +1408,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
DBC.PRE.assertNotNull("resource must not be null", resource);
resource.assertNotReadonly();
this.objectFilter.update(Tags.RESOURCE, resource.getLocator(), resource);
this.resourceCache.addElement(resource.getType(), resource.getId(), resource);
}
@Override
@ -1332,6 +1417,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
DBC.PRE.assertNotNull("order must not be null", order);
order.assertNotReadonly();
this.objectFilter.update(Tags.ORDER, order.getLocator(), order);
this.orderCache.addElement(order.getType(), order.getId(), order);
}
@Override
@ -1340,6 +1426,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
DBC.PRE.assertNotNull("activity must not be null", activity);
activity.assertNotReadonly();
this.objectFilter.update(Tags.ACTIVITY, activity.getLocator(), activity);
this.activityCache.addElement(activity.getType(), activity.getId(), activity);
}
@Override
@ -1383,7 +1470,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
List<Resource> changedR = this.objectFilter.getRemoved(Resource.class, Tags.RESOURCE);
if (changedR.size() == 1) {
RemoveResourceCommand cmd = new RemoveResourceCommand(this);
cmd.setResource(changedR.get(0));
cmd.setResource(changedR.getFirst());
add(cmd);
} else if (changedR.size() > 1) {
RemoveResourcesCommand cmd = new RemoveResourcesCommand(this);
@ -1395,7 +1482,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
changedR = this.objectFilter.getUpdated(Resource.class, Tags.RESOURCE);
if (changedR.size() == 1) {
UpdateResourceCommand cmd = new UpdateResourceCommand(this);
cmd.setResource(changedR.get(0));
cmd.setResource(changedR.getFirst());
add(cmd);
} else if (changedR.size() > 1) {
UpdateResourcesCommand cmd = new UpdateResourcesCommand(this);
@ -1407,7 +1494,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
changedR = this.objectFilter.getAdded(Resource.class, Tags.RESOURCE);
if (changedR.size() == 1) {
AddResourceCommand cmd = new AddResourceCommand(this);
cmd.setResource(changedR.get(0));
cmd.setResource(changedR.getFirst());
add(cmd);
} else if (changedR.size() > 1) {
AddResourcesCommand cmd = new AddResourcesCommand(this);
@ -1422,7 +1509,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
List<Order> changedO = this.objectFilter.getRemoved(Order.class, Tags.ORDER);
if (changedO.size() == 1) {
RemoveOrderCommand cmd = new RemoveOrderCommand(this);
cmd.setOrder(changedO.get(0));
cmd.setOrder(changedO.getFirst());
add(cmd);
} else if (changedO.size() > 1) {
RemoveOrdersCommand cmd = new RemoveOrdersCommand(this);
@ -1434,7 +1521,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
changedO = this.objectFilter.getUpdated(Order.class, Tags.ORDER);
if (changedO.size() == 1) {
UpdateOrderCommand cmd = new UpdateOrderCommand(this);
cmd.setOrder(changedO.get(0));
cmd.setOrder(changedO.getFirst());
add(cmd);
} else if (changedO.size() > 1) {
UpdateOrdersCommand cmd = new UpdateOrdersCommand(this);
@ -1446,7 +1533,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
changedO = this.objectFilter.getAdded(Order.class, Tags.ORDER);
if (changedO.size() == 1) {
AddOrderCommand cmd = new AddOrderCommand(this);
cmd.setOrder(changedO.get(0));
cmd.setOrder(changedO.getFirst());
add(cmd);
} else if (changedO.size() > 1) {
AddOrdersCommand cmd = new AddOrdersCommand(this);
@ -1461,7 +1548,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
List<Activity> changedA = this.objectFilter.getRemoved(Activity.class, Tags.ACTIVITY);
if (changedA.size() == 1) {
RemoveActivityCommand cmd = new RemoveActivityCommand(this);
cmd.setActivity(changedA.get(0));
cmd.setActivity(changedA.getFirst());
add(cmd);
} else if (changedA.size() > 1) {
RemoveActivitiesCommand cmd = new RemoveActivitiesCommand(this);
@ -1473,7 +1560,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
changedA = this.objectFilter.getUpdated(Activity.class, Tags.ACTIVITY);
if (changedA.size() == 1) {
UpdateActivityCommand cmd = new UpdateActivityCommand(this);
cmd.setActivity(changedA.get(0));
cmd.setActivity(changedA.getFirst());
add(cmd);
} else if (changedA.size() > 1) {
UpdateActivitiesCommand cmd = new UpdateActivitiesCommand(this);
@ -1485,7 +1572,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
changedA = this.objectFilter.getAdded(Activity.class, Tags.ACTIVITY);
if (changedA.size() == 1) {
AddActivityCommand cmd = new AddActivityCommand(this);
cmd.setActivity(changedA.get(0));
cmd.setActivity(changedA.getFirst());
add(cmd);
} else if (changedA.size() > 1) {
AddActivitiesCommand cmd = new AddActivitiesCommand(this);
@ -1531,7 +1618,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
this.closeStrategy = TransactionCloseStrategy.ROLLBACK;
String msg = "Strolch Transaction for realm {0} failed due to {1}";
msg = MessageFormat.format(msg, getRealmName(), getExceptionMessage(e));
msg = format(msg, getRealmName(), getExceptionMessage(e));
throw new StrolchTransactionException(msg, e);
}
}
@ -1595,7 +1682,7 @@ public abstract class AbstractTransaction implements StrolchTransaction {
@Override
public void autoCloseableRollback() {
long start = System.nanoTime();
logger.warn(MessageFormat.format("Rolling back TX for realm {0}...", getRealmName()));
logger.warn(format("Rolling back TX for realm {0}...", getRealmName()));
try {
this.txResult.setState(TransactionState.ROLLING_BACK);
undoCommands();
@ -1808,12 +1895,13 @@ public abstract class AbstractTransaction implements StrolchTransaction {
OperationsLog operationsLog = container.getComponent(OperationsLog.class);
operationsLog.addMessage(new LogMessage(this.realm.getRealm(), this.certificate.getUsername(),
Locator.valueOf(AGENT, "tx", this.action, getUniqueId()), LogSeverity.Exception,
LogMessageState.Information, ResourceBundle.getBundle("strolch-agent"),
"agent.tx.failed").withException(e).value("reason", e));
LogMessageState.Information, ResourceBundle.getBundle("strolch-agent"), "agent.tx.failed")
.withException(e)
.value("reason", e));
}
String msg = "Strolch Transaction for realm {0} failed due to {1}\n{2}";
msg = MessageFormat.format(msg, getRealmName(), getExceptionMessage(e), sb.toString());
msg = format(msg, getRealmName(), getExceptionMessage(e), sb.toString());
StrolchTransactionException ex = new StrolchTransactionException(msg, e);
if (throwEx)
@ -1922,10 +2010,10 @@ public abstract class AbstractTransaction implements StrolchTransaction {
if (this.auditTrail != null && !isSuppressAuditsForAudits()) {
if (this.realm.isAuditTrailEnabledForRead())
auditsForAudits(audits, AccessType.READ, Tags.AUDIT, this.auditTrail.getRead());
auditsForAudits(audits, AccessType.CREATE, Tags.AUDIT, this.auditTrail.getCreated());
auditsForAudits(audits, AccessType.UPDATE, Tags.AUDIT, this.auditTrail.getUpdated());
auditsForAudits(audits, AccessType.DELETE, Tags.AUDIT, this.auditTrail.getDeleted());
auditsForAudits(audits, AccessType.READ, this.auditTrail.getRead());
auditsForAudits(audits, AccessType.CREATE, this.auditTrail.getCreated());
auditsForAudits(audits, AccessType.UPDATE, this.auditTrail.getUpdated());
auditsForAudits(audits, AccessType.DELETE, this.auditTrail.getDeleted());
}
if (!audits.isEmpty())
@ -1940,9 +2028,9 @@ public abstract class AbstractTransaction implements StrolchTransaction {
}
}
private void auditsForAudits(List<Audit> audits, AccessType accessType, String elementType, Set<Audit> elements) {
private void auditsForAudits(List<Audit> audits, AccessType accessType, Set<Audit> elements) {
for (Audit element : elements) {
audits.add(auditFrom(accessType, elementType, StringHelper.DASH, element.getId().toString()));
audits.add(auditFrom(accessType, Tags.AUDIT, StringHelper.DASH, element.getId().toString()));
}
}

View File

@ -1571,6 +1571,21 @@ public interface StrolchTransaction extends AutoCloseable {
*/
void removeFromCache(Locator locator);
/**
* Returns true if the given resource is currently cached
*/
boolean isResourceCached(String type, String id);
/**
* Returns true if the given order is currently cached
*/
boolean isOrderCached(String type, String id);
/**
* Returns true if the given activity is currently cached
*/
boolean isActivityCached(String type, String id);
/**
* Returns the cached resource with the given type and id, or null if not yet fetched
*
@ -1607,6 +1622,21 @@ public interface StrolchTransaction extends AutoCloseable {
*/
Activity getCachedActivity(String type, String id);
/**
* Returns a stream of resources in the cache
*/
Stream<Resource> streamCachedResources(String... types);
/**
* Returns a stream of orders in the cache
*/
Stream<Order> streamCachedOrders(String... types);
/**
* Returns a stream of activities in the cache
*/
Stream<Activity> streamCachedActivities(String... types);
/**
* Returns true if the @{@link Resource} exists with the given type and ID
*