From 4a5a42b7fd4c5d81975cfc11154c55a1c3b1cb5f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 10 Nov 2022 00:11:47 +0100 Subject: [PATCH] Fix synchronization --- pom.xml | 2 +- .../cavallium/filequeue/QueueToConsumer.java | 65 ++++++++++--------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/pom.xml b/pom.xml index fb22c17..946b8e1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ it.cavallium filequeue file queue project - 3.1.4 + 3.1.5 jar Light weight, high performance, simple, reliable and persistent queue 4.0.0 diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java index 8a984b6..6c93ecd 100644 --- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java @@ -1,17 +1,15 @@ package it.cavallium.filequeue; import java.time.Duration; -import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; class QueueToConsumer implements IQueueToConsumer { private final long BACKOFF_NS = Duration.ofMillis(2).toNanos(); private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos(); - - private final Object lock = new Object(); - private final Semaphore semaphore = new Semaphore(0); - private long queued; + private final AtomicLong preAddQueued; + private final AtomicLong afterAddQueued; private final SimpleQueue queue; private final QueueConsumer consumer; private Manager manager; @@ -20,7 +18,8 @@ class QueueToConsumer implements IQueueToConsumer { public QueueToConsumer(SimpleQueue queue, QueueConsumer consumer) { this.queue = queue; this.consumer = consumer; - queued = queue.size(); + this.preAddQueued = new AtomicLong(queue.size()); + this.afterAddQueued = new AtomicLong(queue.size()); } public synchronized void startQueue() { @@ -34,23 +33,30 @@ class QueueToConsumer implements IQueueToConsumer { @Override public void add(T value) { boolean shouldAdd = true; - synchronized (lock) { - if (queued == 0 && consumer.tryConsume(value)) { - shouldAdd = false; - } else { - queued++; + if (preAddQueued.getAndIncrement() == 0) { + boolean crashed = true; + try { + if (consumer.tryConsume(value)) { + crashed = false; + shouldAdd = false; + preAddQueued.decrementAndGet(); + } + } finally { + if (crashed) { + shouldAdd = false; + preAddQueued.decrementAndGet(); + } } } if (shouldAdd && !closed) { queue.add(value); - semaphore.release(); + afterAddQueued.incrementAndGet(); } } @Override public void close() { closed = true; - semaphore.release(); } private class Manager extends Thread { @@ -59,29 +65,26 @@ class QueueToConsumer implements IQueueToConsumer { public void run() { try { while (!closed) { - boolean shouldRemove = false; T element; - synchronized (lock) { - if (queued > 0) { - queued--; - shouldRemove = true; - } - } - semaphore.acquire(); - if (!closed && shouldRemove) { - element = queue.remove(); - long nextDelay = BACKOFF_NS; - while (!closed && !consumer.tryConsume(element)) { - LockSupport.parkNanos(nextDelay); - if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { - nextDelay += BACKOFF_NS; - } else if (nextDelay < MAX_BACKOFF_NS) { - nextDelay = MAX_BACKOFF_NS; + boolean shouldRemove = preAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0; + while (!closed) { + if (afterAddQueued.get() > 0) { + if (!closed && shouldRemove) { + element = queue.remove(); + long nextDelay = BACKOFF_NS; + while (!closed && !consumer.tryConsume(element)) { + LockSupport.parkNanos(nextDelay); + if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { + nextDelay += BACKOFF_NS; + } else if (nextDelay < MAX_BACKOFF_NS) { + nextDelay = MAX_BACKOFF_NS; + } + } } } } } - } catch (InterruptedException ex) { + } finally { closed = true; } }