Fix synchronization

This commit is contained in:
Andrea Cavalli 2022-11-10 00:11:47 +01:00
parent 599a2463a3
commit 4a5a42b7fd
2 changed files with 35 additions and 32 deletions

View File

@ -4,7 +4,7 @@
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId> <artifactId>filequeue</artifactId>
<name>file queue project</name> <name>file queue project</name>
<version>3.1.4</version> <version>3.1.5</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<description>Light weight, high performance, simple, reliable and persistent queue</description> <description>Light weight, high performance, simple, reliable and persistent queue</description>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -1,17 +1,15 @@
package it.cavallium.filequeue; package it.cavallium.filequeue;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
class QueueToConsumer<T> implements IQueueToConsumer<T> { class QueueToConsumer<T> implements IQueueToConsumer<T> {
private final long BACKOFF_NS = Duration.ofMillis(2).toNanos(); private final long BACKOFF_NS = Duration.ofMillis(2).toNanos();
private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos(); private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos();
private final AtomicLong preAddQueued;
private final Object lock = new Object(); private final AtomicLong afterAddQueued;
private final Semaphore semaphore = new Semaphore(0);
private long queued;
private final SimpleQueue<T> queue; private final SimpleQueue<T> queue;
private final QueueConsumer<T> consumer; private final QueueConsumer<T> consumer;
private Manager manager; private Manager manager;
@ -20,7 +18,8 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
public QueueToConsumer(SimpleQueue<T> queue, QueueConsumer<T> consumer) { public QueueToConsumer(SimpleQueue<T> queue, QueueConsumer<T> consumer) {
this.queue = queue; this.queue = queue;
this.consumer = consumer; this.consumer = consumer;
queued = queue.size(); this.preAddQueued = new AtomicLong(queue.size());
this.afterAddQueued = new AtomicLong(queue.size());
} }
public synchronized void startQueue() { public synchronized void startQueue() {
@ -34,23 +33,30 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
@Override @Override
public void add(T value) { public void add(T value) {
boolean shouldAdd = true; boolean shouldAdd = true;
synchronized (lock) { if (preAddQueued.getAndIncrement() == 0) {
if (queued == 0 && consumer.tryConsume(value)) { boolean crashed = true;
shouldAdd = false; try {
} else { if (consumer.tryConsume(value)) {
queued++; crashed = false;
shouldAdd = false;
preAddQueued.decrementAndGet();
}
} finally {
if (crashed) {
shouldAdd = false;
preAddQueued.decrementAndGet();
}
} }
} }
if (shouldAdd && !closed) { if (shouldAdd && !closed) {
queue.add(value); queue.add(value);
semaphore.release(); afterAddQueued.incrementAndGet();
} }
} }
@Override @Override
public void close() { public void close() {
closed = true; closed = true;
semaphore.release();
} }
private class Manager extends Thread { private class Manager extends Thread {
@ -59,29 +65,26 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
public void run() { public void run() {
try { try {
while (!closed) { while (!closed) {
boolean shouldRemove = false;
T element; T element;
synchronized (lock) { boolean shouldRemove = preAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
if (queued > 0) { while (!closed) {
queued--; if (afterAddQueued.get() > 0) {
shouldRemove = true; if (!closed && shouldRemove) {
} element = queue.remove();
} long nextDelay = BACKOFF_NS;
semaphore.acquire(); while (!closed && !consumer.tryConsume(element)) {
if (!closed && shouldRemove) { LockSupport.parkNanos(nextDelay);
element = queue.remove(); if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
long nextDelay = BACKOFF_NS; nextDelay += BACKOFF_NS;
while (!closed && !consumer.tryConsume(element)) { } else if (nextDelay < MAX_BACKOFF_NS) {
LockSupport.parkNanos(nextDelay); nextDelay = MAX_BACKOFF_NS;
if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { }
nextDelay += BACKOFF_NS; }
} else if (nextDelay < MAX_BACKOFF_NS) {
nextDelay = MAX_BACKOFF_NS;
} }
} }
} }
} }
} catch (InterruptedException ex) { } finally {
closed = true; closed = true;
} }
} }