Avoid opening the same queue twice

This commit is contained in:
Andrea Cavalli 2022-10-29 16:56:57 +02:00
parent b438b82b9d
commit b5e5cc3142
2 changed files with 43 additions and 23 deletions

View File

@ -1,22 +1,13 @@
package it.cavallium.filequeue;
import com.squareup.tape2.QueueFile;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import org.lmdbjava.ByteArrayProxy;
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
import org.lmdbjava.EnvFlags;
public final class LMDBQueueToConsumer<T> implements IQueueToConsumer<T> {
private final QueueToConsumer<T> queue;
private final Env<byte[]> env;
private final Dbi<byte[]> dbi;
private final SimpleQueueLMDB<T> queueLMDB;
private final QueueToConsumer<T> queue;
public LMDBQueueToConsumer(Path file,
String dbName,
@ -25,27 +16,29 @@ public final class LMDBQueueToConsumer<T> implements IQueueToConsumer<T> {
Deserializer<T> deserializer,
QueueConsumer<T> consumer) {
this.env = LMDBEnvManager.ofPath(file);
this.dbi = env.openDbi(dbName, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY);
this.queue = new QueueToConsumer<>(new SimpleQueueLMDB<>(env, dbi, serializer, deserializer), consumer);
this.queueLMDB = new SimpleQueueLMDB<>(env, dbName, serializer, deserializer);
this.queue = new QueueToConsumer<>(queueLMDB, consumer);
}
@Override
public void add(T value) {
queue.add(value);
synchronized (queue) {
queue.add(value);
}
}
@Override
public void close() {
queue.close();
try {
dbi.close();
} catch (Exception e) {
throw new RuntimeException(e);
synchronized (queue) {
queue.close();
queueLMDB.close();
}
}
@Override
public void startQueue() {
queue.startQueue();
synchronized (queue) {
queue.startQueue();
}
}
}

View File

@ -1,15 +1,23 @@
package it.cavallium.filequeue;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
import org.lmdbjava.Txn;
public class SimpleQueueLMDB<T> implements SimpleQueue<T> {
public class SimpleQueueLMDB<T> implements SimpleQueue<T>, Closeable {
private static final Set<String> REGISTRY = new HashSet<>();
private final Env<byte[]> env;
private final String name;
private final Dbi<byte[]> dbi;
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
@ -23,13 +31,19 @@ public class SimpleQueueLMDB<T> implements SimpleQueue<T> {
private int lastIndex;
public SimpleQueueLMDB(Env<byte[]> env,
Dbi<byte[]> dbi,
String name,
Serializer<T> serializer,
Deserializer<T> deserializer) {
this.env = env;
this.dbi = dbi;
this.name = name;
this.serializer = serializer;
this.deserializer = deserializer;
synchronized (REGISTRY) {
if (!REGISTRY.add(name)) {
throw new IllegalArgumentException("Database already registered: " + name);
}
}
this.dbi = env.openDbi(name, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY);
}
@Override
@ -77,4 +91,17 @@ public class SimpleQueueLMDB<T> implements SimpleQueue<T> {
public int size() {
return 0;
}
@Override
public void close() {
try {
dbi.close();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
synchronized (REGISTRY) {
REGISTRY.remove(name);
}
}
}
}