From b5e5cc314200525264c5856355a369fa7a04ab7d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 29 Oct 2022 16:56:57 +0200 Subject: [PATCH] Avoid opening the same queue twice --- .../filequeue/LMDBQueueToConsumer.java | 33 ++++++++----------- .../cavallium/filequeue/SimpleQueueLMDB.java | 33 +++++++++++++++++-- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java index b4d6894..7d5843d 100644 --- a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java @@ -1,22 +1,13 @@ package it.cavallium.filequeue; -import com.squareup.tape2.QueueFile; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; import java.nio.file.Path; -import org.lmdbjava.ByteArrayProxy; -import org.lmdbjava.Dbi; -import org.lmdbjava.DbiFlags; import org.lmdbjava.Env; -import org.lmdbjava.EnvFlags; public final class LMDBQueueToConsumer implements IQueueToConsumer { - private final QueueToConsumer queue; private final Env env; - private final Dbi dbi; + private final SimpleQueueLMDB queueLMDB; + private final QueueToConsumer queue; public LMDBQueueToConsumer(Path file, String dbName, @@ -25,27 +16,29 @@ public final class LMDBQueueToConsumer implements IQueueToConsumer { Deserializer deserializer, QueueConsumer consumer) { this.env = LMDBEnvManager.ofPath(file); - this.dbi = env.openDbi(dbName, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY); - this.queue = new QueueToConsumer<>(new SimpleQueueLMDB<>(env, dbi, serializer, deserializer), consumer); + this.queueLMDB = new SimpleQueueLMDB<>(env, dbName, serializer, deserializer); + this.queue = new QueueToConsumer<>(queueLMDB, consumer); } @Override public void add(T value) { - queue.add(value); + synchronized (queue) { + queue.add(value); + } } @Override public void close() { - queue.close(); - try { - dbi.close(); - } catch (Exception e) { - throw new RuntimeException(e); + synchronized (queue) { + queue.close(); + queueLMDB.close(); } } @Override public void startQueue() { - queue.startQueue(); + synchronized (queue) { + queue.startQueue(); + } } } diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java index fff390b..bb8cfa5 100644 --- a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java @@ -1,15 +1,23 @@ package it.cavallium.filequeue; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.lmdbjava.Dbi; +import org.lmdbjava.DbiFlags; import org.lmdbjava.Env; import org.lmdbjava.Txn; -public class SimpleQueueLMDB implements SimpleQueue { +public class SimpleQueueLMDB implements SimpleQueue, Closeable { + private static final Set REGISTRY = new HashSet<>(); private final Env env; + private final String name; private final Dbi dbi; private final Serializer serializer; private final Deserializer deserializer; @@ -23,13 +31,19 @@ public class SimpleQueueLMDB implements SimpleQueue { private int lastIndex; public SimpleQueueLMDB(Env env, - Dbi dbi, + String name, Serializer serializer, Deserializer deserializer) { this.env = env; - this.dbi = dbi; + this.name = name; this.serializer = serializer; this.deserializer = deserializer; + synchronized (REGISTRY) { + if (!REGISTRY.add(name)) { + throw new IllegalArgumentException("Database already registered: " + name); + } + } + this.dbi = env.openDbi(name, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY); } @Override @@ -77,4 +91,17 @@ public class SimpleQueueLMDB implements SimpleQueue { public int size() { return 0; } + + @Override + public void close() { + try { + dbi.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + synchronized (REGISTRY) { + REGISTRY.remove(name); + } + } + } }