Fix synchronization

This commit is contained in:
Andrea Cavalli 2022-11-09 23:27:46 +01:00
parent 477932e051
commit 599a2463a3
7 changed files with 22 additions and 20 deletions

View File

@ -4,7 +4,7 @@
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId> <artifactId>filequeue</artifactId>
<name>file queue project</name> <name>file queue project</name>
<version>3.1.3</version> <version>3.1.4</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<description>Light weight, high performance, simple, reliable and persistent queue</description> <description>Light weight, high performance, simple, reliable and persistent queue</description>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -22,23 +22,17 @@ public final class LMDBQueueToConsumer<T> implements IQueueToConsumer<T> {
@Override @Override
public void add(T value) { public void add(T value) {
synchronized (queue) {
queue.add(value); queue.add(value);
} }
}
@Override @Override
public void close() { public void close() {
synchronized (queue) {
queue.close(); queue.close();
queueLMDB.close(); queueLMDB.close();
} }
}
@Override @Override
public void startQueue() { public void startQueue() {
synchronized (queue) {
queue.startQueue(); queue.startQueue();
} }
} }
}

View File

@ -42,9 +42,7 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
} }
} }
if (shouldAdd && !closed) { if (shouldAdd && !closed) {
synchronized (queue) {
queue.add(value); queue.add(value);
}
semaphore.release(); semaphore.release();
} }
} }
@ -71,9 +69,7 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
} }
semaphore.acquire(); semaphore.acquire();
if (!closed && shouldRemove) { if (!closed && shouldRemove) {
synchronized (queue) {
element = queue.remove(); element = queue.remove();
}
long nextDelay = BACKOFF_NS; long nextDelay = BACKOFF_NS;
while (!closed && !consumer.tryConsume(element)) { while (!closed && !consumer.tryConsume(element)) {
LockSupport.parkNanos(nextDelay); LockSupport.parkNanos(nextDelay);

View File

@ -1,5 +1,8 @@
package it.cavallium.filequeue; package it.cavallium.filequeue;
/**
* The queue must be thread-safe
*/
interface SimpleQueue<T> { interface SimpleQueue<T> {
void add(T element); void add(T element);

View File

@ -17,7 +17,7 @@ class SimpleQueueFile<T> implements SimpleQueue<T> {
} }
@Override @Override
public void add(T element) { public synchronized void add(T element) {
try { try {
queueFile.add(ser.serialize(element)); queueFile.add(ser.serialize(element));
} catch (IOException e) { } catch (IOException e) {
@ -26,7 +26,7 @@ class SimpleQueueFile<T> implements SimpleQueue<T> {
} }
@Override @Override
public T remove() { public synchronized T remove() {
try { try {
byte[] element = queueFile.peek(); byte[] element = queueFile.peek();
if (element == null) { if (element == null) {
@ -41,7 +41,7 @@ class SimpleQueueFile<T> implements SimpleQueue<T> {
} }
@Override @Override
public int size() { public synchronized int size() {
return queueFile.size(); return queueFile.size();
} }
} }

View File

@ -2,10 +2,16 @@ package it.cavallium.filequeue;
import java.util.Queue; import java.util.Queue;
/**
* Thread safe queue
*/
class SimpleQueueJava<T> implements SimpleQueue<T> { class SimpleQueueJava<T> implements SimpleQueue<T> {
private final Queue<T> queue; private final Queue<T> queue;
/**
* @param queue the queue must be thread-safe
*/
public SimpleQueueJava(Queue<T> queue) { public SimpleQueueJava(Queue<T> queue) {
this.queue = queue; this.queue = queue;
} }

View File

@ -13,6 +13,9 @@ import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env; import org.lmdbjava.Env;
import org.lmdbjava.Txn; import org.lmdbjava.Txn;
/**
* Thread safe queue
*/
public class SimpleQueueLMDB<T> implements SimpleQueue<T>, Closeable { public class SimpleQueueLMDB<T> implements SimpleQueue<T>, Closeable {
private static final Set<String> REGISTRY = new HashSet<>(); private static final Set<String> REGISTRY = new HashSet<>();