From 599a2463a3e54767286d6ff80db3d32dadf7282a Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 9 Nov 2022 23:27:46 +0100 Subject: [PATCH] Fix synchronization --- pom.xml | 2 +- .../cavallium/filequeue/LMDBQueueToConsumer.java | 14 ++++---------- .../it/cavallium/filequeue/QueueToConsumer.java | 8 ++------ .../java/it/cavallium/filequeue/SimpleQueue.java | 3 +++ .../it/cavallium/filequeue/SimpleQueueFile.java | 6 +++--- .../it/cavallium/filequeue/SimpleQueueJava.java | 6 ++++++ .../it/cavallium/filequeue/SimpleQueueLMDB.java | 3 +++ 7 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pom.xml b/pom.xml index 4ccbc03..fb22c17 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ it.cavallium filequeue file queue project - 3.1.3 + 3.1.4 jar Light weight, high performance, simple, reliable and persistent queue 4.0.0 diff --git a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java index 7d5843d..aedffa6 100644 --- a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java @@ -22,23 +22,17 @@ public final class LMDBQueueToConsumer implements IQueueToConsumer { @Override public void add(T value) { - synchronized (queue) { - queue.add(value); - } + queue.add(value); } @Override public void close() { - synchronized (queue) { - queue.close(); - queueLMDB.close(); - } + queue.close(); + queueLMDB.close(); } @Override public void startQueue() { - synchronized (queue) { - queue.startQueue(); - } + queue.startQueue(); } } diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java index 623a294..8a984b6 100644 --- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java @@ -42,9 +42,7 @@ class QueueToConsumer implements IQueueToConsumer { } } if (shouldAdd && !closed) { - synchronized (queue) { - queue.add(value); - } + queue.add(value); semaphore.release(); } } @@ -71,9 +69,7 @@ class QueueToConsumer implements IQueueToConsumer { } semaphore.acquire(); if (!closed && shouldRemove) { - synchronized (queue) { - element = queue.remove(); - } + element = queue.remove(); long nextDelay = BACKOFF_NS; while (!closed && !consumer.tryConsume(element)) { LockSupport.parkNanos(nextDelay); diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueue.java b/src/main/java/it/cavallium/filequeue/SimpleQueue.java index cd4903b..4b83073 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueue.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueue.java @@ -1,5 +1,8 @@ package it.cavallium.filequeue; +/** + * The queue must be thread-safe + */ interface SimpleQueue { void add(T element); diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java b/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java index b1d5bb2..a2c074a 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueFile.java @@ -17,7 +17,7 @@ class SimpleQueueFile implements SimpleQueue { } @Override - public void add(T element) { + public synchronized void add(T element) { try { queueFile.add(ser.serialize(element)); } catch (IOException e) { @@ -26,7 +26,7 @@ class SimpleQueueFile implements SimpleQueue { } @Override - public T remove() { + public synchronized T remove() { try { byte[] element = queueFile.peek(); if (element == null) { @@ -41,7 +41,7 @@ class SimpleQueueFile implements SimpleQueue { } @Override - public int size() { + public synchronized int size() { return queueFile.size(); } } diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java b/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java index ccf2f57..7650d7f 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueJava.java @@ -2,10 +2,16 @@ package it.cavallium.filequeue; import java.util.Queue; +/** + * Thread safe queue + */ class SimpleQueueJava implements SimpleQueue { private final Queue queue; + /** + * @param queue the queue must be thread-safe + */ public SimpleQueueJava(Queue queue) { this.queue = queue; } diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java index bb8cfa5..4f52a48 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java @@ -13,6 +13,9 @@ import org.lmdbjava.DbiFlags; import org.lmdbjava.Env; import org.lmdbjava.Txn; +/** + * Thread safe queue + */ public class SimpleQueueLMDB implements SimpleQueue, Closeable { private static final Set REGISTRY = new HashSet<>();