package it.cavallium.dbengine.lucene; import static org.lmdbjava.DbiFlags.*; import io.net5.buffer.ByteBuf; import io.net5.buffer.PooledByteBufAllocator; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import java.io.Closeable; import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; import java.util.Objects; import java.util.Queue; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.jetbrains.annotations.NotNull; import org.lmdbjava.Cursor; import org.lmdbjava.CursorIterable; import org.lmdbjava.CursorIterable.KeyVal; import org.lmdbjava.Dbi; import org.lmdbjava.Env; import org.lmdbjava.GetOp; import org.lmdbjava.Txn; import reactor.core.publisher.Flux; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; public class LMDBPriorityQueue implements PriorityQueue, Reversable>, ReversableResourceIterable { private static final AtomicLong NEXT_ITEM_UID = new AtomicLong(0); private final AtomicBoolean closed = new AtomicBoolean(); private final LMDBSortedCodec codec; private final LLTempLMDBEnv tempEnv; private final Env env; private final int lmdbDbId; private final Dbi lmdb; private boolean writing; private boolean iterating; private Txn readTxn; private Txn rwTxn; private Cursor cur; // Cache private static final int WRITE_QUEUE_MAX_BOUND = 10_000; private final Deque toWriteKeys = new ArrayDeque<>(); private final Deque toWriteValues = new ArrayDeque<>(); private boolean topValid = true; private T top = null; private long size = 0; public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBSortedCodec codec) { this.codec = codec; this.tempEnv = env; this.env = env.getEnv(); this.lmdbDbId = env.allocateDb(); this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED); this.writing = true; this.iterating = false; this.rwTxn = null; this.readTxn = null; this.cur = null; } public LMDBSortedCodec getCodec() { return codec; } private ByteBuf allocate(int size) { return PooledByteBufAllocator.DEFAULT.directBuffer(size, size); } private void switchToMode(boolean write, boolean wantCursor) { assert !closed.get() : "Closed"; if (iterating) { throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating"); } if (toWriteKeys.size() > 0) { switchToModeUncached(true, false); try { var ki = toWriteKeys.iterator(); var vi = toWriteValues.iterator(); while (ki.hasNext()) { var k = ki.next(); var v = vi.next(); lmdb.put(rwTxn, k, v); } } finally { endMode(); for (ByteBuf toWriteKey : toWriteKeys) { toWriteKey.release(); } for (ByteBuf toWriteValue : toWriteValues) { toWriteValue.release(); } toWriteKeys.clear(); toWriteValues.clear(); } } switchToModeUncached(write, wantCursor); } private void switchToModeUncached(boolean write, boolean wantCursor) { assert !closed.get() : "Closed"; if (iterating) { throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating"); } boolean changedMode = false; if (write) { if (!writing) { changedMode = true; writing = true; if (cur != null) { cur.close(); cur = null; } readTxn.close(); readTxn = null; assert rwTxn == null; rwTxn = env.txnWrite(); } else if (rwTxn == null) { assert readTxn == null; rwTxn = env.txnWrite(); } } else { if (writing) { changedMode = true; writing = false; if (cur != null) { cur.close(); cur = null; } if (rwTxn != null) { rwTxn.commit(); rwTxn.close(); rwTxn = null; } assert readTxn == null; readTxn = env.txnRead(); } } if (cur == null) { if (wantCursor) { cur = lmdb.openCursor(Objects.requireNonNull(writing ? rwTxn : readTxn)); } } else { if (changedMode) { cur.close(); cur = null; } } } private void endMode() { if (cur != null) { cur.close(); cur = null; } writing = true; if (readTxn != null) { readTxn.commit(); readTxn.close(); readTxn = null; } if (rwTxn != null) { rwTxn.commit(); rwTxn.close(); rwTxn = null; } assert cur == null; assert readTxn == null; } private static void ensureThread() { LLUtils.ensureBlocking(); } private static void ensureItThread() { ensureThread(); //if (!(Thread.currentThread() instanceof LMDBThread)) { // throw new IllegalStateException("Must run in LMDB scheduler"); //} } @Override public void add(T element) { ensureThread(); var buf = codec.serialize(this::allocate, element); var uid = allocate(Long.BYTES); uid.writeLong(NEXT_ITEM_UID.getAndIncrement()); if (toWriteKeys.size() < WRITE_QUEUE_MAX_BOUND) { toWriteKeys.add(buf); toWriteValues.add(uid); if (++size == 1) { topValid = true; top = element; } else { topValid = false; } } else { switchToMode(true, false); try { if (lmdb.put(rwTxn, buf, uid)) { if (++size == 1) { topValid = true; top = element; } else { topValid = false; } } } finally { endMode(); buf.release(); uid.release(); } } assert topSingleValid(element); } private boolean topSingleValid(T element) { if (size == 1) { var top = databaseTop(); return codec.compare(top, element) == 0; } else { return true; } } @Override public T top() { ensureThread(); if (topValid) { return top; } else { var top = databaseTop(); this.top = top; topValid = true; return top; } } private T databaseTop() { ensureThread(); switchToMode(false, true); try { if (cur.first()) { return codec.deserialize(cur.key()); } else { return null; } } finally { endMode(); } } @Override public T pop() { ensureThread(); switchToMode(true, true); try { if (cur.first()) { var data = codec.deserialize(cur.key()); if (--size == 0) { topValid = true; top = null; } else { topValid = false; top = null; } cur.delete(); return data; } else { topValid = true; top = null; return null; } } finally { endMode(); } } @Override public void replaceTop(T newTop) { ensureThread(); this.pop(); this.add(newTop); } @Override public long size() { ensureThread(); return size; } @Override public void clear() { ensureThread(); switchToMode(true, false); try { lmdb.drop(rwTxn); topValid = true; top = null; size = 0; } finally { endMode(); } } @Override public boolean remove(@NotNull T element) { ensureThread(); Objects.requireNonNull(element); switchToMode(true, true); var buf = codec.serialize(this::allocate, element); try { var deletable = cur.get(buf, GetOp.MDB_SET); if (deletable) { cur.delete(); if (topValid && codec.compare(top, element) == 0) { if (--size == 0) { top = null; } } else { if (--size == 0) { topValid = true; top = null; } else { topValid = false; } } } return deletable; } finally { endMode(); } } public Flux reverseIterate() { return Flux .generate(() -> { ensureItThread(); switchToMode(false, true); iterating = true; return true; }, (isLastKey, sink) -> { try { ensureItThread(); boolean found; if (isLastKey) { found = cur.last(); } else { found = cur.prev(); } if (found) { sink.next(codec.deserialize(cur.key())); } else { sink.complete(); } return false; } catch (Throwable ex) { sink.error(ex); return false; } }, t -> { ensureItThread(); iterating = false; endMode(); }); } @Override public Flux iterate() { return Flux ., Iterator>>>generate(() -> { ensureItThread(); switchToMode(false, false); iterating = true; if (cur != null) { cur.close(); cur = null; } CursorIterable cit = lmdb.iterate(readTxn); var it = cit.iterator(); return Tuples.of(cit, it); }, (t, sink) -> { try { ensureItThread(); var it = t.getT2(); if (it.hasNext()) { sink.next(codec.deserialize(it.next().key())); } else { sink.complete(); } return t; } catch (Throwable ex) { sink.error(ex); return t; } }, t -> { ensureItThread(); var cit = t.getT1(); cit.close(); iterating = false; endMode(); }); } @Override public Flux iterate(long skips) { return Flux ., Iterator>, Long>>generate(() -> { ensureItThread(); switchToMode(false, false); iterating = true; if (cur != null) { cur.close(); cur = null; } CursorIterable cit = lmdb.iterate(readTxn); var it = cit.iterator(); return Tuples.of(cit, it, skips); }, (t, sink) -> { ensureItThread(); var it = t.getT2(); var remainingSkips = t.getT3(); while (remainingSkips-- > 0 && it.hasNext()) { it.next(); } if (it.hasNext()) { sink.next(codec.deserialize(it.next().key())); } else { sink.complete(); } return t.getT3() == 0L ? t : t.mapT3(s -> 0L); }, t -> { ensureItThread(); var cit = t.getT1(); cit.close(); iterating = false; endMode(); }); } public Flux reverseIterate(long skips) { return Flux .generate(() -> { ensureItThread(); switchToMode(false, true); iterating = true; return true; }, (isLastKey, sink) -> { try { ensureItThread(); boolean found; if (isLastKey) { found = cur.last(); } else { found = cur.prev(); } if (found) { // Skip elements if (isLastKey) { long remainingSkips = skips; while (remainingSkips > 0) { if (cur.prev()) { remainingSkips--; } else { sink.complete(); return false; } } } sink.next(codec.deserialize(cur.key())); } else { sink.complete(); } return false; } catch (Throwable ex) { sink.error(ex); return false; } }, t -> { ensureItThread(); iterating = false; endMode(); }); } @Override public void close() { if (closed.compareAndSet(false, true)) { ensureThread(); for (ByteBuf toWriteKey : toWriteKeys) { toWriteKey.release(); } for (ByteBuf toWriteValue : toWriteValues) { toWriteValue.release(); } toWriteKeys.clear(); toWriteValues.clear(); if (cur != null) { cur.close(); } if (rwTxn != null) { rwTxn.close(); } if (readTxn != null) { readTxn.close(); } try (var txn = env.txnWrite()) { lmdb.drop(txn, true); txn.commit(); } lmdb.close(); this.tempEnv.freeDb(lmdbDbId); if (this.codec instanceof SafeCloseable closeable) { closeable.close(); } } } @Override public String toString() { return new StringJoiner(", ", LMDBPriorityQueue.class.getSimpleName() + "[", "]") .add("size=" + size) .toString(); } @Override public ReversableResourceIterable reverse() { return new ReversableResourceIterable<>() { @Override public void close() { LMDBPriorityQueue.this.close(); } @Override public Flux iterate() { return reverseIterate(); } @Override public Flux iterate(long skips) { return reverseIterate(skips); } @Override public ReversableResourceIterable reverse() { return LMDBPriorityQueue.this; } }; } }