diff --git a/pom.xml b/pom.xml
index fb22c17..946b8e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
it.cavallium
filequeue
file queue project
- 3.1.4
+ 3.1.5
jar
Light weight, high performance, simple, reliable and persistent queue
4.0.0
diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
index 8a984b6..6c93ecd 100644
--- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
+++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
@@ -1,17 +1,15 @@
package it.cavallium.filequeue;
import java.time.Duration;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
class QueueToConsumer implements IQueueToConsumer {
private final long BACKOFF_NS = Duration.ofMillis(2).toNanos();
private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos();
-
- private final Object lock = new Object();
- private final Semaphore semaphore = new Semaphore(0);
- private long queued;
+ private final AtomicLong preAddQueued;
+ private final AtomicLong afterAddQueued;
private final SimpleQueue queue;
private final QueueConsumer consumer;
private Manager manager;
@@ -20,7 +18,8 @@ class QueueToConsumer implements IQueueToConsumer {
public QueueToConsumer(SimpleQueue queue, QueueConsumer consumer) {
this.queue = queue;
this.consumer = consumer;
- queued = queue.size();
+ this.preAddQueued = new AtomicLong(queue.size());
+ this.afterAddQueued = new AtomicLong(queue.size());
}
public synchronized void startQueue() {
@@ -34,23 +33,30 @@ class QueueToConsumer implements IQueueToConsumer {
@Override
public void add(T value) {
boolean shouldAdd = true;
- synchronized (lock) {
- if (queued == 0 && consumer.tryConsume(value)) {
- shouldAdd = false;
- } else {
- queued++;
+ if (preAddQueued.getAndIncrement() == 0) {
+ boolean crashed = true;
+ try {
+ if (consumer.tryConsume(value)) {
+ crashed = false;
+ shouldAdd = false;
+ preAddQueued.decrementAndGet();
+ }
+ } finally {
+ if (crashed) {
+ shouldAdd = false;
+ preAddQueued.decrementAndGet();
+ }
}
}
if (shouldAdd && !closed) {
queue.add(value);
- semaphore.release();
+ afterAddQueued.incrementAndGet();
}
}
@Override
public void close() {
closed = true;
- semaphore.release();
}
private class Manager extends Thread {
@@ -59,29 +65,26 @@ class QueueToConsumer implements IQueueToConsumer {
public void run() {
try {
while (!closed) {
- boolean shouldRemove = false;
T element;
- synchronized (lock) {
- if (queued > 0) {
- queued--;
- shouldRemove = true;
- }
- }
- semaphore.acquire();
- if (!closed && shouldRemove) {
- element = queue.remove();
- long nextDelay = BACKOFF_NS;
- while (!closed && !consumer.tryConsume(element)) {
- LockSupport.parkNanos(nextDelay);
- if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
- nextDelay += BACKOFF_NS;
- } else if (nextDelay < MAX_BACKOFF_NS) {
- nextDelay = MAX_BACKOFF_NS;
+ boolean shouldRemove = preAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
+ while (!closed) {
+ if (afterAddQueued.get() > 0) {
+ if (!closed && shouldRemove) {
+ element = queue.remove();
+ long nextDelay = BACKOFF_NS;
+ while (!closed && !consumer.tryConsume(element)) {
+ LockSupport.parkNanos(nextDelay);
+ if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
+ nextDelay += BACKOFF_NS;
+ } else if (nextDelay < MAX_BACKOFF_NS) {
+ nextDelay = MAX_BACKOFF_NS;
+ }
+ }
}
}
}
}
- } catch (InterruptedException ex) {
+ } finally {
closed = true;
}
}