This commit is contained in:
Andrea Cavalli 2021-02-26 21:21:02 +01:00
parent 6095f9eba9
commit 9140c7f3d6
4 changed files with 27 additions and 24 deletions

View File

@ -4,24 +4,27 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
public abstract class BlockingFluxIterable<T> {
private final Scheduler scheduler;
private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class);
private final String name;
private AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
private AtomicLong requests = new AtomicLong(0);
private Semaphore availableRequests = new Semaphore(0);
private AtomicBoolean cancelled = new AtomicBoolean(false);
public BlockingFluxIterable(Scheduler scheduler) {
this.scheduler = scheduler;
public BlockingFluxIterable(String name) {
this.name = name;
}
public Flux<T> generate() {
logger.trace("Generating iterable flux {}", this.name);
return Flux
.<T>create(sink -> {
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
AtomicLong requests = new AtomicLong(0);
Semaphore availableRequests = new Semaphore(0);
AtomicBoolean cancelled = new AtomicBoolean(false);
sink.onRequest(n -> {
requests.addAndGet(n);
availableRequests.release();
@ -31,14 +34,15 @@ public abstract class BlockingFluxIterable<T> {
availableRequests.release();
});
scheduler.schedule(() -> {
new Thread(() -> {
logger.trace("Starting iterable flux {}", this.name);
try {
try {
loop:
while (true) {
availableRequests.acquireUninterruptibly();
var remainingRequests = requests.getAndSet(0);
if (remainingRequests == 0 || cancelled.get()) {
if (cancelled.get()) {
break;
}
@ -64,7 +68,7 @@ public abstract class BlockingFluxIterable<T> {
} finally {
sink.complete();
}
});
}, "blocking-flux-iterable").start();
});
}
@ -74,4 +78,8 @@ public abstract class BlockingFluxIterable<T> {
@Nullable
public abstract T onNext() throws InterruptedException;
protected boolean isCancelled() {
return cancelled.get();
}
}

View File

@ -8,7 +8,6 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import reactor.core.scheduler.Scheduler;
public abstract class BoundedGroupedRocksFluxIterable<T> extends BlockingFluxIterable<List<T>> {
@ -20,12 +19,11 @@ public abstract class BoundedGroupedRocksFluxIterable<T> extends BlockingFluxIte
protected RocksIterator rocksIterator;
protected ReadOptions readOptions;
public BoundedGroupedRocksFluxIterable(Scheduler scheduler,
RocksDB db,
public BoundedGroupedRocksFluxIterable(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
int prefixLength) {
super(scheduler);
super("bounded-grouped-rocksdb");
this.db = db;
this.cfh = cfh;
this.range = range;

View File

@ -6,7 +6,6 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import reactor.core.scheduler.Scheduler;
public abstract class BoundedRocksFluxIterable<T> extends BlockingFluxIterable<T> {
@ -17,11 +16,10 @@ public abstract class BoundedRocksFluxIterable<T> extends BlockingFluxIterable<T
protected RocksIterator rocksIterator;
protected ReadOptions readOptions;
public BoundedRocksFluxIterable(Scheduler scheduler,
RocksDB db,
public BoundedRocksFluxIterable(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range) {
super(scheduler);
super("bounded-rocksdb");
this.db = db;
this.cfh = cfh;
this.range = range;

View File

@ -38,7 +38,6 @@ import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
@ -558,7 +557,7 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return new BoundedRocksFluxIterable<Entry<byte[], byte[]>>(dbScheduler, db, cfh, range) {
return new BoundedRocksFluxIterable<Entry<byte[], byte[]>>(db, cfh, range) {
@Override
protected ReadOptions getReadOptions() {
@ -573,7 +572,7 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return new BoundedGroupedRocksFluxIterable<Entry<byte[], byte[]>>(Schedulers.boundedElastic(), db, cfh, range, prefixLength) {
return new BoundedGroupedRocksFluxIterable<Entry<byte[], byte[]>>(db, cfh, range, prefixLength) {
@Override
protected ReadOptions getReadOptions() {
@ -600,7 +599,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new BoundedGroupedRocksFluxIterable<byte[]>(Schedulers.boundedElastic(), db, cfh, range, prefixLength) {
return new BoundedGroupedRocksFluxIterable<byte[]>(db, cfh, range, prefixLength) {
@Override
protected ReadOptions getReadOptions() {
@ -623,7 +622,7 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return new BoundedRocksFluxIterable<byte[]>(dbScheduler, db, cfh, range) {
return new BoundedRocksFluxIterable<byte[]>(db, cfh, range) {
@Override
protected ReadOptions getReadOptions() {