Add LMDB database

This commit is contained in:
Andrea Cavalli 2022-10-11 00:28:28 +02:00
parent 3a48d63b11
commit b438b82b9d
4 changed files with 195 additions and 1 deletions

View File

@ -4,7 +4,7 @@
<groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId>
<name>file queue project</name>
<version>3.0.2</version>
<version>3.1.2</version>
<packaging>jar</packaging>
<description>Light weight, high performance, simple, reliable and persistent queue</description>
<modelVersion>4.0.0</modelVersion>
@ -44,6 +44,11 @@
<artifactId>tape</artifactId>
<version>2.0.0-beta1</version>
</dependency>
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.8.2</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,58 @@
package it.cavallium.filequeue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.lmdbjava.ByteArrayProxy;
import org.lmdbjava.Env;
import org.lmdbjava.EnvFlags;
public class LMDBEnvManager {
private static final Map<Path, Env<byte[]>> ENVS = new HashMap<>();
public synchronized static Env<byte[]> ofPath(Path path) {
return ENVS.compute(path, (p, env) -> {
if (env != null) {
if (env.isClosed()) {
env = null;
}
}
if (env != null) {
return env;
}
if (Files.notExists(path)) {
try {
Files.createDirectories(path);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
try {
return open(path.toFile());
} catch (Throwable ex) {
try {
Files.walk(path)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException e) {
throw new RuntimeException(e);
}
return open(path.toFile());
}
});
}
private static Env<byte[]> open(File path) {
return Env.create(ByteArrayProxy.PROXY_BA)
// 16GiB
.setMapSize(16L * 1024 * 1024 * 1024)
.setMaxReaders(1024)
.setMaxDbs(1024)
.open(path, EnvFlags.MDB_FIXEDMAP, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP);
}
}

View File

@ -0,0 +1,51 @@
package it.cavallium.filequeue;
import com.squareup.tape2.QueueFile;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import org.lmdbjava.ByteArrayProxy;
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
import org.lmdbjava.EnvFlags;
public final class LMDBQueueToConsumer<T> implements IQueueToConsumer<T> {
private final QueueToConsumer<T> queue;
private final Env<byte[]> env;
private final Dbi<byte[]> dbi;
public LMDBQueueToConsumer(Path file,
String dbName,
boolean clear,
Serializer<T> serializer,
Deserializer<T> deserializer,
QueueConsumer<T> consumer) {
this.env = LMDBEnvManager.ofPath(file);
this.dbi = env.openDbi(dbName, DbiFlags.MDB_CREATE, DbiFlags.MDB_INTEGERKEY);
this.queue = new QueueToConsumer<>(new SimpleQueueLMDB<>(env, dbi, serializer, deserializer), consumer);
}
@Override
public void add(T value) {
queue.add(value);
}
@Override
public void close() {
queue.close();
try {
dbi.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void startQueue() {
queue.startQueue();
}
}

View File

@ -0,0 +1,80 @@
package it.cavallium.filequeue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.Txn;
public class SimpleQueueLMDB<T> implements SimpleQueue<T> {
private final Env<byte[]> env;
private final Dbi<byte[]> dbi;
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
/**
* The First index.
*/
private int firstIndex;
/**
* The Last index.
*/
private int lastIndex;
public SimpleQueueLMDB(Env<byte[]> env,
Dbi<byte[]> dbi,
Serializer<T> serializer,
Deserializer<T> deserializer) {
this.env = env;
this.dbi = dbi;
this.serializer = serializer;
this.deserializer = deserializer;
}
@Override
public void add(T element) {
int last = lastIndex++;
try {
dbi.put(serializeKey(last), serializer.serialize(element));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public T remove() {
byte[] byteBuffer;
try (Txn<byte[]> txnWrite = env.txnWrite()) {
var bb = serializeKey(firstIndex);
byteBuffer = dbi.get(txnWrite, bb);
if (byteBuffer == null) {
txnWrite.commit();
return null;
}
boolean isDel = dbi.delete(txnWrite, bb);
firstIndex++;
txnWrite.commit();
}
return readObject(byteBuffer);
}
private T readObject(byte[] byteBuffer) {
try {
return deserializer.deserialize(byteBuffer);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static byte[] serializeKey(int value) {
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(value);
return buffer.array();
}
@Override
public int size() {
return 0;
}
}