From 9140c7f3d6cc900c470890034cf113b5f782acb3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 26 Feb 2021 21:21:02 +0100 Subject: [PATCH] Bugfix --- .../database/BlockingFluxIterable.java | 30 ++++++++++++------- .../BoundedGroupedRocksFluxIterable.java | 6 ++-- .../database/BoundedRocksFluxIterable.java | 6 ++-- .../database/disk/LLLocalDictionary.java | 9 +++--- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java index 87898f2..af1abe4 100644 --- a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java +++ b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java @@ -4,24 +4,27 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.scheduler.Scheduler; public abstract class BlockingFluxIterable { - private final Scheduler scheduler; + private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class); + private final String name; + private AtomicBoolean alreadyInitialized = new AtomicBoolean(false); + private AtomicLong requests = new AtomicLong(0); + private Semaphore availableRequests = new Semaphore(0); + private AtomicBoolean cancelled = new AtomicBoolean(false); - public BlockingFluxIterable(Scheduler scheduler) { - this.scheduler = scheduler; + public BlockingFluxIterable(String name) { + this.name = name; } public Flux generate() { + logger.trace("Generating iterable flux {}", this.name); return Flux .create(sink -> { - AtomicBoolean alreadyInitialized = new AtomicBoolean(false); - AtomicLong requests = new AtomicLong(0); - Semaphore availableRequests = new Semaphore(0); - AtomicBoolean cancelled = new AtomicBoolean(false); sink.onRequest(n -> { requests.addAndGet(n); availableRequests.release(); @@ -31,14 +34,15 @@ public abstract class BlockingFluxIterable { availableRequests.release(); }); - scheduler.schedule(() -> { + new Thread(() -> { + logger.trace("Starting iterable flux {}", this.name); try { try { loop: while (true) { availableRequests.acquireUninterruptibly(); var remainingRequests = requests.getAndSet(0); - if (remainingRequests == 0 || cancelled.get()) { + if (cancelled.get()) { break; } @@ -64,7 +68,7 @@ public abstract class BlockingFluxIterable { } finally { sink.complete(); } - }); + }, "blocking-flux-iterable").start(); }); } @@ -74,4 +78,8 @@ public abstract class BlockingFluxIterable { @Nullable public abstract T onNext() throws InterruptedException; + + protected boolean isCancelled() { + return cancelled.get(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java index 27158d5..5174229 100644 --- a/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java +++ b/src/main/java/it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java @@ -8,7 +8,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksIterator; -import reactor.core.scheduler.Scheduler; public abstract class BoundedGroupedRocksFluxIterable extends BlockingFluxIterable> { @@ -20,12 +19,11 @@ public abstract class BoundedGroupedRocksFluxIterable extends BlockingFluxIte protected RocksIterator rocksIterator; protected ReadOptions readOptions; - public BoundedGroupedRocksFluxIterable(Scheduler scheduler, - RocksDB db, + public BoundedGroupedRocksFluxIterable(RocksDB db, ColumnFamilyHandle cfh, LLRange range, int prefixLength) { - super(scheduler); + super("bounded-grouped-rocksdb"); this.db = db; this.cfh = cfh; this.range = range; diff --git a/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java index 1205c5f..3c4e95e 100644 --- a/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java +++ b/src/main/java/it/cavallium/dbengine/database/BoundedRocksFluxIterable.java @@ -6,7 +6,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksIterator; -import reactor.core.scheduler.Scheduler; public abstract class BoundedRocksFluxIterable extends BlockingFluxIterable { @@ -17,11 +16,10 @@ public abstract class BoundedRocksFluxIterable extends BlockingFluxIterable> getRangeMulti(LLSnapshot snapshot, LLRange range) { - return new BoundedRocksFluxIterable>(dbScheduler, db, cfh, range) { + return new BoundedRocksFluxIterable>(db, cfh, range) { @Override protected ReadOptions getReadOptions() { @@ -573,7 +572,7 @@ public class LLLocalDictionary implements LLDictionary { } private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { - return new BoundedGroupedRocksFluxIterable>(Schedulers.boundedElastic(), db, cfh, range, prefixLength) { + return new BoundedGroupedRocksFluxIterable>(db, cfh, range, prefixLength) { @Override protected ReadOptions getReadOptions() { @@ -600,7 +599,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - return new BoundedGroupedRocksFluxIterable(Schedulers.boundedElastic(), db, cfh, range, prefixLength) { + return new BoundedGroupedRocksFluxIterable(db, cfh, range, prefixLength) { @Override protected ReadOptions getReadOptions() { @@ -623,7 +622,7 @@ public class LLLocalDictionary implements LLDictionary { } private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { - return new BoundedRocksFluxIterable(dbScheduler, db, cfh, range) { + return new BoundedRocksFluxIterable(db, cfh, range) { @Override protected ReadOptions getReadOptions() {