Fix synchronization
This commit is contained in:
parent
858d1f86a6
commit
90b6c62da8
6
pom.xml
6
pom.xml
@ -49,6 +49,12 @@
|
|||||||
<artifactId>lmdbjava</artifactId>
|
<artifactId>lmdbjava</artifactId>
|
||||||
<version>0.8.2</version>
|
<version>0.8.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<version>5.9.0</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
@ -50,9 +50,9 @@ public class LMDBEnvManager {
|
|||||||
private static Env<byte[]> open(File path) {
|
private static Env<byte[]> open(File path) {
|
||||||
return Env.create(ByteArrayProxy.PROXY_BA)
|
return Env.create(ByteArrayProxy.PROXY_BA)
|
||||||
// 128GiB
|
// 128GiB
|
||||||
.setMapSize(128L * 1024 * 1024 * 1024)
|
.setMapSize(Integer.parseInt(System.getProperty("fq.writemap.size.gb", "32")) * 1024L * 1024 * 1024)
|
||||||
.setMaxReaders(1024)
|
.setMaxReaders(1024)
|
||||||
.setMaxDbs(1024)
|
.setMaxDbs(1024)
|
||||||
.open(path, EnvFlags.MDB_NOSYNC);
|
.open(path, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP, EnvFlags.MDB_NOMETASYNC);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,8 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
|||||||
|
|
||||||
private final long BACKOFF_NS = Duration.ofMillis(2).toNanos();
|
private final long BACKOFF_NS = Duration.ofMillis(2).toNanos();
|
||||||
private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos();
|
private final long MAX_BACKOFF_NS = Duration.ofMillis(500).toNanos();
|
||||||
|
|
||||||
|
private final long HALF_SECOND_NS = Duration.ofMillis(500).toNanos();
|
||||||
private final AtomicLong preAddQueued;
|
private final AtomicLong preAddQueued;
|
||||||
private final AtomicLong afterAddQueued;
|
private final AtomicLong afterAddQueued;
|
||||||
private final SimpleQueue<T> queue;
|
private final SimpleQueue<T> queue;
|
||||||
@ -50,7 +52,10 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
|||||||
}
|
}
|
||||||
if (shouldAdd && !closed) {
|
if (shouldAdd && !closed) {
|
||||||
queue.add(value);
|
queue.add(value);
|
||||||
afterAddQueued.incrementAndGet();
|
var queueSize = afterAddQueued.incrementAndGet();
|
||||||
|
if (queueSize == 1) {
|
||||||
|
LockSupport.unpark(manager);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,22 +71,21 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
|||||||
try {
|
try {
|
||||||
while (!closed) {
|
while (!closed) {
|
||||||
T element;
|
T element;
|
||||||
boolean shouldRemove = preAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
|
boolean shouldRemove = afterAddQueued.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
|
||||||
while (!closed) {
|
if (shouldRemove) {
|
||||||
if (afterAddQueued.get() > 0) {
|
element = queue.remove();
|
||||||
if (!closed && shouldRemove) {
|
long nextDelay = BACKOFF_NS;
|
||||||
element = queue.remove();
|
while (!closed && !consumer.tryConsume(element)) {
|
||||||
long nextDelay = BACKOFF_NS;
|
LockSupport.parkNanos(nextDelay);
|
||||||
while (!closed && !consumer.tryConsume(element)) {
|
if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
|
||||||
LockSupport.parkNanos(nextDelay);
|
nextDelay += BACKOFF_NS;
|
||||||
if (nextDelay + BACKOFF_NS <= MAX_BACKOFF_NS) {
|
} else if (nextDelay < MAX_BACKOFF_NS) {
|
||||||
nextDelay += BACKOFF_NS;
|
nextDelay = MAX_BACKOFF_NS;
|
||||||
} else if (nextDelay < MAX_BACKOFF_NS) {
|
|
||||||
nextDelay = MAX_BACKOFF_NS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
preAddQueued.updateAndGet(n -> n > 0 ? n - 1 : 0);
|
||||||
|
} else {
|
||||||
|
LockSupport.parkNanos(HALF_SECOND_NS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
package it.cavallium.filequeue;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class TestQueueToConsumer {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() {
|
||||||
|
var q = new SimpleQueueJava<String>(new ConcurrentLinkedDeque<>());
|
||||||
|
AtomicBoolean ab = new AtomicBoolean();
|
||||||
|
try (var qtc = new QueueToConsumer<String>(q, new QueueConsumer<String>() {
|
||||||
|
@Override
|
||||||
|
public boolean tryConsume(String value) {
|
||||||
|
System.out.println("value:" + value + " thread: " + Thread.currentThread());
|
||||||
|
if (ab.get()) {
|
||||||
|
ab.set(false);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
ab.set(true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})) {
|
||||||
|
qtc.startQueue();
|
||||||
|
qtc.add("ciao");
|
||||||
|
qtc.add("mondo");
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
qtc.add(i + "n");
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user