[Fix] Automatically flush DAOs in PostgreSqlDataArchiveHandler

This commit is contained in:
Robert von Burg 2021-12-28 22:02:08 +01:00
parent b24fa57490
commit 4c79a68cdb
1 changed files with 47 additions and 5 deletions

View File

@ -1,11 +1,13 @@
package li.strolch.persistence.postgresql; package li.strolch.persistence.postgresql;
import static li.strolch.db.DbConstants.*; import static li.strolch.db.DbConstants.*;
import static li.strolch.persistence.postgresql.DataType.xml;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -19,8 +21,15 @@ public class PostgreSqlDataArchiveHandler extends StrolchComponent implements Da
private static final String SCRIPT_PREFIX = "archive"; private static final String SCRIPT_PREFIX = "archive";
private final Map<Connection, ResourceDao> resourceDaoMap;
private final Map<Connection, OrderDao> orderDaoMap;
private final Map<Connection, ActivityDao> activityDaoMap;
public PostgreSqlDataArchiveHandler(ComponentContainer container, String componentName) { public PostgreSqlDataArchiveHandler(ComponentContainer container, String componentName) {
super(container, componentName); super(container, componentName);
this.resourceDaoMap = new ConcurrentHashMap<>();
this.orderDaoMap = new ConcurrentHashMap<>();
this.activityDaoMap = new ConcurrentHashMap<>();
} }
@Override @Override
@ -52,39 +61,72 @@ public class PostgreSqlDataArchiveHandler extends StrolchComponent implements Da
@Override @Override
public void run(StrolchTransaction tx, BiConsumer<Connection, TransactionResult> runnable) { public void run(StrolchTransaction tx, BiConsumer<Connection, TransactionResult> runnable) {
Connection con = null;
try (Connection connection = getConnection(tx)) { try (Connection connection = getConnection(tx)) {
con = connection;
TransactionResult txResult = new TransactionResult(tx.getRealmName(), System.nanoTime(), new Date()); TransactionResult txResult = new TransactionResult(tx.getRealmName(), System.nanoTime(), new Date());
runnable.accept(connection, txResult); runnable.accept(connection, txResult);
flush(connection);
connection.commit(); connection.commit();
} catch (Exception e) { } catch (Exception e) {
throw new StrolchPersistenceException("Archive DB Connection failed", e); throw new StrolchPersistenceException("Archive DB Connection failed", e);
} finally {
if (con != null)
clearCachedDAOs(con);
} }
} }
@Override @Override
public <T> T runWithResult(StrolchTransaction tx, BiFunction<Connection, TransactionResult, T> runnable) { public <T> T runWithResult(StrolchTransaction tx, BiFunction<Connection, TransactionResult, T> runnable) {
Connection con = null;
try (Connection connection = getConnection(tx)) { try (Connection connection = getConnection(tx)) {
con = connection;
TransactionResult txResult = new TransactionResult(tx.getRealmName(), System.nanoTime(), new Date()); TransactionResult txResult = new TransactionResult(tx.getRealmName(), System.nanoTime(), new Date());
T t = runnable.apply(connection, txResult); T t = runnable.apply(connection, txResult);
flush(connection);
connection.commit(); connection.commit();
return t; return t;
} catch (Exception e) { } catch (Exception e) {
throw new StrolchPersistenceException("Archive DB Connection failed", e); throw new StrolchPersistenceException("Archive DB Connection failed", e);
} finally {
if (con != null)
clearCachedDAOs(con);
} }
} }
@Override private void flush(Connection connection) {
public OrderDao getOrderDao(Connection connection, TransactionResult txResult) { ResourceDao resourceDao = this.resourceDaoMap.remove(connection);
return new ArchivePostgreSqlOrderDao(DataType.xml, connection, txResult, false); if (resourceDao != null)
resourceDao.flush();
OrderDao orderDao = this.orderDaoMap.remove(connection);
if (orderDao != null)
orderDao.flush();
ActivityDao activityDao = this.activityDaoMap.remove(connection);
if (activityDao != null)
activityDao.flush();
}
private void clearCachedDAOs(Connection con) {
this.resourceDaoMap.remove(con);
this.orderDaoMap.remove(con);
this.activityDaoMap.remove(con);
} }
@Override @Override
public ResourceDao getResourceDao(Connection connection, TransactionResult txResult) { public ResourceDao getResourceDao(Connection connection, TransactionResult txResult) {
return new ArchivePostgreSqlResourceDao(DataType.xml, connection, txResult, false); return this.resourceDaoMap.computeIfAbsent(connection,
c -> new ArchivePostgreSqlResourceDao(xml, connection, txResult, false));
}
@Override
public OrderDao getOrderDao(Connection connection, TransactionResult txResult) {
return this.orderDaoMap.computeIfAbsent(connection,
c -> new ArchivePostgreSqlOrderDao(xml, connection, txResult, false));
} }
@Override @Override
public ActivityDao getActivityDao(Connection connection, TransactionResult txResult) { public ActivityDao getActivityDao(Connection connection, TransactionResult txResult) {
return new ArchivePostgreSqlActivityDao(DataType.xml, connection, txResult, false); return this.activityDaoMap.computeIfAbsent(connection,
c -> new ArchivePostgreSqlActivityDao(xml, connection, txResult, false));
} }
} }