filequeue/src/main/java/it/cavallium/filequeue/QueueToConsumer.java

94 lines
2.1 KiB
Java

package it.cavallium.filequeue;
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.LockSupport;
/**
 * Hands values to a {@link QueueConsumer}, parking them in a {@link SimpleQueue} whenever the
 * consumer refuses them.
 *
 * <p>{@link #add(Object)} first tries to deliver the value directly, but only while nothing is
 * pending, so that a new value cannot overtake an older one. Refused values are stored in the
 * queue and later re-offered to the consumer by a background thread started with
 * {@link #startQueue()}, which retries each element with a linearly growing delay.
 *
 * <p>NOTE(review): an element that the background thread has already removed from the queue when
 * {@link #close()} is called is not delivered and is not put back; confirm whether the queue
 * implementation offers a way to restore it.
 *
 * @param <T> type of the values being delivered
 */
class QueueToConsumer<T> implements IQueueToConsumer<T> {

  /** Initial retry delay, and the amount the delay grows by after each refused retry. */
  private static final long BACKOFF_NS = Duration.ofMillis(2).toNanos();

  /** Upper bound of the retry delay. */
  private static final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos();

  /** Guards {@link #queued} and serializes the direct deliveries attempted by {@link #add}. */
  private final Object lock = new Object();

  /**
   * Holds one permit per element stored in {@link #queue}, plus the wake-up permit released by
   * {@link #close()}.
   */
  private final Semaphore semaphore;

  /**
   * Number of values that are pending: reserved for the queue, stored in it, or currently being
   * retried by the manager thread. Direct delivery is only allowed while this is zero.
   */
  private long queued;

  private final SimpleQueue<T> queue;
  private final QueueConsumer<T> consumer;

  /** Background thread draining the queue; created lazily by {@link #startQueue()}. */
  private Manager manager;

  private volatile boolean closed;

  /**
   * Creates the bridge. Elements already present in {@code queue} are treated as pending and are
   * drained by the manager thread once it is started.
   *
   * @param queue storage for values the consumer refused; accessed under its own monitor
   * @param consumer destination of the values
   */
  public QueueToConsumer(SimpleQueue<T> queue, QueueConsumer<T> consumer) {
    this.queue = queue;
    this.consumer = consumer;
    this.queued = queue.size();
    // The backlog must be drainable without waiting for a new add(), so it needs permits too.
    // A Semaphore counts in int, hence the clamp.
    this.semaphore = new Semaphore((int) Math.min(this.queued, Integer.MAX_VALUE));
  }

  /** Starts the background thread that drains the queue. Calls after the first have no effect. */
  public synchronized void startQueue() {
    if (manager == null) {
      this.manager = new Manager();
      manager.setName("queue-manager");
      manager.start();
    }
  }

  /**
   * Delivers {@code value} directly when nothing is pending and the consumer accepts it;
   * otherwise stores it in the queue for the manager thread. After {@link #close()} a value that
   * cannot be delivered directly is dropped.
   *
   * @param value the value to deliver
   */
  @Override
  public void add(T value) {
    synchronized (lock) {
      // Fast path only when nothing is pending, so this value cannot overtake an older one.
      if (queued == 0 && consumer.tryConsume(value)) {
        return;
      }
      if (closed) {
        return;
      }
      // Reserve the slot before leaving the lock so concurrent add() calls stop fast-pathing.
      queued++;
    }
    synchronized (queue) {
      queue.add(value);
    }
    // Released only after the element is really stored, so the manager never removes too early.
    semaphore.release();
  }

  /** Marks this instance as closed and wakes the manager thread so that it can terminate. */
  @Override
  public void close() {
    closed = true;
    semaphore.release();
  }

  /** Thread that takes stored elements one at a time and re-offers them to the consumer. */
  private class Manager extends Thread {

    @Override
    public void run() {
      try {
        while (!closed) {
          // Blocks until an element is stored in the queue or close() releases its permit.
          semaphore.acquire();
          if (closed) {
            break;
          }
          T element;
          synchronized (queue) {
            element = queue.remove();
          }
          deliver(element);
          // Decrement only after the element left our hands: while it is being retried,
          // add() must keep queueing instead of delivering newer values ahead of it.
          synchronized (lock) {
            queued--;
          }
        }
      } catch (InterruptedException ex) {
        closed = true;
        // Preserve the interrupt status for whoever inspects this thread afterwards.
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Offers {@code element} to the consumer until it is accepted, this instance is closed, or
     * the thread is interrupted. The delay between attempts starts at {@link #BACKOFF_NS} and
     * grows by the same amount each time, capped at {@link #MAX_BACKOFF_NS}.
     */
    private void deliver(T element) {
      long nextDelay = BACKOFF_NS;
      while (!closed && !consumer.tryConsume(element)) {
        LockSupport.parkNanos(nextDelay);
        // parkNanos returns immediately while the interrupt status is set; without this check
        // the loop would spin at full speed. An interrupt is treated like a close request.
        if (Thread.currentThread().isInterrupted()) {
          closed = true;
          return;
        }
        nextDelay = Math.min(nextDelay + BACKOFF_NS, MAX_BACKOFF_NS);
      }
    }
  }
}