[New] Implemented async parallel loading from pageable persistence layers

This commit is contained in:
Robert von Burg 2022-11-21 13:26:13 +01:00
parent 1eef4b8746
commit 7116f05bc2
Signed by: eitch
GPG Key ID: 75DB9C85C74331F7
1 changed files with 37 additions and 47 deletions

View File

@ -11,7 +11,9 @@ import java.util.function.Function;
import java.util.function.Supplier;
import li.strolch.model.StrolchRootElement;
import li.strolch.persistence.api.*;
import li.strolch.persistence.api.PersistenceHandler;
import li.strolch.persistence.api.StrolchDao;
import li.strolch.persistence.api.StrolchTransaction;
import li.strolch.privilege.model.Certificate;
import li.strolch.privilege.model.PrivilegeContext;
import li.strolch.utils.helper.StringHelper;
@ -21,7 +23,6 @@ import org.slf4j.LoggerFactory;
public class CachedRealmLoader {
private static final Logger logger = LoggerFactory.getLogger(CachedRealmLoader.class);
public static final int PAGE_SIZE = 100;
private final CachedRealm realm;
private final PersistenceHandler persistenceHandler;
@ -48,9 +49,10 @@ public class CachedRealmLoader {
if (this.persistenceHandler.supportsPaging()) {
loadPagingAsync();
} else {
loadResources();
loadOrders();
loadActivities();
load("Resources", this.persistenceHandler::getResourceDao, this.realm::getResourceMap, this.nrOfResources);
load("Orders", this.persistenceHandler::getOrderDao, this.realm::getOrderMap, this.nrOfOrders);
load("Activities", this.persistenceHandler::getActivityDao, this.realm::getActivityMap,
this.nrOfActivities);
}
long duration = System.nanoTime() - start;
@ -61,49 +63,34 @@ public class CachedRealmLoader {
logger.info(MessageFormat.format("Loaded {0} Activities", this.nrOfActivities));
}
private void loadActivities() {
try (StrolchTransaction tx = this.realm.openTx(getCert(), "strolch_boot", false)) {
ActivityDao activityDao = this.persistenceHandler.getActivityDao(tx);
load("Activities", this.nrOfActivities, activityDao, this.realm.getActivityMap());
private <T extends StrolchRootElement> void load(String context,
Function<StrolchTransaction, StrolchDao<T>> daoSupplier, Supplier<CachedElementMap<T>> elementMapSupplier,
AtomicInteger counter) {
long start = System.nanoTime();
long nrOfElements;
CachedElementMap<T> elementMap = elementMapSupplier.get();
try (StrolchTransaction tx = this.realm.openTx(getCert(), "strolch_boot_" + context, false)) {
StrolchDao<T> dao = daoSupplier.apply(tx);
nrOfElements = dao.querySize();
logger.info("Loading " + nrOfElements + " " + context + " from DB...");
Set<String> types = dao.queryTypes();
for (String type : types) {
long sizeOfType = dao.querySize(type);
logger.info("Loading " + sizeOfType + " " + context + " of type " + type + " from DB...");
List<T> elements = dao.queryAll(type);
elements.forEach(elementMap::insert);
counter.addAndGet(elements.size());
}
tx.commitOnClose();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to load activities!", e);
}
}
private void loadOrders() {
try (StrolchTransaction tx = this.realm.openTx(getCert(), "strolch_boot", false)) {
OrderDao orderDao = this.persistenceHandler.getOrderDao(tx);
load("Orders", this.nrOfOrders, orderDao, this.realm.getOrderMap());
tx.commitOnClose();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to load orders!", e);
}
}
private void loadResources() {
try (StrolchTransaction tx = this.realm.openTx(getCert(), "strolch_boot", false)) {
ResourceDao resourceDao = this.persistenceHandler.getResourceDao(tx);
load("Resources", this.nrOfResources, resourceDao, this.realm.getResourceMap());
tx.commitOnClose();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to load resources!", e);
}
}
private <T extends StrolchRootElement> void load(String context, AtomicInteger counter, StrolchDao<T> dao,
CachedElementMap<T> elementMap) throws ExecutionException, InterruptedException {
logger.info("Loading " + dao.querySize() + " " + context + " from DB...");
Set<String> types = dao.queryTypes();
for (String type : types) {
long sizeOfType = dao.querySize(type);
logger.info("Loading " + sizeOfType + " " + context + " of type " + type + " from DB...");
List<T> elements = dao.queryAll(type);
elements.forEach(elementMap::insert);
counter.addAndGet(elements.size());
}
String durationS = StringHelper.formatNanoDuration(System.nanoTime() - start);
logger.info(MessageFormat.format("Loading of {0} {1} took {2}.", nrOfElements, context, durationS));
}
private void loadPagingAsync() {
@ -129,6 +116,9 @@ public class CachedRealmLoader {
List<CompletableFuture<Void>> tasks = new ArrayList<>();
sizeByTypes.forEach((type, size) -> {
long pageSize = Math.max(1, size / Runtime.getRuntime().availableProcessors());
logger.info("Loading " + size + " " + context + " of type " + type + " from DB async in parallel...");
long position = 0;
while (position < size) {
@ -138,13 +128,13 @@ public class CachedRealmLoader {
try (StrolchTransaction tx = this.realm.openTx(getCert(), "strolch_boot", true)
.silentThreshold(10, SECONDS)
.suppressUpdates()) {
List<T> elements = daoSupplier.apply(tx).queryAll(PAGE_SIZE, p, type);
List<T> elements = daoSupplier.apply(tx).queryAll(pageSize, p, type);
elements.forEach(elementMap::insert);
counter.addAndGet(elements.size());
}
}));
position += PAGE_SIZE;
position += pageSize;
}
});