[Major] Refactored LockableObject and ElementLockingHandler
This commit is contained in:
parent
14bd01e8ac
commit
8002d32121
|
@ -25,7 +25,7 @@ import li.strolch.agent.api.*;
|
|||
import li.strolch.model.Locator;
|
||||
import li.strolch.privilege.model.PrivilegeContext;
|
||||
import li.strolch.runtime.configuration.ComponentConfiguration;
|
||||
import li.strolch.utils.ElementLockingHandler;
|
||||
import li.strolch.utils.concurrent.ElementLockingHandler;
|
||||
import li.strolch.utils.dbc.DBC;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
package li.strolch.utils.concurrent;
|
||||
|
||||
import static java.lang.Thread.currentThread;
|
||||
import static java.text.MessageFormat.format;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ElementLock extends ReentrantLock {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ElementLock.class);
|
||||
private final String name;
|
||||
private long lastLockTime;
|
||||
|
||||
public ElementLock(String name, boolean fair) {
|
||||
super(fair);
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public long getLastLockTime() {
|
||||
return this.lastLockTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread getOwner() {
|
||||
return super.getOwner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Thread> getQueuedThreads() {
|
||||
return super.getQueuedThreads();
|
||||
}
|
||||
|
||||
public void lock(TimeUnit timeUnit, long tryLockTime) throws ElementLockingException {
|
||||
try {
|
||||
|
||||
if (!tryLock() && !tryLock(tryLockTime, timeUnit)) {
|
||||
String msg = "Thread {0} failed to acquire lock after {1}s for {2}";
|
||||
msg = format(msg, currentThread().getName(), timeUnit.toSeconds(tryLockTime), this.name);
|
||||
|
||||
Thread owner = getOwner();
|
||||
if (owner == null) {
|
||||
logger.error(format("Lock {0} is currently held by unknown thread!", this.name));
|
||||
logger.error(toString());
|
||||
} else {
|
||||
Exception e = new Exception();
|
||||
e.setStackTrace(owner.getStackTrace());
|
||||
logger.error(format("Lock {0} is currently held by {1}", this.name, owner), e);
|
||||
}
|
||||
|
||||
logger.error("Threads waiting on this lock are:");
|
||||
for (Thread queuedThread : getQueuedThreads()) {
|
||||
Exception e = new Exception();
|
||||
e.setStackTrace(queuedThread.getStackTrace());
|
||||
logger.error("\n" + queuedThread.getName(), e);
|
||||
}
|
||||
|
||||
throw new ElementLockingException(msg);
|
||||
}
|
||||
|
||||
this.lastLockTime = System.currentTimeMillis();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
throw new ElementLockingException("Interrupted while trying to acquire lock for " + this.name, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -13,7 +13,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package li.strolch.utils.exceptions;
|
||||
package li.strolch.utils.concurrent;
|
||||
|
||||
import java.util.Locale;
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
package li.strolch.utils;
|
||||
package li.strolch.utils.concurrent;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -10,9 +9,10 @@ import java.util.concurrent.ScheduledFuture;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import li.strolch.utils.CheckedRunnable;
|
||||
import li.strolch.utils.CheckedSupplier;
|
||||
import li.strolch.utils.collections.TypedTuple;
|
||||
import li.strolch.utils.dbc.DBC;
|
||||
import li.strolch.utils.exceptions.ElementLockingException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -20,7 +20,7 @@ public class ElementLockingHandler<T> {
|
|||
private static final Logger logger = LoggerFactory.getLogger(ElementLockingHandler.class);
|
||||
private final TimeUnit tryLockTimeUnit;
|
||||
private final long tryLockTime;
|
||||
private final Map<T, TypedTuple<Lock, Long>> lockMap;
|
||||
private final Map<T, TypedTuple<ElementLock, Long>> lockMap;
|
||||
private final ScheduledExecutorService executorService;
|
||||
|
||||
private ScheduledFuture<?> cleanupTask;
|
||||
|
@ -35,6 +35,54 @@ public class ElementLockingHandler<T> {
|
|||
this.lockMap = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* First locks the given element, then calls the given action, using a try catch/finally to unlock the element after
|
||||
* the action has completed.
|
||||
* <p>
|
||||
* Note that only {@link #lock(Object)} and {@link #unlock(Object)} are called, if the element was locked
|
||||
* previously, then the lock counter is only reduced to the value prior to the call
|
||||
*
|
||||
* @param element
|
||||
* the element to lock
|
||||
* @param action
|
||||
* the action to perform
|
||||
*/
|
||||
public void lockedExecute(T element, CheckedRunnable action) {
|
||||
lock(element);
|
||||
try {
|
||||
action.run();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to execute action " + action + " for locked element " + element, e);
|
||||
} finally {
|
||||
unlock(element);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* First locks the given element, then calls the given action, returning any result, using a try catch/finally to
|
||||
* unlock the element after the action has completed.
|
||||
* <p>
|
||||
* Note that only {@link #lock(Object)} and {@link #unlock(Object)} are called, if the element was locked
|
||||
* previously, then the lock counter is only reduced to the value prior to the call
|
||||
*
|
||||
* @param element
|
||||
* the element to lock
|
||||
* @param action
|
||||
* the action to perform
|
||||
*
|
||||
* @return the result of the action
|
||||
*/
|
||||
public <U> U lockedExecuteWithResult(T element, CheckedSupplier<U> action) {
|
||||
lock(element);
|
||||
try {
|
||||
return action.get();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to execute action " + action + " for locked element " + element, e);
|
||||
} finally {
|
||||
unlock(element);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Locks the given element by creating a {@link ReentrantLock} on it. If the lock is already held by the calling
|
||||
* thread, then the lock count is increased
|
||||
|
@ -46,9 +94,9 @@ public class ElementLockingHandler<T> {
|
|||
* if the lock could not be acquired
|
||||
*/
|
||||
public void lock(T element) throws ElementLockingException {
|
||||
TypedTuple<Lock, Long> tuple = this.lockMap.computeIfAbsent(element,
|
||||
l -> new TypedTuple<>(new Lock(true), System.currentTimeMillis()));
|
||||
lock(this.tryLockTimeUnit, this.tryLockTime, tuple, element);
|
||||
DBC.PRE.assertNotNull("element may not be null!", element);
|
||||
TypedTuple<ElementLock, Long> tuple = this.lockMap.computeIfAbsent(element, this::newLock);
|
||||
tuple.getFirst().lock(this.tryLockTimeUnit, this.tryLockTime);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -68,12 +116,13 @@ public class ElementLockingHandler<T> {
|
|||
* if unlocking failed
|
||||
*/
|
||||
public void unlock(T element) throws ElementLockingException {
|
||||
TypedTuple<Lock, Long> tuple = this.lockMap.get(element);
|
||||
Lock lock = tuple.getFirst();
|
||||
if (lock == null || !lock.isHeldByCurrentThread()) {
|
||||
DBC.PRE.assertNotNull("element may not be null!", element);
|
||||
TypedTuple<ElementLock, Long> tuple = this.lockMap.get(element);
|
||||
ElementLock elementLock = tuple.getFirst();
|
||||
if (elementLock == null || !elementLock.isHeldByCurrentThread()) {
|
||||
logger.error(MessageFormat.format("Trying to unlock not locked element {0}", element));
|
||||
} else {
|
||||
unlock(lock);
|
||||
unlock(elementLock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,57 +137,18 @@ public class ElementLockingHandler<T> {
|
|||
* if the lock could not be released
|
||||
*/
|
||||
public void releaseLock(T element) throws ElementLockingException {
|
||||
TypedTuple<Lock, Long> tuple = this.lockMap.get(element);
|
||||
Lock lock = tuple.getFirst();
|
||||
if (lock == null) {
|
||||
DBC.PRE.assertNotNull("element may not be null!", element);
|
||||
TypedTuple<ElementLock, Long> tuple = this.lockMap.get(element);
|
||||
ElementLock elementLock = tuple.getFirst();
|
||||
if (elementLock == null) {
|
||||
logger.error(MessageFormat.format("Trying to unlock not locked element {0}", element));
|
||||
} else if (!lock.isHeldByCurrentThread()) {
|
||||
if (lock.isLocked())
|
||||
} else if (!elementLock.isHeldByCurrentThread()) {
|
||||
if (elementLock.isLocked())
|
||||
logger.error(MessageFormat.format("Lock not held by this thread for element {0}", element));
|
||||
else
|
||||
logger.error(MessageFormat.format("Element {0} is not locked!", element));
|
||||
} else {
|
||||
releaseLock(lock);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.util.concurrent.locks.ReentrantLock#tryLock(long, TimeUnit)
|
||||
*/
|
||||
private void lock(TimeUnit timeUnit, long tryLockTime, TypedTuple<Lock, Long> tuple, T element)
|
||||
throws ElementLockingException {
|
||||
try {
|
||||
|
||||
Lock lock = tuple.getFirst();
|
||||
if (!lock.tryLock() && !lock.tryLock(tryLockTime, timeUnit)) {
|
||||
String msg = "Thread {0} failed to acquire lock after {1}s for {2}";
|
||||
msg = MessageFormat.format(msg, Thread.currentThread().getName(), timeUnit.toSeconds(tryLockTime),
|
||||
element);
|
||||
|
||||
Thread owner = lock.getOwner();
|
||||
if (owner == null) {
|
||||
logger.error(MessageFormat.format("Lock {0} is currently held by unknown thread!", element));
|
||||
logger.error(lock.toString());
|
||||
} else {
|
||||
Exception e = new Exception();
|
||||
e.setStackTrace(owner.getStackTrace());
|
||||
logger.error(MessageFormat.format("Lock {0} is currently held by {1}", element, owner), e);
|
||||
}
|
||||
|
||||
logger.error("Threads waiting on this lock are:");
|
||||
for (Thread queuedThread : lock.getQueuedThreads()) {
|
||||
Exception e = new Exception();
|
||||
e.setStackTrace(queuedThread.getStackTrace());
|
||||
logger.error("\n" + queuedThread.getName(), e);
|
||||
}
|
||||
|
||||
throw new ElementLockingException(msg);
|
||||
}
|
||||
|
||||
tuple.setSecond(System.currentTimeMillis());
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
throw new ElementLockingException("Interrupted while trying to acquire lock for " + element, e);
|
||||
releaseLock(elementLock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,14 +190,14 @@ public class ElementLockingHandler<T> {
|
|||
|
||||
private void cleanupOldLocks() {
|
||||
|
||||
Map<T, TypedTuple<Lock, Long>> lockMap;
|
||||
Map<T, TypedTuple<ElementLock, Long>> lockMap;
|
||||
synchronized (this.lockMap) {
|
||||
lockMap = new HashMap<>(this.lockMap);
|
||||
}
|
||||
|
||||
long maxAge = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
|
||||
long count = 0;
|
||||
for (Map.Entry<T, TypedTuple<Lock, Long>> entry : lockMap.entrySet()) {
|
||||
for (Map.Entry<T, TypedTuple<ElementLock, Long>> entry : lockMap.entrySet()) {
|
||||
if (!entry.getValue().getFirst().isLocked() && entry.getValue().getSecond() <= maxAge) {
|
||||
this.lockMap.remove(entry.getKey());
|
||||
count++;
|
||||
|
@ -197,20 +207,7 @@ public class ElementLockingHandler<T> {
|
|||
logger.info("Pruned " + count + " T locks.");
|
||||
}
|
||||
|
||||
public static class Lock extends ReentrantLock {
|
||||
|
||||
public Lock(boolean fair) {
|
||||
super(fair);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread getOwner() {
|
||||
return super.getOwner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Thread> getQueuedThreads() {
|
||||
return super.getQueuedThreads();
|
||||
}
|
||||
private TypedTuple<ElementLock, Long> newLock(T element) {
|
||||
return new TypedTuple<>(new ElementLock(element.toString(), true), 0L);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package li.strolch.utils.concurrent;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class LockableObject {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LockableObject.class);
|
||||
private static long tryLockTime = 10000L;
|
||||
|
||||
public static void setTryLockTime(long tryLockTime) {
|
||||
LockableObject.tryLockTime = tryLockTime;
|
||||
}
|
||||
|
||||
private final ElementLock elementLock;
|
||||
protected final String name;
|
||||
|
||||
public LockableObject(String name) {
|
||||
this.name = name;
|
||||
this.elementLock = new ElementLock(name, true);
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public static long getLockTime() {
|
||||
return tryLockTime;
|
||||
}
|
||||
|
||||
public void lock() {
|
||||
|
||||
// don't lock multiple times
|
||||
if (this.elementLock.isHeldByCurrentThread() && this.elementLock.isLocked())
|
||||
return;
|
||||
|
||||
this.elementLock.lock(TimeUnit.MILLISECONDS, tryLockTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.util.concurrent.locks.ReentrantLock#unlock()
|
||||
*/
|
||||
public void releaseLock() {
|
||||
while (this.elementLock.isHeldByCurrentThread() && this.elementLock.isLocked()) {
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("unlocking " + this);
|
||||
this.elementLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
*/
|
||||
package li.strolch.xmlpers.api;
|
||||
|
||||
import li.strolch.xmlpers.objref.LockableObject;
|
||||
import li.strolch.utils.concurrent.LockableObject;
|
||||
|
||||
/**
|
||||
* @author Robert von Burg <eitch@eitchnet.ch>
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.text.MessageFormat;
|
|||
import java.util.Properties;
|
||||
|
||||
import li.strolch.xmlpers.api.*;
|
||||
import li.strolch.xmlpers.objref.LockableObject;
|
||||
import li.strolch.utils.concurrent.LockableObject;
|
||||
import li.strolch.xmlpers.objref.ObjectReferenceCache;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.*;
|
|||
|
||||
import li.strolch.utils.objectfilter.ObjectFilter;
|
||||
import li.strolch.xmlpers.api.*;
|
||||
import li.strolch.xmlpers.objref.LockableObject;
|
||||
import li.strolch.utils.concurrent.LockableObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -1,124 +0,0 @@
|
|||
/*
|
||||
* Copyright 2013 Robert von Burg <eitch@eitchnet.ch>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package li.strolch.xmlpers.objref;
|
||||
|
||||
import static java.lang.Thread.currentThread;
|
||||
import static java.text.MessageFormat.format;
|
||||
import static li.strolch.utils.helper.StringHelper.formatMillisecondsDuration;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import li.strolch.xmlpers.api.XmlPersistenceException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class LockableObject {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LockableObject.class);
|
||||
private static long tryLockTime = 10000L;
|
||||
|
||||
public static void setTryLockTime(long tryLockTime) {
|
||||
LockableObject.tryLockTime = tryLockTime;
|
||||
}
|
||||
|
||||
private final Lock lock;
|
||||
protected final String name;
|
||||
|
||||
public LockableObject(String name) {
|
||||
this.name = name;
|
||||
this.lock = new Lock(true);
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public static long getLockTime() {
|
||||
return tryLockTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.util.concurrent.locks.ReentrantLock#tryLock(long, TimeUnit)
|
||||
*/
|
||||
public void lock() {
|
||||
|
||||
// don't lock multiple times
|
||||
if (this.lock.isHeldByCurrentThread() && this.lock.isLocked())
|
||||
return;
|
||||
|
||||
try {
|
||||
|
||||
if (!this.lock.tryLock(tryLockTime, TimeUnit.MILLISECONDS)) {
|
||||
String msg = "Thread {0} failed to acquire lock after {1} for {2}";
|
||||
msg = format(msg, currentThread().getName(), formatMillisecondsDuration(tryLockTime), this);
|
||||
|
||||
Thread owner = lock.getOwner();
|
||||
if (owner == null) {
|
||||
logger.error(MessageFormat.format("Lock {0} is currently held unknown thread!", this.name));
|
||||
logger.error(lock.toString());
|
||||
} else {
|
||||
Exception e = new Exception();
|
||||
e.setStackTrace(owner.getStackTrace());
|
||||
logger.error(MessageFormat.format("Lock {0} is currently held by {1}", this.name, owner), e);
|
||||
}
|
||||
|
||||
logger.error("Threads waiting on this lock are:");
|
||||
for (Thread queuedThread : lock.getQueuedThreads()) {
|
||||
Exception e = new Exception();
|
||||
e.setStackTrace(queuedThread.getStackTrace());
|
||||
logger.error("\n" + queuedThread.getName(), e);
|
||||
}
|
||||
|
||||
throw new XmlPersistenceException(msg);
|
||||
}
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("locked " + this);
|
||||
} catch (InterruptedException e) {
|
||||
throw new XmlPersistenceException("Thread interrupted: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.util.concurrent.locks.ReentrantLock#unlock()
|
||||
*/
|
||||
public void releaseLock() {
|
||||
while (this.lock.isHeldByCurrentThread() && this.lock.isLocked()) {
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("unlocking " + this);
|
||||
this.lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Lock extends ReentrantLock {
|
||||
|
||||
public Lock(boolean fair) {
|
||||
super(fair);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread getOwner() {
|
||||
return super.getOwner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Thread> getQueuedThreads() {
|
||||
return super.getQueuedThreads();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ package li.strolch.xmlpers.objref;
|
|||
import java.io.File;
|
||||
import java.util.Objects;
|
||||
|
||||
import li.strolch.utils.concurrent.LockableObject;
|
||||
import li.strolch.xmlpers.api.PersistenceContext;
|
||||
import li.strolch.xmlpers.api.PersistenceTransaction;
|
||||
import li.strolch.xmlpers.impl.PathBuilder;
|
||||
|
|
|
@ -28,7 +28,7 @@ import li.strolch.xmlpers.api.IoMode;
|
|||
import li.strolch.xmlpers.api.PersistenceConstants;
|
||||
import li.strolch.xmlpers.api.PersistenceTransaction;
|
||||
import li.strolch.xmlpers.objref.IdOfSubTypeRef;
|
||||
import li.strolch.xmlpers.objref.LockableObject;
|
||||
import li.strolch.utils.concurrent.LockableObject;
|
||||
import li.strolch.xmlpers.test.model.MyModel;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
|
Loading…
Reference in New Issue