From 90b6c62da88729319228a118220761ae16e2f48e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 10 Nov 2022 02:20:25 +0100 Subject: [PATCH] Fix synchronization --- pom.xml | 6 +++ .../cavallium/filequeue/LMDBEnvManager.java | 4 +- .../cavallium/filequeue/QueueToConsumer.java | 34 +++++++++-------- .../filequeue/TestQueueToConsumer.java | 37 +++++++++++++++++++ 4 files changed, 64 insertions(+), 17 deletions(-) create mode 100644 src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java diff --git a/pom.xml b/pom.xml index 946b8e1..54928c1 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,12 @@ lmdbjava 0.8.2 + + org.junit.jupiter + junit-jupiter-api + 5.9.0 + test + diff --git a/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java b/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java index 5295b0f..8ee2081 100644 --- a/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java +++ b/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java @@ -50,9 +50,9 @@ public class LMDBEnvManager { private static Env open(File path) { return Env.create(ByteArrayProxy.PROXY_BA) // 128GiB - .setMapSize(128L * 1024 * 1024 * 1024) + .setMapSize(Integer.parseInt(System.getProperty("fq.writemap.size.gb", "32")) * 1024L * 1024 * 1024) .setMaxReaders(1024) .setMaxDbs(1024) - .open(path, EnvFlags.MDB_NOSYNC); + .open(path, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP, EnvFlags.MDB_NOMETASYNC); } } diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java index 828e1d5..ff9fbb0 100644 --- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java @@ -8,6 +8,8 @@ class QueueToConsumer implements IQueueToConsumer { private final long BACKOFF_NS = Duration.ofMillis(2).toNanos(); private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos(); + + private final long HALF_SECOND_NS = Duration.ofMillis(500).toNanos(); private final AtomicLong preAddQueued; private final AtomicLong afterAddQueued; private final SimpleQueue queue; @@ -50,7 +52,10 @@ class QueueToConsumer implements IQueueToConsumer { } if (shouldAdd && !closed) { queue.add(value); - afterAddQueued.incrementAndGet(); + var queueSize = afterAddQueued.incrementAndGet(); + if (queueSize == 1) { + LockSupport.unpark(manager); + } } } @@ -66,22 +71,21 @@ class QueueToConsumer implements IQueueToConsumer { try { while (!closed) { T element; - boolean shouldRemove = preAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0; - while (!closed) { - if (afterAddQueued.get() > 0) { - if (!closed && shouldRemove) { - element = queue.remove(); - long nextDelay = BACKOFF_NS; - while (!closed && !consumer.tryConsume(element)) { - LockSupport.parkNanos(nextDelay); - if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { - nextDelay += BACKOFF_NS; - } else if (nextDelay < MAX_BACKOFF_NS) { - nextDelay = MAX_BACKOFF_NS; - } - } + boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0; + if (shouldRemove) { + element = queue.remove(); + long nextDelay = BACKOFF_NS; + while (!closed && !consumer.tryConsume(element)) { + LockSupport.parkNanos(nextDelay); + if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) { + nextDelay += BACKOFF_NS; + } else if (nextDelay < MAX_BACKOFF_NS) { + nextDelay = MAX_BACKOFF_NS; } } + preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0); + } else { + LockSupport.parkNanos(HALF_SECOND_NS); } } } finally { diff --git a/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java new file mode 100644 index 0000000..5875315 --- /dev/null +++ b/src/test/java/it/cavallium/filequeue/TestQueueToConsumer.java @@ -0,0 +1,37 @@ +package it.cavallium.filequeue; + +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; + +public class TestQueueToConsumer { + + @Test + public void test() { + var q = new SimpleQueueJava(new ConcurrentLinkedDeque<>()); + AtomicBoolean ab = new AtomicBoolean(); + try (var qtc = new QueueToConsumer(q, new QueueConsumer() { + @Override + public boolean tryConsume(String value) { + System.out.println("value:" + value + " thread: " + Thread.currentThread()); + if (ab.get()) { + ab.set(false); + return false; + } else { + ab.set(true); + return true; + } + } + })) { + qtc.startQueue(); + qtc.add("ciao"); + qtc.add("mondo"); + for (int i = 0; i < 1000; i++) { + qtc.add(i + "n"); + } + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +}