diff --git a/pom.xml b/pom.xml index fe640e5..df4aeb9 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ it.cavallium filequeue file queue project - 3.0.2 + 3.1.2 jar Light weight, high performance, simple, reliable and persistent queue 4.0.0 @@ -44,6 +44,11 @@ tape 2.0.0-beta1 + + org.lmdbjava + lmdbjava + 0.8.2 + diff --git a/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java b/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java new file mode 100644 index 0000000..85a62c0 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/LMDBEnvManager.java @@ -0,0 +1,58 @@ +package it.cavallium.filequeue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import org.lmdbjava.ByteArrayProxy; +import org.lmdbjava.Env; +import org.lmdbjava.EnvFlags; + +public class LMDBEnvManager { + private static final Map> ENVS = new HashMap<>(); + + public synchronized static Env ofPath(Path path) { + return ENVS.compute(path, (p, env) -> { + if (env != null) { + if (env.isClosed()) { + env = null; + } + } + if (env != null) { + return env; + } + if (Files.notExists(path)) { + try { + Files.createDirectories(path); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + try { + return open(path.toFile()); + } catch (Throwable ex) { + try { + Files.walk(path) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } catch (IOException e) { + throw new RuntimeException(e); + } + return open(path.toFile()); + } + }); + } + + private static Env open(File path) { + return Env.create(ByteArrayProxy.PROXY_BA) + // 16GiB + .setMapSize(16L * 1024 * 1024 * 1024) + .setMaxReaders(1024) + .setMaxDbs(1024) + .open(path, EnvFlags.MDB_FIXEDMAP, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP); + } +} diff --git a/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java new file mode 100644 index 0000000..b4d6894 --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/LMDBQueueToConsumer.java @@ -0,0 +1,51 @@ +package it.cavallium.filequeue; + +import com.squareup.tape2.QueueFile; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import org.lmdbjava.ByteArrayProxy; +import org.lmdbjava.Dbi; +import org.lmdbjava.DbiFlags; +import org.lmdbjava.Env; +import org.lmdbjava.EnvFlags; + +public final class LMDBQueueToConsumer implements IQueueToConsumer { + + private final QueueToConsumer queue; + private final Env env; + private final Dbi dbi; + + public LMDBQueueToConsumer(Path file, + String dbName, + boolean clear, + Serializer serializer, + Deserializer deserializer, + QueueConsumer consumer) { + this.env = LMDBEnvManager.ofPath(file); + this.dbi = env.openDbi(dbName, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY); + this.queue = new QueueToConsumer<>(new SimpleQueueLMDB<>(env, dbi, serializer, deserializer), consumer); + } + + @Override + public void add(T value) { + queue.add(value); + } + + @Override + public void close() { + queue.close(); + try { + dbi.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void startQueue() { + queue.startQueue(); + } +} diff --git a/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java new file mode 100644 index 0000000..fff390b --- /dev/null +++ b/src/main/java/it/cavallium/filequeue/SimpleQueueLMDB.java @@ -0,0 +1,80 @@ +package it.cavallium.filequeue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import org.lmdbjava.Dbi; +import org.lmdbjava.Env; +import org.lmdbjava.Txn; + +public class SimpleQueueLMDB implements SimpleQueue { + + private final Env env; + private final Dbi dbi; + private final Serializer serializer; + private final Deserializer deserializer; + /** + * The First index. + */ + private int firstIndex; + /** + * The Last index. + */ + private int lastIndex; + + public SimpleQueueLMDB(Env env, + Dbi dbi, + Serializer serializer, + Deserializer deserializer) { + this.env = env; + this.dbi = dbi; + this.serializer = serializer; + this.deserializer = deserializer; + } + + @Override + public void add(T element) { + int last = lastIndex++; + try { + dbi.put(serializeKey(last), serializer.serialize(element)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public T remove() { + byte[] byteBuffer; + try (Txn txnWrite = env.txnWrite()) { + var bb = serializeKey(firstIndex); + byteBuffer = dbi.get(txnWrite, bb); + if (byteBuffer == null) { + txnWrite.commit(); + return null; + } + boolean isDel = dbi.delete(txnWrite, bb); + firstIndex++; + txnWrite.commit(); + } + return readObject(byteBuffer); + } + + private T readObject(byte[] byteBuffer) { + try { + return deserializer.deserialize(byteBuffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static byte[] serializeKey(int value) { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(value); + return buffer.array(); + } + + @Override + public int size() { + return 0; + } +}