From fd4f8e77d69b78c41d2e9088c35691987fad7075 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 14 Mar 2021 23:06:46 +0100 Subject: [PATCH] Refactor iterators --- .../database/BlockingFluxIterable.java | 139 ------------------ .../database/disk/LLLocalDictionary.java | 10 +- ...=> LLLocalEntryReactiveRocksIterator.java} | 4 +- ...calGroupedEntryReactiveRocksIterator.java} | 5 +- ...LocalGroupedKeyReactiveRocksIterator.java} | 4 +- ... LLLocalGroupedReactiveRocksIterator.java} | 4 +- ...LLocalKeyPrefixReactiveRocksIterator.java} | 4 +- ...a => LLLocalKeyReactiveRocksIterator.java} | 4 +- ...java => LLLocalReactiveRocksIterator.java} | 4 +- 9 files changed, 20 insertions(+), 158 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneEntryReactiveIterator.java => LLLocalEntryReactiveRocksIterator.java} (74%) rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneGroupedEntryReactiveIterator.java => LLLocalGroupedEntryReactiveRocksIterator.java} (74%) rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneGroupedKeysReactiveIterator.java => LLLocalGroupedKeyReactiveRocksIterator.java} (73%) rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneGroupedReactiveIterator.java => LLLocalGroupedReactiveRocksIterator.java} (96%) rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneKeyPrefixesReactiveIterator.java => LLLocalKeyPrefixReactiveRocksIterator.java} (95%) rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneKeysReactiveIterator.java => LLLocalKeyReactiveRocksIterator.java} (72%) rename src/main/java/it/cavallium/dbengine/database/disk/{LLLocalLuceneReactiveIterator.java => LLLocalReactiveRocksIterator.java} (95%) diff --git a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java deleted file mode 100644 index 9c4a592..0000000 --- a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java +++ /dev/null @@ -1,139 +0,0 @@ -package it.cavallium.dbengine.database; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.log.Logger; -import org.warp.commonutils.log.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Scheduler; - -public abstract class BlockingFluxIterable { - - private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class); - private final String name; - private AtomicBoolean cancelled = new AtomicBoolean(false); - - public BlockingFluxIterable(String name) { - this.name = name; - } - - public Flux generate(Scheduler scheduler) { - logger.trace("Generating iterable flux {}", this.name); - AtomicBoolean alreadyInitialized = new AtomicBoolean(false); - AtomicLong requests = new AtomicLong(0); - Semaphore availableRequests = new Semaphore(0); - return Flux - .create(sink -> { - sink.onRequest(n -> { - requests.addAndGet(n); - availableRequests.release(); - }); - sink.onDispose(() -> { - cancelled.set(true); - availableRequests.release(); - }); - - scheduler.schedule(() -> { - logger.trace("Starting iterable flux {}", this.name); - try { - try { - loop: - while (true) { - availableRequests.acquireUninterruptibly(); - var remainingRequests = requests.getAndSet(0); - if (cancelled.get()) { - break; - } - - while (remainingRequests-- > 0) { - if (alreadyInitialized.compareAndSet(false, true)) { - this.onStartup(); - } - - T next = onNext(); - if (next == null) { - break loop; - } - sink.next(next); - } - } - } catch (InterruptedException ex) { - sink.error(ex); - } finally { - if (alreadyInitialized.get()) { - onTerminate(); - } - } - } finally { - sink.complete(); - } - }); - }); - } - - public Flux generateNonblocking(Scheduler scheduler, int rateLimit) { - logger.trace("Generating nonblocking iterable flux {}", this.name); - AtomicBoolean alreadyInitialized = new AtomicBoolean(false); - final Object lock = new Object(); - return Flux - .create(sink -> { - sink.onRequest(n -> { - if (n > rateLimit || n < 1) { - sink.error(new IndexOutOfBoundsException("Requests must be <= " + rateLimit)); - } else { - scheduler.schedule(() -> { - synchronized (lock) { - if (cancelled.get()) { - return; - } - int remaining = (int) n; - try { - T next; - do { - if (alreadyInitialized.compareAndSet(false, true)) { - this.onStartup(); - } - next = this.onNext(); - if (next != null) { - sink.next(next); - } else { - cancelled.set(true); - sink.complete(); - } - } while (next != null && --remaining > 0 && !cancelled.get()); - } catch (InterruptedException e) { - sink.error(e); - } - } - }); - } - }); - sink.onCancel(() -> { - }); - sink.onDispose(() -> { - cancelled.set(true); - scheduler.schedule(() -> { - synchronized (lock) { - if (alreadyInitialized.get()) { - this.onTerminate(); - } - } - }); - }); - }) - .limitRate(rateLimit); - } - - public abstract void onStartup(); - - public abstract void onTerminate(); - - @Nullable - public abstract T onNext() throws InterruptedException; - - protected boolean isCancelled() { - return cancelled.get(); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 8cd1b16..2aa80e1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -550,13 +550,13 @@ public class LLLocalDictionary implements LLDictionary { } private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { - return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)) + return new LLLocalEntryReactiveRocksIterator(db, cfh, range, resolveSnapshot(snapshot)) .flux() .subscribeOn(dbScheduler); } private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { - return new LLLocalLuceneGroupedEntryReactiveIterator(db, + return new LLLocalGroupedEntryReactiveRocksIterator(db, cfh, prefixLength, range, @@ -578,7 +578,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - return new LLLocalLuceneGroupedKeysReactiveIterator(db, + return new LLLocalGroupedKeyReactiveRocksIterator(db, cfh, prefixLength, range, @@ -589,7 +589,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - return new LLLocalLuceneKeyPrefixesReactiveIterator(db, + return new LLLocalKeyPrefixReactiveRocksIterator(db, cfh, prefixLength, range, @@ -607,7 +607,7 @@ public class LLLocalDictionary implements LLDictionary { } private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { - return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).flux().subscribeOn(dbScheduler); + return new LLLocalKeyReactiveRocksIterator(db, cfh, range, resolveSnapshot(snapshot)).flux().subscribeOn(dbScheduler); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneEntryReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java similarity index 74% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneEntryReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java index e6f4e82..9a8d823 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneEntryReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java @@ -7,9 +7,9 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; -public class LLLocalLuceneEntryReactiveIterator extends LLLocalLuceneReactiveIterator> { +public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator> { - public LLLocalLuceneEntryReactiveIterator(RocksDB db, + public LLLocalEntryReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, LLRange range, ReadOptions readOptions) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java similarity index 74% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java index 61e2b85..c1804ee 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java @@ -7,9 +7,10 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; -public class LLLocalLuceneGroupedEntryReactiveIterator extends LLLocalLuceneGroupedReactiveIterator> { +public class LLLocalGroupedEntryReactiveRocksIterator extends + LLLocalGroupedReactiveRocksIterator> { - public LLLocalLuceneGroupedEntryReactiveIterator(RocksDB db, + public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, int prefixLength, LLRange range, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java similarity index 73% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java index 10e38a5..64656df 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java @@ -5,9 +5,9 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; -public class LLLocalLuceneGroupedKeysReactiveIterator extends LLLocalLuceneGroupedReactiveIterator { +public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator { - public LLLocalLuceneGroupedKeysReactiveIterator(RocksDB db, + public LLLocalGroupedKeyReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, int prefixLength, LLRange range, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java similarity index 96% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 670c5a1..b8003d8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -13,7 +13,7 @@ import org.rocksdb.Slice; import reactor.core.publisher.Flux; import reactor.util.function.Tuples; -public abstract class LLLocalLuceneGroupedReactiveIterator { +public abstract class LLLocalGroupedReactiveRocksIterator { private static final byte[] EMPTY = new byte[0]; @@ -25,7 +25,7 @@ public abstract class LLLocalLuceneGroupedReactiveIterator { private final boolean readValues; private final String debugName; - public LLLocalLuceneGroupedReactiveIterator(RocksDB db, + public LLLocalGroupedReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, int prefixLength, LLRange range, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java similarity index 95% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index a18f59b..25e5511 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -11,7 +11,7 @@ import org.rocksdb.Slice; import reactor.core.publisher.Flux; import reactor.util.function.Tuples; -public class LLLocalLuceneKeyPrefixesReactiveIterator { +public class LLLocalKeyPrefixReactiveRocksIterator { private static final byte[] EMPTY = new byte[0]; @@ -22,7 +22,7 @@ public class LLLocalLuceneKeyPrefixesReactiveIterator { private final ReadOptions readOptions; private final String debugName; - public LLLocalLuceneKeyPrefixesReactiveIterator(RocksDB db, + public LLLocalKeyPrefixReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, int prefixLength, LLRange range, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeysReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java similarity index 72% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeysReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java index 5b1dccd..39ddd09 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeysReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java @@ -5,9 +5,9 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; -public class LLLocalLuceneKeysReactiveIterator extends LLLocalLuceneReactiveIterator { +public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterator { - public LLLocalLuceneKeysReactiveIterator(RocksDB db, + public LLLocalKeyReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, LLRange range, ReadOptions readOptions) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java similarity index 95% rename from src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index be8d7fd..15d5eb3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -10,7 +10,7 @@ import org.rocksdb.Slice; import reactor.core.publisher.Flux; import reactor.util.function.Tuples; -public abstract class LLLocalLuceneReactiveIterator { +public abstract class LLLocalReactiveRocksIterator { private static final byte[] EMPTY = new byte[0]; @@ -20,7 +20,7 @@ public abstract class LLLocalLuceneReactiveIterator { private final ReadOptions readOptions; private final boolean readValues; - public LLLocalLuceneReactiveIterator(RocksDB db, + public LLLocalReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, LLRange range, ReadOptions readOptions,