Implement backpressure during iteration

Andrea Cavalli 2021-02-26 14:06:16 +01:00
parent cbcc4df690
commit 64b9010e50
4 changed files with 293 additions and 121 deletions


@@ -0,0 +1,75 @@
package it.cavallium.dbengine.database;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

public abstract class BlockingFluxIterable<T> {

	private final Scheduler scheduler;

	public BlockingFluxIterable(Scheduler scheduler) {
		this.scheduler = scheduler;
	}

	public Flux<T> generate() {
		return Flux
				.<T>create(sink -> {
					boolean alreadyInitialized = false;
					AtomicLong requests = new AtomicLong(0);
					Semaphore availableRequests = new Semaphore(0);
					AtomicBoolean cancelled = new AtomicBoolean(false);
					sink.onRequest(n -> {
						requests.addAndGet(n);
						availableRequests.release();
					});
					sink.onDispose(() -> {
						cancelled.set(true);
						availableRequests.release();
					});

					try {
						try {
							loop:
							while (true) {
								availableRequests.acquireUninterruptibly();
								var remainingRequests = requests.getAndSet(0);
								if (remainingRequests == 0 || cancelled.get()) {
									break;
								}

								while (remainingRequests-- > 0) {
									if (!alreadyInitialized) {
										alreadyInitialized = true;
										this.onStartup();
									}
									T next = onNext();
									if (next == null) {
										break loop;
									}
									sink.next(next);
								}
							}
						} finally {
							if (alreadyInitialized) {
								onTerminate();
							}
						}
					} finally {
						sink.complete();
					}
				})
				.subscribeOn(scheduler);
	}

	public abstract void onStartup();

	public abstract void onTerminate();

	@Nullable
	public abstract T onNext();
}
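
For readers new to this pattern, here is a minimal usage sketch. It is not part of this commit: the CountingFluxIterable class, the bounded-elastic scheduler and the BaseSubscriber below are illustrative assumptions chosen only to show how BlockingFluxIterable propagates backpressure, namely that onNext() is called only after the downstream subscriber has issued request(n), and that returning null completes the flux.

import java.util.concurrent.CountDownLatch;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.scheduler.Schedulers;
import it.cavallium.dbengine.database.BlockingFluxIterable;

// Hypothetical example subclass: emits the integers 0..9 on demand
public class CountingFluxIterable extends BlockingFluxIterable<Integer> {

	private int counter = 0;

	public CountingFluxIterable() {
		// Assumption: any scheduler whose threads may block works here
		super(Schedulers.boundedElastic());
	}

	@Override
	public void onStartup() {
		// Open resources lazily; called once, just before the first element is produced
	}

	@Override
	public void onTerminate() {
		// Release resources; called once, only if onStartup ran
	}

	@Override
	public Integer onNext() {
		// Returning null signals the end of the iteration
		return counter < 10 ? counter++ : null;
	}

	public static void main(String[] args) throws InterruptedException {
		CountDownLatch done = new CountDownLatch(1);
		new CountingFluxIterable()
				.generate()
				.doOnComplete(done::countDown)
				.subscribe(new BaseSubscriber<Integer>() {
					@Override
					protected void hookOnSubscribe(Subscription subscription) {
						request(3); // nothing is pulled from onNext() until this request arrives
					}

					@Override
					protected void hookOnNext(Integer value) {
						System.out.println(value);
						request(1); // pull one more element at a time
					}
				});
		done.await();
	}
}

Because generate() blocks while waiting for demand, it has to be subscribed on a scheduler whose threads are allowed to block, which is why the RocksDB iterables in this commit are given the dedicated database scheduler (dbScheduler).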


@@ -0,0 +1,99 @@
package it.cavallium.dbengine.database;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import reactor.core.scheduler.Scheduler;

public abstract class BoundedGroupedRocksFluxIterable<T> extends BlockingFluxIterable<List<T>> {

	private final RocksDB db;
	private final ColumnFamilyHandle cfh;
	protected final LLRange range;
	private final int prefixLength;

	protected RocksIterator rocksIterator;
	protected ReadOptions readOptions;
	protected byte[] firstGroupKey = null;
	protected List<T> currentGroupValues = new ArrayList<>();

	public BoundedGroupedRocksFluxIterable(Scheduler scheduler,
			RocksDB db,
			ColumnFamilyHandle cfh,
			LLRange range,
			int prefixLength) {
		super(scheduler);
		this.db = db;
		this.cfh = cfh;
		this.range = range;
		this.prefixLength = prefixLength;
	}

	@Override
	public void onStartup() {
		readOptions = this.getReadOptions();
		rocksIterator = db.newIterator(cfh, readOptions);
		if (range.hasMin()) {
			rocksIterator.seek(range.getMin());
		} else {
			rocksIterator.seekToFirst();
		}
	}

	@Override
	public void onTerminate() {
		if (rocksIterator != null) {
			rocksIterator.close();
		}
	}

	@Nullable
	@Override
	public List<T> onNext() {
		while (rocksIterator.isValid()) {
			byte[] key = rocksIterator.key();
			if (firstGroupKey == null) { // Fix first value
				firstGroupKey = key;
			}
			if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
				break;
			}
			List<T> result = null;
			if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
				currentGroupValues.add(transformEntry(key));
			} else {
				if (!currentGroupValues.isEmpty()) {
					result = currentGroupValues;
				}
				firstGroupKey = key;
				currentGroupValues = new ArrayList<>();
			}
			if (result != null) {
				return result;
			}
			rocksIterator.next();
		}
		if (!currentGroupValues.isEmpty()) {
			// Hand off the final group and reset it, so the next call returns null and completes the flux
			List<T> lastGroup = currentGroupValues;
			currentGroupValues = new ArrayList<>();
			return lastGroup;
		}
		return null;
	}

	protected abstract ReadOptions getReadOptions();

	protected abstract T transformEntry(byte[] key);

	protected byte[] getValue() {
		return rocksIterator.value();
	}
}
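
As an aside, the grouping above relies on the range-based Arrays.equals overload, which compares only the first prefixLength bytes of consecutive keys. A tiny standalone illustration (the keys are hypothetical, not taken from this commit):

import java.util.Arrays;

public class PrefixGroupingExample {

	public static void main(String[] args) {
		int prefixLength = 2;
		byte[] k1 = {10, 20, 1};
		byte[] k2 = {10, 20, 2}; // same 2-byte prefix as k1, so it joins the same group
		byte[] k3 = {10, 21, 9}; // different prefix, so it starts a new group

		System.out.println(Arrays.equals(k1, 0, prefixLength, k2, 0, prefixLength)); // true
		System.out.println(Arrays.equals(k1, 0, prefixLength, k3, 0, prefixLength)); // false
	}
}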


@@ -0,0 +1,69 @@
package it.cavallium.dbengine.database;

import java.util.Arrays;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import reactor.core.scheduler.Scheduler;

public abstract class BoundedRocksFluxIterable<T> extends BlockingFluxIterable<T> {

	private final RocksDB db;
	private final ColumnFamilyHandle cfh;
	protected final LLRange range;

	protected RocksIterator rocksIterator;
	protected ReadOptions readOptions;

	public BoundedRocksFluxIterable(Scheduler scheduler,
			RocksDB db,
			ColumnFamilyHandle cfh,
			LLRange range) {
		super(scheduler);
		this.db = db;
		this.cfh = cfh;
		this.range = range;
	}

	@Override
	public void onStartup() {
		readOptions = this.getReadOptions();
		rocksIterator = db.newIterator(cfh, readOptions);
		if (range.hasMin()) {
			rocksIterator.seek(range.getMin());
		} else {
			rocksIterator.seekToFirst();
		}
	}

	@Override
	public void onTerminate() {
		if (rocksIterator != null) {
			rocksIterator.close();
		}
	}

	@Nullable
	@Override
	public T onNext() {
		if (!rocksIterator.isValid()) {
			return null;
		}
		byte[] key = rocksIterator.key();
		if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
			return null;
		}
		// Transform before advancing the iterator, so getValue() still reads the current entry
		T transformed = this.transformEntry(key);
		rocksIterator.next();
		return transformed;
	}

	protected abstract ReadOptions getReadOptions();

	protected abstract T transformEntry(byte[] key);

	protected byte[] getValue() {
		return rocksIterator.value();
	}
}


@@ -1,5 +1,7 @@
 package it.cavallium.dbengine.database.disk;

+import it.cavallium.dbengine.database.BoundedGroupedRocksFluxIterable;
+import it.cavallium.dbengine.database.BoundedRocksFluxIterable;
 import it.cavallium.dbengine.database.LLDictionary;
 import it.cavallium.dbengine.database.LLDictionaryResultType;
 import it.cavallium.dbengine.database.LLRange;
@@ -29,10 +31,10 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
-import org.warp.commonutils.log.Logger;
-import org.warp.commonutils.log.LoggerFactory;
 import org.warp.commonutils.concurrency.atomicity.NotAtomic;
 import org.warp.commonutils.locks.Striped;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -555,70 +557,33 @@ public class LLLocalDictionary implements LLDictionary {
 	}

 	private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
-		return Flux
-				.<Entry<byte[], byte[]>>push(sink -> {
-					try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
-						if (range.hasMin()) {
-							rocksIterator.seek(range.getMin());
-						} else {
-							rocksIterator.seekToFirst();
-						}
-						byte[] key;
-						while (rocksIterator.isValid()) {
-							key = rocksIterator.key();
-							if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
-								break;
-							}
-							sink.next(Map.entry(key, rocksIterator.value()));
-							rocksIterator.next();
-						}
-					} finally {
-						sink.complete();
-					}
-				})
-				.subscribeOn(dbScheduler);
+		return new BoundedRocksFluxIterable<Entry<byte[], byte[]>>(dbScheduler, db, cfh, range) {
+			@Override
+			protected ReadOptions getReadOptions() {
+				return resolveSnapshot(snapshot);
+			}
+			@Override
+			protected Entry<byte[], byte[]> transformEntry(byte[] key) {
+				return Map.entry(key, this.getValue());
+			}
+		}.generate();
 	}

 	private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
-		return Flux
-				.<List<Entry<byte[], byte[]>>>push(sink -> {
-					try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
-						if (range.hasMin()) {
-							rocksIterator.seek(range.getMin());
-						} else {
-							rocksIterator.seekToFirst();
-						}
-						byte[] firstGroupKey = null;
-						List<Entry<byte[], byte[]>> currentGroupValues = new ArrayList<>();
-						byte[] key;
-						while (rocksIterator.isValid()) {
-							key = rocksIterator.key();
-							if (firstGroupKey == null) { // Fix first value
-								firstGroupKey = key;
-							}
-							if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
-								break;
-							}
-							if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
-								currentGroupValues.add(Map.entry(key, rocksIterator.value()));
-							} else {
-								if (!currentGroupValues.isEmpty()) {
-									sink.next(currentGroupValues);
-								}
-								firstGroupKey = key;
-								currentGroupValues = new ArrayList<>();
-							}
-							rocksIterator.next();
-						}
-						if (!currentGroupValues.isEmpty()) {
-							sink.next(currentGroupValues);
-						}
-					} finally {
-						sink.complete();
-					}
-				})
-				.subscribeOn(dbScheduler);
+		return new BoundedGroupedRocksFluxIterable<Entry<byte[], byte[]>>(dbScheduler, db, cfh, range, prefixLength) {
+			@Override
+			protected ReadOptions getReadOptions() {
+				return resolveSnapshot(snapshot);
+			}
+			@Override
+			protected Entry<byte[], byte[]> transformEntry(byte[] key) {
+				return Map.entry(key, this.getValue());
+			}
+		}.generate();
 	}

 	@Override
@@ -634,44 +599,18 @@ public class LLLocalDictionary implements LLDictionary {
 	@Override
 	public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
-		return Flux
-				.<List<byte[]>>push(sink -> {
-					try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
-						if (range.hasMin()) {
-							rocksIterator.seek(range.getMin());
-						} else {
-							rocksIterator.seekToFirst();
-						}
-						byte[] firstGroupKey = null;
-						List<byte[]> currentGroupValues = new ArrayList<>();
-						byte[] key;
-						while (rocksIterator.isValid()) {
-							key = rocksIterator.key();
-							if (firstGroupKey == null) { // Fix first value
-								firstGroupKey = key;
-							}
-							if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
-								break;
-							}
-							if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
-								if (!currentGroupValues.isEmpty()) {
-									sink.next(currentGroupValues);
-								}
-								firstGroupKey = key;
-								currentGroupValues = new ArrayList<>();
-							}
-							currentGroupValues.add(key);
-							rocksIterator.next();
-						}
-						if (!currentGroupValues.isEmpty()) {
-							sink.next(currentGroupValues);
-						}
-					} finally {
-						sink.complete();
-					}
-				})
-				.subscribeOn(dbScheduler);
+		return new BoundedGroupedRocksFluxIterable<byte[]>(dbScheduler, db, cfh, range, prefixLength) {
+			@Override
+			protected ReadOptions getReadOptions() {
+				return resolveSnapshot(snapshot);
+			}
+			@Override
+			protected byte[] transformEntry(byte[] key) {
+				return key;
+			}
+		}.generate();
 	}

 	private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
@@ -683,28 +622,18 @@ public class LLLocalDictionary implements LLDictionary {
 	}

 	private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
-		return Flux
-				.<byte[]>push(sink -> {
-					try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
-						if (range.hasMin()) {
-							rocksIterator.seek(range.getMin());
-						} else {
-							rocksIterator.seekToFirst();
-						}
-						byte[] key;
-						while (rocksIterator.isValid()) {
-							key = rocksIterator.key();
-							if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
-								break;
-							}
-							sink.next(key);
-							rocksIterator.next();
-						}
-					} finally {
-						sink.complete();
-					}
-				})
-				.subscribeOn(dbScheduler);
+		return new BoundedRocksFluxIterable<byte[]>(dbScheduler, db, cfh, range) {
+			@Override
+			protected ReadOptions getReadOptions() {
+				return resolveSnapshot(snapshot);
+			}
+			@Override
+			protected byte[] transformEntry(byte[] key) {
+				return key;
+			}
+		}.generate();
 	}

 	//todo: replace implementation with a simple Flux.push