diff --git a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java new file mode 100644 index 0000000..eeb1a92 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java @@ -0,0 +1,75 @@ +package it.cavallium.dbengine.database; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; + +public abstract class BlockingFluxIterable { + + private final Scheduler scheduler; + + public BlockingFluxIterable(Scheduler scheduler) { + this.scheduler = scheduler; + } + + public Flux generate() { + return Flux + .create(sink -> { + boolean alreadyInitialized = false; + AtomicLong requests = new AtomicLong(0); + Semaphore availableRequests = new Semaphore(0); + AtomicBoolean cancelled = new AtomicBoolean(false); + sink.onRequest(n -> { + requests.addAndGet(n); + availableRequests.release(); + }); + sink.onDispose(() -> { + cancelled.set(true); + availableRequests.release(); + }); + + try { + try { + loop: + while (true) { + availableRequests.acquireUninterruptibly(); + var remainingRequests = requests.getAndSet(0); + if (remainingRequests == 0 || cancelled.get()) { + break; + } + + while (remainingRequests-- > 0) { + if (!alreadyInitialized) { + alreadyInitialized = true; + this.onStartup(); + } + + T next = onNext(); + if (next == null) { + break loop; + } + sink.next(next); + } + } + } finally { + if (alreadyInitialized) { + onTerminate(); + } + } + } finally { + sink.complete(); + } + }) + .subscribeOn(scheduler); + } + + public abstract void onStartup(); + + public abstract void onTerminate(); + + @Nullable + public abstract T onNext(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java new file mode 100644 index 0000000..974fe67 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java @@ -0,0 +1,99 @@ +package it.cavallium.dbengine.database; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import reactor.core.scheduler.Scheduler; + +public abstract class BoundedGroupedRocksFluxIterable extends BlockingFluxIterable> { + + private final RocksDB db; + private final ColumnFamilyHandle cfh; + protected final LLRange range; + private final int prefixLength; + + protected RocksIterator rocksIterator; + protected ReadOptions readOptions; + protected byte[] firstGroupKey = null; + protected List currentGroupValues = new ArrayList<>(); + + public BoundedGroupedRocksFluxIterable(Scheduler scheduler, + RocksDB db, + ColumnFamilyHandle cfh, + LLRange range, + int prefixLength) { + super(scheduler); + this.db = db; + this.cfh = cfh; + this.range = range; + this.prefixLength = prefixLength; + } + + @Override + public void onStartup() { + readOptions = this.getReadOptions(); + rocksIterator = db.newIterator(cfh, readOptions); + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + } + + @Override + public void onTerminate() { + if (rocksIterator != null) { + rocksIterator.close(); + } + } + + @Nullable + @Override + public List onNext() { + while (rocksIterator.isValid()) { + if (!rocksIterator.isValid()) { + break; + } + byte[] key = rocksIterator.key(); + if (firstGroupKey == null) { // Fix first value + firstGroupKey = key; + } + if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { + break; + } + + List result = null; + + if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { + currentGroupValues.add(transformEntry(key)); + } else { + if (!currentGroupValues.isEmpty()) { + result = currentGroupValues; + } + firstGroupKey = key; + currentGroupValues = new ArrayList<>(); + } + if (result != null) { + return result; + } + rocksIterator.next(); + } + if (!currentGroupValues.isEmpty()) { + return currentGroupValues; + } + return null; + } + + protected abstract ReadOptions getReadOptions(); + + protected abstract T transformEntry(byte[] key); + + protected byte[] getValue() { + return rocksIterator.value(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java new file mode 100644 index 0000000..947bc37 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java @@ -0,0 +1,69 @@ +package it.cavallium.dbengine.database; + +import java.util.Arrays; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import reactor.core.scheduler.Scheduler; + +public abstract class BoundedRocksFluxIterable extends BlockingFluxIterable { + + private final RocksDB db; + private final ColumnFamilyHandle cfh; + protected final LLRange range; + + protected RocksIterator rocksIterator; + protected ReadOptions readOptions; + + public BoundedRocksFluxIterable(Scheduler scheduler, + RocksDB db, + ColumnFamilyHandle cfh, + LLRange range) { + super(scheduler); + this.db = db; + this.cfh = cfh; + this.range = range; + } + + @Override + public void onStartup() { + readOptions = this.getReadOptions(); + rocksIterator = db.newIterator(cfh, readOptions); + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + } + + @Override + public void onTerminate() { + if (rocksIterator != null) { + rocksIterator.close(); + } + } + + @Nullable + @Override + public T onNext() { + if (!rocksIterator.isValid()) { + return null; + } + byte[] key = rocksIterator.key(); + if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { + return null; + } + rocksIterator.next(); + return this.transformEntry(key); + } + + protected abstract ReadOptions getReadOptions(); + + protected abstract T transformEntry(byte[] key); + + protected byte[] getValue() { + return rocksIterator.value(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 67267d4..2121cec 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.BoundedGroupedRocksFluxIterable; +import it.cavallium.dbengine.database.BoundedRocksFluxIterable; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; @@ -29,10 +31,10 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Snapshot; import org.rocksdb.WriteOptions; -import org.warp.commonutils.log.Logger; -import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.concurrency.atomicity.NotAtomic; import org.warp.commonutils.locks.Striped; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -555,70 +557,33 @@ public class LLLocalDictionary implements LLDictionary { } private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { - return Flux - .>push(sink -> { - try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - byte[] key; - while (rocksIterator.isValid()) { - key = rocksIterator.key(); - if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { - break; - } - sink.next(Map.entry(key, rocksIterator.value())); - rocksIterator.next(); - } - } finally { - sink.complete(); - } - }) - .subscribeOn(dbScheduler); + return new BoundedRocksFluxIterable>(dbScheduler, db, cfh, range) { + + @Override + protected ReadOptions getReadOptions() { + return resolveSnapshot(snapshot); + } + + @Override + protected Entry transformEntry(byte[] key) { + return Map.entry(key, this.getValue()); + } + }.generate(); } private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { - return Flux - .>>push(sink -> { - try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - byte[] firstGroupKey = null; - List> currentGroupValues = new ArrayList<>(); + return new BoundedGroupedRocksFluxIterable>(dbScheduler, db, cfh, range, prefixLength) { - byte[] key; - while (rocksIterator.isValid()) { - key = rocksIterator.key(); - if (firstGroupKey == null) { // Fix first value - firstGroupKey = key; - } - if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { - break; - } - if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { - currentGroupValues.add(Map.entry(key, rocksIterator.value())); - } else { - if (!currentGroupValues.isEmpty()) { - sink.next(currentGroupValues); - } - firstGroupKey = key; - currentGroupValues = new ArrayList<>(); - } - rocksIterator.next(); - } - if (!currentGroupValues.isEmpty()) { - sink.next(currentGroupValues); - } - } finally { - sink.complete(); - } - }) - .subscribeOn(dbScheduler); + @Override + protected ReadOptions getReadOptions() { + return resolveSnapshot(snapshot); + } + + @Override + protected Entry transformEntry(byte[] key) { + return Map.entry(key, this.getValue()); + } + }.generate(); } @Override @@ -634,44 +599,18 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - return Flux - .>push(sink -> { - try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - byte[] firstGroupKey = null; - List currentGroupValues = new ArrayList<>(); + return new BoundedGroupedRocksFluxIterable(dbScheduler, db, cfh, range, prefixLength) { - byte[] key; - while (rocksIterator.isValid()) { - key = rocksIterator.key(); - if (firstGroupKey == null) { // Fix first value - firstGroupKey = key; - } - if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { - break; - } - if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { - if (!currentGroupValues.isEmpty()) { - sink.next(currentGroupValues); - } - firstGroupKey = key; - currentGroupValues = new ArrayList<>(); - } - currentGroupValues.add(key); - rocksIterator.next(); - } - if (!currentGroupValues.isEmpty()) { - sink.next(currentGroupValues); - } - } finally { - sink.complete(); - } - }) - .subscribeOn(dbScheduler); + @Override + protected ReadOptions getReadOptions() { + return resolveSnapshot(snapshot); + } + + @Override + protected byte[] transformEntry(byte[] key) { + return key; + } + }.generate(); } private Flux getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { @@ -683,28 +622,18 @@ public class LLLocalDictionary implements LLDictionary { } private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { - return Flux - .push(sink -> { - try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - byte[] key; - while (rocksIterator.isValid()) { - key = rocksIterator.key(); - if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { - break; - } - sink.next(key); - rocksIterator.next(); - } - } finally { - sink.complete(); - } - }) - .subscribeOn(dbScheduler); + return new BoundedRocksFluxIterable(dbScheduler, db, cfh, range) { + + @Override + protected ReadOptions getReadOptions() { + return resolveSnapshot(snapshot); + } + + @Override + protected byte[] transformEntry(byte[] key) { + return key; + } + }.generate(); } //todo: replace implementation with a simple Flux.push