2021-10-13 00:23:56 +02:00
|
|
|
package it.cavallium.dbengine.lucene;
|
|
|
|
|
|
|
|
import static org.lmdbjava.DbiFlags.*;
|
|
|
|
|
|
|
|
import io.net5.buffer.ByteBuf;
|
|
|
|
import io.net5.buffer.PooledByteBufAllocator;
|
2021-10-14 15:55:58 +02:00
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
2021-10-13 00:23:56 +02:00
|
|
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.Iterator;
|
|
|
|
import java.util.Objects;
|
2021-10-14 15:55:58 +02:00
|
|
|
import java.util.StringJoiner;
|
2021-10-13 00:23:56 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.lmdbjava.Cursor;
|
|
|
|
import org.lmdbjava.CursorIterable;
|
|
|
|
import org.lmdbjava.CursorIterable.KeyVal;
|
|
|
|
import org.lmdbjava.Dbi;
|
|
|
|
import org.lmdbjava.Env;
|
2021-10-14 15:55:58 +02:00
|
|
|
import org.lmdbjava.GetOp;
|
2021-10-13 00:23:56 +02:00
|
|
|
import org.lmdbjava.Txn;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.util.function.Tuple2;
|
|
|
|
import reactor.util.function.Tuple3;
|
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
2021-10-15 22:03:53 +02:00
|
|
|
public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
|
2021-10-13 00:23:56 +02:00
|
|
|
|
|
|
|
private static final boolean FORCE_SYNC = false;
|
|
|
|
private static final boolean FORCE_THREAD_LOCAL = true;
|
|
|
|
|
|
|
|
private static final AtomicLong NEXT_LMDB_QUEUE_ID = new AtomicLong(0);
|
2021-10-14 15:55:58 +02:00
|
|
|
private static final AtomicLong NEXT_ITEM_UID = new AtomicLong(0);
|
2021-10-13 00:23:56 +02:00
|
|
|
|
|
|
|
private final AtomicBoolean closed = new AtomicBoolean();
|
2021-10-14 23:04:34 +02:00
|
|
|
private final LMDBSortedCodec<T> codec;
|
2021-10-13 00:23:56 +02:00
|
|
|
private final Env<ByteBuf> env;
|
|
|
|
private final Dbi<ByteBuf> lmdb;
|
|
|
|
|
|
|
|
private boolean writing;
|
|
|
|
private boolean iterating;
|
|
|
|
private Txn<ByteBuf> readTxn;
|
|
|
|
private Txn<ByteBuf> rwTxn;
|
|
|
|
private Cursor<ByteBuf> cur;
|
|
|
|
|
|
|
|
private boolean topValid = true;
|
|
|
|
private T top = null;
|
|
|
|
private long size = 0;
|
|
|
|
|
2021-10-14 23:04:34 +02:00
|
|
|
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBSortedCodec<T> codec) {
|
2021-10-13 00:23:56 +02:00
|
|
|
var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement();
|
|
|
|
this.codec = codec;
|
2021-10-16 14:59:38 +02:00
|
|
|
this.env = env.getEnv();
|
2021-10-14 15:55:58 +02:00
|
|
|
this.lmdb = this.env.openDbi(name, codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED);
|
2021-10-13 00:23:56 +02:00
|
|
|
|
|
|
|
this.writing = true;
|
|
|
|
this.iterating = false;
|
|
|
|
if (FORCE_THREAD_LOCAL) {
|
|
|
|
this.rwTxn = null;
|
|
|
|
} else {
|
|
|
|
this.rwTxn = this.env.txnWrite();
|
|
|
|
}
|
|
|
|
this.readTxn = null;
|
|
|
|
this.cur = null;
|
|
|
|
}
|
|
|
|
|
2021-10-14 23:04:34 +02:00
|
|
|
public LMDBSortedCodec<T> getCodec() {
|
|
|
|
return codec;
|
|
|
|
}
|
|
|
|
|
2021-10-13 00:23:56 +02:00
|
|
|
private ByteBuf allocate(int size) {
|
|
|
|
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void switchToMode(boolean write, boolean wantCursor) {
|
|
|
|
if (iterating) {
|
|
|
|
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
|
|
|
|
}
|
|
|
|
boolean changedMode = false;
|
|
|
|
if (write) {
|
|
|
|
if (!writing) {
|
|
|
|
changedMode = true;
|
|
|
|
writing = true;
|
|
|
|
if (cur != null) {
|
|
|
|
cur.close();
|
|
|
|
cur = null;
|
|
|
|
}
|
|
|
|
readTxn.close();
|
|
|
|
readTxn = null;
|
|
|
|
assert rwTxn == null;
|
|
|
|
rwTxn = env.txnWrite();
|
|
|
|
} else if (rwTxn == null) {
|
|
|
|
assert readTxn == null;
|
|
|
|
rwTxn = env.txnWrite();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (writing) {
|
|
|
|
changedMode = true;
|
|
|
|
writing = false;
|
|
|
|
if (cur != null) {
|
|
|
|
cur.close();
|
|
|
|
cur = null;
|
|
|
|
}
|
|
|
|
if (rwTxn != null) {
|
|
|
|
rwTxn.commit();
|
|
|
|
rwTxn.close();
|
|
|
|
rwTxn = null;
|
|
|
|
}
|
|
|
|
if (FORCE_SYNC) {
|
|
|
|
env.sync(true);
|
|
|
|
}
|
|
|
|
assert rwTxn == null;
|
|
|
|
assert readTxn == null;
|
|
|
|
readTxn = env.txnRead();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (cur == null) {
|
|
|
|
if (wantCursor) {
|
|
|
|
cur = lmdb.openCursor(Objects.requireNonNull(writing ? rwTxn : readTxn));
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (changedMode) {
|
|
|
|
cur.close();
|
|
|
|
cur = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void endMode() {
|
|
|
|
if (FORCE_THREAD_LOCAL) {
|
|
|
|
if (cur != null) {
|
|
|
|
cur.close();
|
|
|
|
cur = null;
|
|
|
|
}
|
|
|
|
writing = true;
|
|
|
|
if (readTxn != null) {
|
|
|
|
readTxn.commit();
|
|
|
|
readTxn.close();
|
|
|
|
readTxn = null;
|
|
|
|
}
|
|
|
|
if (rwTxn != null) {
|
|
|
|
rwTxn.commit();
|
|
|
|
rwTxn.close();
|
|
|
|
rwTxn = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert cur == null;
|
|
|
|
assert rwTxn == null;
|
|
|
|
assert readTxn == null;
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void ensureThread() {
|
2021-10-14 15:55:58 +02:00
|
|
|
LLUtils.ensureBlocking();
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private static void ensureItThread() {
|
2021-10-14 15:55:58 +02:00
|
|
|
ensureThread();
|
|
|
|
//if (!(Thread.currentThread() instanceof LMDBThread)) {
|
|
|
|
// throw new IllegalStateException("Must run in LMDB scheduler");
|
|
|
|
//}
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void add(T element) {
|
|
|
|
ensureThread();
|
|
|
|
switchToMode(true, false);
|
|
|
|
var buf = codec.serialize(this::allocate, element);
|
2021-10-14 15:55:58 +02:00
|
|
|
var uid = allocate(Long.BYTES);
|
|
|
|
uid.writeLong(NEXT_ITEM_UID.getAndIncrement());
|
2021-10-13 00:23:56 +02:00
|
|
|
try {
|
2021-10-14 15:55:58 +02:00
|
|
|
if (lmdb.put(rwTxn, buf, uid)) {
|
2021-10-13 00:23:56 +02:00
|
|
|
if (++size == 1) {
|
|
|
|
topValid = true;
|
|
|
|
top = element;
|
|
|
|
} else {
|
|
|
|
topValid = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
endMode();
|
2021-10-15 22:03:53 +02:00
|
|
|
buf.release();
|
|
|
|
uid.release();
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
assert topSingleValid(element);
|
|
|
|
}
|
|
|
|
|
|
|
|
private boolean topSingleValid(T element) {
|
|
|
|
if (size == 1) {
|
|
|
|
var top = databaseTop();
|
|
|
|
return codec.compare(top, element) == 0;
|
|
|
|
} else {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public T top() {
|
|
|
|
ensureThread();
|
|
|
|
if (topValid) {
|
|
|
|
return top;
|
|
|
|
} else {
|
|
|
|
var top = databaseTop();
|
|
|
|
this.top = top;
|
|
|
|
topValid = true;
|
|
|
|
return top;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private T databaseTop() {
|
|
|
|
ensureThread();
|
|
|
|
switchToMode(false, true);
|
|
|
|
try {
|
|
|
|
if (cur.first()) {
|
|
|
|
return codec.deserialize(cur.key());
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
endMode();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public T pop() {
|
|
|
|
ensureThread();
|
|
|
|
switchToMode(true, true);
|
|
|
|
try {
|
|
|
|
if (cur.first()) {
|
|
|
|
var data = codec.deserialize(cur.key());
|
|
|
|
if (--size == 0) {
|
|
|
|
topValid = true;
|
|
|
|
top = null;
|
|
|
|
} else {
|
|
|
|
topValid = false;
|
2021-10-14 15:55:58 +02:00
|
|
|
top = null;
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
cur.delete();
|
|
|
|
return data;
|
|
|
|
} else {
|
2021-10-14 15:55:58 +02:00
|
|
|
topValid = true;
|
|
|
|
top = null;
|
2021-10-13 00:23:56 +02:00
|
|
|
return null;
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
endMode();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-10-14 15:55:58 +02:00
|
|
|
public void replaceTop(T newTop) {
|
2021-10-13 00:23:56 +02:00
|
|
|
ensureThread();
|
2021-10-14 15:55:58 +02:00
|
|
|
this.pop();
|
|
|
|
this.add(newTop);
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public long size() {
|
|
|
|
ensureThread();
|
|
|
|
return size;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void clear() {
|
|
|
|
ensureThread();
|
|
|
|
switchToMode(true, false);
|
|
|
|
try {
|
|
|
|
lmdb.drop(rwTxn);
|
|
|
|
topValid = true;
|
|
|
|
top = null;
|
|
|
|
size = 0;
|
|
|
|
} finally {
|
|
|
|
endMode();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean remove(@NotNull T element) {
|
|
|
|
ensureThread();
|
|
|
|
Objects.requireNonNull(element);
|
2021-10-14 15:55:58 +02:00
|
|
|
switchToMode(true, true);
|
2021-10-13 00:23:56 +02:00
|
|
|
var buf = codec.serialize(this::allocate, element);
|
|
|
|
try {
|
2021-10-14 15:55:58 +02:00
|
|
|
var deletable = cur.get(buf, GetOp.MDB_SET);
|
|
|
|
if (deletable) {
|
|
|
|
cur.delete();
|
2021-10-13 00:23:56 +02:00
|
|
|
if (topValid && codec.compare(top, element) == 0) {
|
|
|
|
if (--size == 0) {
|
|
|
|
top = null;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (--size == 0) {
|
|
|
|
topValid = true;
|
|
|
|
top = null;
|
|
|
|
} else {
|
|
|
|
topValid = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-10-14 15:55:58 +02:00
|
|
|
return deletable;
|
2021-10-13 00:23:56 +02:00
|
|
|
} finally {
|
|
|
|
endMode();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-15 22:03:53 +02:00
|
|
|
public Flux<T> reverseIterate() {
|
|
|
|
return Flux
|
|
|
|
.generate(() -> {
|
|
|
|
ensureItThread();
|
|
|
|
switchToMode(false, true);
|
|
|
|
iterating = true;
|
|
|
|
return true;
|
|
|
|
}, (isLastKey, sink) -> {
|
|
|
|
try {
|
|
|
|
ensureItThread();
|
|
|
|
boolean found;
|
|
|
|
if (isLastKey) {
|
|
|
|
found = cur.last();
|
|
|
|
} else {
|
|
|
|
found = cur.prev();
|
|
|
|
}
|
|
|
|
if (found) {
|
|
|
|
sink.next(codec.deserialize(cur.key()));
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
sink.error(ex);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}, t -> {
|
|
|
|
ensureItThread();
|
|
|
|
iterating = false;
|
|
|
|
endMode();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-10-13 00:23:56 +02:00
|
|
|
@Override
|
|
|
|
public Flux<T> iterate() {
|
|
|
|
return Flux
|
|
|
|
.<T, Tuple2<CursorIterable<ByteBuf>, Iterator<KeyVal<ByteBuf>>>>generate(() -> {
|
|
|
|
ensureItThread();
|
|
|
|
switchToMode(false, false);
|
|
|
|
iterating = true;
|
|
|
|
if (cur != null) {
|
|
|
|
cur.close();
|
|
|
|
cur = null;
|
|
|
|
}
|
|
|
|
CursorIterable<ByteBuf> cit = lmdb.iterate(readTxn);
|
|
|
|
var it = cit.iterator();
|
|
|
|
return Tuples.of(cit, it);
|
|
|
|
}, (t, sink) -> {
|
2021-10-14 15:55:58 +02:00
|
|
|
try {
|
|
|
|
ensureItThread();
|
|
|
|
var it = t.getT2();
|
|
|
|
if (it.hasNext()) {
|
|
|
|
sink.next(codec.deserialize(it.next().key()));
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
}
|
|
|
|
return t;
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
sink.error(ex);
|
|
|
|
return t;
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
}, t -> {
|
|
|
|
ensureItThread();
|
|
|
|
var cit = t.getT1();
|
|
|
|
cit.close();
|
|
|
|
iterating = false;
|
|
|
|
endMode();
|
2021-10-14 15:55:58 +02:00
|
|
|
});
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<T> iterate(long skips) {
|
|
|
|
return Flux
|
|
|
|
.<T, Tuple3<CursorIterable<ByteBuf>, Iterator<KeyVal<ByteBuf>>, Long>>generate(() -> {
|
|
|
|
ensureItThread();
|
|
|
|
switchToMode(false, false);
|
|
|
|
iterating = true;
|
|
|
|
if (cur != null) {
|
|
|
|
cur.close();
|
|
|
|
cur = null;
|
|
|
|
}
|
|
|
|
CursorIterable<ByteBuf> cit = lmdb.iterate(readTxn);
|
|
|
|
var it = cit.iterator();
|
|
|
|
return Tuples.of(cit, it, skips);
|
|
|
|
}, (t, sink) -> {
|
|
|
|
ensureItThread();
|
|
|
|
var it = t.getT2();
|
|
|
|
var remainingSkips = t.getT3();
|
|
|
|
while (remainingSkips-- > 0 && it.hasNext()) {
|
|
|
|
it.next();
|
|
|
|
}
|
|
|
|
if (it.hasNext()) {
|
|
|
|
sink.next(codec.deserialize(it.next().key()));
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
}
|
|
|
|
return t.getT3() == 0L ? t : t.mapT3(s -> 0L);
|
|
|
|
}, t -> {
|
|
|
|
ensureItThread();
|
|
|
|
var cit = t.getT1();
|
|
|
|
cit.close();
|
|
|
|
iterating = false;
|
|
|
|
endMode();
|
2021-10-15 22:03:53 +02:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
public Flux<T> reverseIterate(long skips) {
|
|
|
|
return Flux
|
|
|
|
.generate(() -> {
|
|
|
|
ensureItThread();
|
|
|
|
switchToMode(false, true);
|
|
|
|
iterating = true;
|
|
|
|
return true;
|
|
|
|
}, (isLastKey, sink) -> {
|
|
|
|
try {
|
|
|
|
ensureItThread();
|
|
|
|
boolean found;
|
|
|
|
if (isLastKey) {
|
|
|
|
found = cur.last();
|
|
|
|
} else {
|
|
|
|
found = cur.prev();
|
|
|
|
}
|
|
|
|
if (found) {
|
|
|
|
|
|
|
|
// Skip elements
|
|
|
|
if (isLastKey) {
|
|
|
|
long remainingSkips = skips;
|
|
|
|
while (remainingSkips > 0) {
|
|
|
|
if (cur.prev()) {
|
|
|
|
remainingSkips--;
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
sink.next(codec.deserialize(cur.key()));
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
sink.error(ex);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}, t -> {
|
|
|
|
ensureItThread();
|
|
|
|
iterating = false;
|
|
|
|
endMode();
|
|
|
|
});
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close() throws IOException {
|
|
|
|
if (closed.compareAndSet(false, true)) {
|
2021-10-16 14:59:38 +02:00
|
|
|
ensureThread();
|
|
|
|
if (cur != null) {
|
|
|
|
cur.close();
|
|
|
|
}
|
|
|
|
if (rwTxn != null) {
|
|
|
|
rwTxn.close();
|
|
|
|
}
|
|
|
|
if (readTxn != null) {
|
|
|
|
readTxn.close();
|
|
|
|
}
|
|
|
|
try (var txn = env.txnWrite()) {
|
|
|
|
lmdb.drop(txn, true);
|
|
|
|
txn.commit();
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
2021-10-16 14:59:38 +02:00
|
|
|
lmdb.close();
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-14 15:55:58 +02:00
|
|
|
@Override
|
|
|
|
public String toString() {
|
|
|
|
return new StringJoiner(", ", LMDBPriorityQueue.class.getSimpleName() + "[", "]")
|
|
|
|
.add("size=" + size)
|
|
|
|
.toString();
|
|
|
|
}
|
2021-10-15 22:03:53 +02:00
|
|
|
|
|
|
|
@Override
|
|
|
|
public ReversableResourceIterable<T> reverse() {
|
|
|
|
return new ReversableResourceIterable<>() {
|
|
|
|
@Override
|
|
|
|
public Flux<T> iterate() {
|
|
|
|
return reverseIterate();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<T> iterate(long skips) {
|
|
|
|
return reverseIterate(skips);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ReversableResourceIterable<T> reverse() {
|
|
|
|
return LMDBPriorityQueue.this;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|