Refactor iterators

This commit is contained in:
Andrea Cavalli 2021-03-14 23:06:46 +01:00
parent 63469c0f89
commit fd4f8e77d6
9 changed files with 20 additions and 158 deletions

View File

@ -1,139 +0,0 @@
package it.cavallium.dbengine.database;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
public abstract class BlockingFluxIterable<T> {
private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class);
private final String name;
private AtomicBoolean cancelled = new AtomicBoolean(false);
public BlockingFluxIterable(String name) {
this.name = name;
}
public Flux<T> generate(Scheduler scheduler) {
logger.trace("Generating iterable flux {}", this.name);
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
AtomicLong requests = new AtomicLong(0);
Semaphore availableRequests = new Semaphore(0);
return Flux
.<T>create(sink -> {
sink.onRequest(n -> {
requests.addAndGet(n);
availableRequests.release();
});
sink.onDispose(() -> {
cancelled.set(true);
availableRequests.release();
});
scheduler.schedule(() -> {
logger.trace("Starting iterable flux {}", this.name);
try {
try {
loop:
while (true) {
availableRequests.acquireUninterruptibly();
var remainingRequests = requests.getAndSet(0);
if (cancelled.get()) {
break;
}
while (remainingRequests-- > 0) {
if (alreadyInitialized.compareAndSet(false, true)) {
this.onStartup();
}
T next = onNext();
if (next == null) {
break loop;
}
sink.next(next);
}
}
} catch (InterruptedException ex) {
sink.error(ex);
} finally {
if (alreadyInitialized.get()) {
onTerminate();
}
}
} finally {
sink.complete();
}
});
});
}
public Flux<T> generateNonblocking(Scheduler scheduler, int rateLimit) {
logger.trace("Generating nonblocking iterable flux {}", this.name);
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
final Object lock = new Object();
return Flux
.<T>create(sink -> {
sink.onRequest(n -> {
if (n > rateLimit || n < 1) {
sink.error(new IndexOutOfBoundsException("Requests must be <= " + rateLimit));
} else {
scheduler.schedule(() -> {
synchronized (lock) {
if (cancelled.get()) {
return;
}
int remaining = (int) n;
try {
T next;
do {
if (alreadyInitialized.compareAndSet(false, true)) {
this.onStartup();
}
next = this.onNext();
if (next != null) {
sink.next(next);
} else {
cancelled.set(true);
sink.complete();
}
} while (next != null && --remaining > 0 && !cancelled.get());
} catch (InterruptedException e) {
sink.error(e);
}
}
});
}
});
sink.onCancel(() -> {
});
sink.onDispose(() -> {
cancelled.set(true);
scheduler.schedule(() -> {
synchronized (lock) {
if (alreadyInitialized.get()) {
this.onTerminate();
}
}
});
});
})
.limitRate(rateLimit);
}
public abstract void onStartup();
public abstract void onTerminate();
@Nullable
public abstract T onNext() throws InterruptedException;
protected boolean isCancelled() {
return cancelled.get();
}
}

View File

@ -550,13 +550,13 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot))
return new LLLocalEntryReactiveRocksIterator(db, cfh, range, resolveSnapshot(snapshot))
.flux()
.subscribeOn(dbScheduler);
}
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalLuceneGroupedEntryReactiveIterator(db,
return new LLLocalGroupedEntryReactiveRocksIterator(db,
cfh,
prefixLength,
range,
@ -578,7 +578,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalLuceneGroupedKeysReactiveIterator(db,
return new LLLocalGroupedKeyReactiveRocksIterator(db,
cfh,
prefixLength,
range,
@ -589,7 +589,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Flux<byte[]> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalLuceneKeyPrefixesReactiveIterator(db,
return new LLLocalKeyPrefixReactiveRocksIterator(db,
cfh,
prefixLength,
range,
@ -607,7 +607,7 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).flux().subscribeOn(dbScheduler);
return new LLLocalKeyReactiveRocksIterator(db, cfh, range, resolveSnapshot(snapshot)).flux().subscribeOn(dbScheduler);
}
@Override

View File

@ -7,9 +7,9 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneEntryReactiveIterator extends LLLocalLuceneReactiveIterator<Entry<byte[], byte[]>> {
public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator<Entry<byte[], byte[]>> {
public LLLocalLuceneEntryReactiveIterator(RocksDB db,
public LLLocalEntryReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
ReadOptions readOptions) {

View File

@ -7,9 +7,10 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneGroupedEntryReactiveIterator extends LLLocalLuceneGroupedReactiveIterator<Entry<byte[], byte[]>> {
public class LLLocalGroupedEntryReactiveRocksIterator extends
LLLocalGroupedReactiveRocksIterator<Entry<byte[], byte[]>> {
public LLLocalLuceneGroupedEntryReactiveIterator(RocksDB db,
public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,

View File

@ -5,9 +5,9 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneGroupedKeysReactiveIterator extends LLLocalLuceneGroupedReactiveIterator<byte[]> {
public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator<byte[]> {
public LLLocalLuceneGroupedKeysReactiveIterator(RocksDB db,
public LLLocalGroupedKeyReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,

View File

@ -13,7 +13,7 @@ import org.rocksdb.Slice;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
public abstract class LLLocalGroupedReactiveRocksIterator<T> {
private static final byte[] EMPTY = new byte[0];
@ -25,7 +25,7 @@ public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
private final boolean readValues;
private final String debugName;
public LLLocalLuceneGroupedReactiveIterator(RocksDB db,
public LLLocalGroupedReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,

View File

@ -11,7 +11,7 @@ import org.rocksdb.Slice;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
public class LLLocalLuceneKeyPrefixesReactiveIterator {
public class LLLocalKeyPrefixReactiveRocksIterator {
private static final byte[] EMPTY = new byte[0];
@ -22,7 +22,7 @@ public class LLLocalLuceneKeyPrefixesReactiveIterator {
private final ReadOptions readOptions;
private final String debugName;
public LLLocalLuceneKeyPrefixesReactiveIterator(RocksDB db,
public LLLocalKeyPrefixReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,

View File

@ -5,9 +5,9 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneKeysReactiveIterator extends LLLocalLuceneReactiveIterator<byte[]> {
public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterator<byte[]> {
public LLLocalLuceneKeysReactiveIterator(RocksDB db,
public LLLocalKeyReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
ReadOptions readOptions) {

View File

@ -10,7 +10,7 @@ import org.rocksdb.Slice;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
public abstract class LLLocalLuceneReactiveIterator<T> {
public abstract class LLLocalReactiveRocksIterator<T> {
private static final byte[] EMPTY = new byte[0];
@ -20,7 +20,7 @@ public abstract class LLLocalLuceneReactiveIterator<T> {
private final ReadOptions readOptions;
private final boolean readValues;
public LLLocalLuceneReactiveIterator(RocksDB db,
public LLLocalReactiveRocksIterator(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
ReadOptions readOptions,