Update Example.java, LLDictionary.java, and 6 more files...

Andrea Cavalli 2021-02-02 00:09:46 +01:00
parent 023bc3b0dd
commit dbca36b3aa
8 changed files with 455 additions and 299 deletions

Example.java

@@ -5,8 +5,8 @@ import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength;
import it.cavallium.dbengine.database.collections.Serializer;
import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength;
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import java.io.IOException;
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -32,7 +33,7 @@ import reactor.util.function.Tuples;
public class Example {
private static final boolean printPreviousValue = false;
private static final int numRepeats = 100;
private static final int numRepeats = 1000;
private static final int batchSize = 10000;
public static void main(String[] args) throws InterruptedException {
@@ -52,8 +53,8 @@ public class Example {
*/
testPutMulti()
.then(rangeTestPutMulti())
rangeTestPutMultiProgressive()
.then(rangeTestPutMultiSame())
.subscribeOn(Schedulers.parallel())
.blockOptional();
}
@@ -145,11 +146,14 @@ public class Example {
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
tuple -> Mono
.defer(() -> tuple.getT2().putMulti(putMultiFlux)
),
tuple -> Mono.defer(() -> tuple.getT2().putMulti(putMultiFlux)),
numRepeats,
tuple -> tuple.getT1().close());
tuple -> Mono
.fromRunnable(() -> System.out.println("Calculating size"))
.then(tuple.getT2().size(null, false))
.doOnNext(s -> System.out.println("Size after: " + s))
.then(tuple.getT1().close())
);
}
private static Mono<Void> rangeTestAtPut() {
@@ -227,23 +231,56 @@ public class Example {
tuple -> tuple.getT1().close());
}
private static Mono<Void> rangeTestPutMulti() {
private static Mono<Void> rangeTestPutMultiSame() {
var ser = SerializerFixedBinaryLength.noop(4);
var vser = Serializer.noop();
HashMap<byte[], byte[]> keysToPut = new HashMap<>();
for (int i = 0; i < batchSize; i++) {
keysToPut.put(Ints.toByteArray(i * 3), Ints.toByteArray(i * 11));
}
var putMultiFlux = Flux.fromIterable(keysToPut.entrySet());
return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
tuple -> Mono
.defer(() -> tuple.getT2().putMulti(putMultiFlux)
.defer(() -> tuple.getT2().putMulti(Flux.fromIterable(keysToPut.entrySet()))
),
numRepeats,
tuple -> tuple.getT1().close());
tuple -> Mono
.fromRunnable(() -> System.out.println("Calculating size"))
.then(tuple.getT2().size(null, false))
.doOnNext(s -> System.out.println("Size after: " + s))
.then(tuple.getT1().close())
);
}
private static Mono<Void> rangeTestPutMultiProgressive() {
var ser = SerializerFixedBinaryLength.noop(4);
var vser = Serializer.noop();
AtomicInteger ai = new AtomicInteger(0);
return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
tuple -> Mono
.defer(() -> {
var aiv = ai.incrementAndGet();
HashMap<byte[], byte[]> keysToPut = new HashMap<>();
for (int i = 0; i < batchSize; i++) {
keysToPut.put(
Ints.toByteArray(i * 3 + (batchSize * aiv)),
Ints.toByteArray(i * 11 + (batchSize * aiv))
);
}
return tuple.getT2().putMulti(Flux.fromIterable(keysToPut.entrySet()));
}),
numRepeats,
tuple -> Mono
.fromRunnable(() -> System.out.println("Calculating size"))
.then(tuple.getT2().size(null, false))
.doOnNext(s -> System.out.println("Size after: " + s))
.then(tuple.getT1().close())
);
}
private static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
@@ -282,18 +319,20 @@ public class Example {
Duration WAIT_TIME_END = Duration.ofSeconds(5);
return Mono
.delay(WAIT_TIME)
.doOnSuccess(s -> {
System.out.println("----------------------------------------------------------------------");
System.out.println(name);
})
.then(Mono.fromRunnable(() -> instantInit.tryEmitValue(now())))
.then(setup)
.doOnSuccess(s -> instantInitTest.tryEmitValue(now()))
.flatMap(a ->Mono.defer(() -> test.apply(a)).repeat(numRepeats)
.flatMap(a -> Mono.defer(() -> test.apply(a)).repeat(numRepeats - 1)
.then()
.doOnSuccess(s -> instantEndTest.tryEmitValue(now()))
.then(close.apply(a)))
.doOnSuccess(s -> instantEnd.tryEmitValue(now()))
.then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono()))
.doOnSuccess(tuple -> {
System.out.println("----------------------------------------------------------------------");
System.out.println(name);
System.out.println(
"\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format((numRepeats * batchSize)) + " times:");
System.out.println("\t - Test time: " + DecimalFormat

LLDictionary.java

@@ -1,5 +1,6 @@
package it.cavallium.dbengine.database;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
@@ -22,21 +23,30 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range);
Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range);
Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);
default Mono<Void> replaceRange(LLRange range, boolean canKeysChange, Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) {
Flux<Entry<byte[], byte[]>> replacedFlux = this.getRange(null, range).flatMap(entriesReplacer);
if (canKeysChange) {
return this
.setRange(range, replacedFlux, false)
.then();
} else {
return this
.putMulti(replacedFlux, false)
.then();
}
return Mono.defer(() -> {
if (canKeysChange) {
return this
.setRange(range, this
.getRange(null, range)
.flatMap(entriesReplacer), false)
.then();
} else {
return this
.putMulti(this
.getRange(null, range)
.flatMap(entriesReplacer), false)
.then();
}
});
}
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range);

DatabaseMapDictionary.java

@@ -73,8 +73,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
byte[] keySuffix = stripPrefix(entry.getKey());
return Map.entry(keySuffix, entry.getValue());
return Map.entry(stripPrefix(entry.getKey()), entry.getValue());
}
@Override
@@ -89,7 +88,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), range, true);
return dictionary.sizeRange(resolveSnapshot(snapshot), range, fast);
}
@Override
@@ -174,9 +173,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
var serializedEntries = entries
.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())));
return dictionary.setRange(range, serializedEntries, true)
return dictionary
.setRange(range,
entries.map(entry ->
Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), true)
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
}

DatabaseMapDictionaryDeep.java

@@ -9,11 +9,11 @@ import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
// todo: implement optimized methods
@SuppressWarnings("Convert2MethodRef")
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
public static final byte[] EMPTY_BYTES = new byte[0];
@@ -196,34 +196,40 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
byte[] keySuffixData = serializeSuffix(keySuffix);
Flux<byte[]> rangeKeys = this
.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)
);
return this.subStageGetter
.subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys);
}
@Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
Flux<GroupedFlux<byte[], byte[]>> groupedFlux = dictionary
.getRangeKeys(resolveSnapshot(snapshot), range)
.groupBy(this::removeExtFromFullKey);
return groupedFlux
.flatMap(rangeKeys -> this.subStageGetter
.subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys)
.map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us))
.subStage(dictionary,
snapshot,
toKeyWithoutExt(keySuffixData),
this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData))
);
}
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
var newValues = entries
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()));
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMap(rangeKeys -> {
//System.out.println(Thread.currentThread() + "\tkReceived range key flux");
byte[] groupKeyWithoutExt = removeExtFromFullKey(rangeKeys.get(0));
byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
return this.subStageGetter
.subStage(dictionary, snapshot, groupKeyWithoutExt, Flux.fromIterable(rangeKeys))
//.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tObtained stage for a key"))
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
//.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tMapped stage for a key"));
}
);
//.doOnNext(s -> System.out.println(Thread.currentThread() + "\tNext stage"))
}
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return getAllStages(null)
.flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
.concatWith(newValues.then(Mono.empty()));
.concatWith(entries
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()))
.then(Mono.empty()));
}
//todo: temporary wrapper. convert the whole class to buffers
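getAllStages above stops using Flux#groupBy and instead consumes the new getRangeKeysGrouped, which can emit each prefix group as a plain List because range keys arrive in sorted order. A hypothetical single-pass helper (illustrative names, not the engine's API) showing that grouping contract, assuming every key is at least prefixLength bytes long:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PrefixGroupingDemo {
    static List<List<byte[]>> groupByPrefix(List<byte[]> sortedKeys, int prefixLength) {
        List<List<byte[]>> groups = new ArrayList<>();
        byte[] firstGroupKey = null;
        List<byte[]> currentGroup = new ArrayList<>();
        for (byte[] key : sortedKeys) {
            // Cut a new group whenever the first prefixLength bytes change.
            if (firstGroupKey == null
                    || !Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
                if (!currentGroup.isEmpty()) {
                    groups.add(currentGroup);
                }
                firstGroupKey = key;
                currentGroup = new ArrayList<>();
            }
            currentGroup.add(key);
        }
        if (!currentGroup.isEmpty()) {
            groups.add(currentGroup); // flush the last group
        }
        return groups;
    }
}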

DatabaseStageMap.java

@@ -30,7 +30,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<U> putValueAndGetPrevious(T key, U value) {
return at(null, key).flatMap(v -> v.setAndGetPrevious(value));
return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value));
}
default Mono<Boolean> putValueAndGetStatus(T key, U value) {
@@ -76,21 +76,19 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<Void> replaceAllValues(boolean canKeysChange, Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer) {
Flux<Entry<T, U>> replacedFlux = this
.getAllValues(null)
.flatMap(entriesReplacer);
if (canKeysChange) {
return this
.setAllValues(replacedFlux)
.then();
} else {
return replacedFlux
.flatMap(replacedEntry -> this
.at(null, replacedEntry.getKey())
.map(entry -> entry.set(replacedEntry.getValue()))
)
.then();
}
return Mono.defer(() -> {
if (canKeysChange) {
return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then();
} else {
return this
.getAllValues(null)
.flatMap(entriesReplacer)
.flatMap(replacedEntry -> this
.at(null, replacedEntry.getKey())
.map(entry -> entry.set(replacedEntry.getValue())))
.then();
}
});
}
default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {

SubStageGetterSingle.java

@@ -20,12 +20,25 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
@Nullable CompositeSnapshot snapshot,
byte[] keyPrefix,
Flux<byte[]> keyFlux) {
return keyFlux.singleOrEmpty().flatMap(key -> Mono.fromCallable(() -> {
if (!Arrays.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
return null;
})).thenReturn(new DatabaseSingle<>(dictionary, keyPrefix, serializer));
//System.out.println(Thread.currentThread() + "subStageGetterSingle1");
return keyFlux
.singleOrEmpty()
.flatMap(key -> Mono
.<DatabaseStageEntry<T>>fromCallable(() -> {
//System.out.println(Thread.currentThread() + "subStageGetterSingle2");
if (!Arrays.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
return null;
})
)
.then(Mono.fromSupplier(() -> {
//System.out.println(Thread.currentThread() + "subStageGetterSingle3");
return new DatabaseSingle<T>(dictionary,
keyPrefix,
serializer
);
}));
}
//todo: temporary wrapper. convert the whole class to buffers
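For context on the guard above: keyFlux.singleOrEmpty() already errors when more than one key arrives, and the Arrays.equals check additionally rejects a lone key that differs from the expected prefix. A quick sketch (not from this codebase) of singleOrEmpty's three outcomes:

import reactor.core.publisher.Flux;

class SingleOrEmptyDemo {
    public static void main(String[] args) {
        System.out.println(Flux.empty().singleOrEmpty().blockOptional()); // Optional.empty
        System.out.println(Flux.just("a").singleOrEmpty().block());       // a
        try {
            Flux.just("a", "b").singleOrEmpty().block();
        } catch (IndexOutOfBoundsException e) {
            // more than one element: the same error path the check above relies on
            System.out.println("rejected: " + e.getMessage());
        }
    }
}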

LLLocalDictionary.java

@@ -9,7 +9,6 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -27,7 +26,6 @@ import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.type.VariableWrapper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -159,55 +157,59 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) {
Mono<byte[]> response = getPrevValue(key, resultType);
return Mono
.fromCallable(() -> {
db.put(cfh, key, value);
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler)
.then(response);
return getPrevValue(key, resultType)
.concatWith(Mono
.fromCallable(() -> {
db.put(cfh, key, value);
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler)
.then(Mono.empty())
).singleOrEmpty();
}
@Override
public Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType) {
Mono<byte[]> response = getPrevValue(key, resultType);
return Mono
.fromCallable(() -> {
db.delete(cfh, key);
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler)
.then(response);
return getPrevValue(key, resultType)
.concatWith(Mono
.fromCallable(() -> {
db.delete(cfh, key);
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler)
.then(Mono.empty())
).singleOrEmpty();
}
private Mono<byte[]> getPrevValue(byte[] key, LLDictionaryResultType resultType) {
switch (resultType) {
case VALUE_CHANGED:
return containsKey(null, key).single().map(LLUtils::booleanToResponse);
case PREVIOUS_VALUE:
return Mono
.fromCallable(() -> {
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, data)) {
if (data.getValue() != null) {
return data.getValue();
return Mono.defer(() -> {
switch (resultType) {
case VALUE_CHANGED:
return containsKey(null, key).single().map(LLUtils::booleanToResponse);
case PREVIOUS_VALUE:
return Mono
.fromCallable(() -> {
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, data)) {
if (data.getValue() != null) {
return data.getValue();
} else {
return db.get(cfh, key);
}
} else {
return db.get(cfh, key);
return null;
}
} else {
return null;
}
})
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler);
case VOID:
return Mono.empty();
default:
return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
}
})
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler);
case VOID:
return Mono.empty();
default:
return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
}
});
}
@Override
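The PREVIOUS_VALUE branch above (unchanged in substance, now merely wrapped in Mono.defer) leans on RocksDB's keyMayExist fast path: the membership probe may already hand back the value, and only an inconclusive answer requires a real get. A hedged sketch of that pattern against the RocksJava API:

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Holder;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class KeyMayExistDemo {
    static byte[] readWithFastPath(RocksDB db, ColumnFamilyHandle cfh, byte[] key)
            throws RocksDBException {
        Holder<byte[]> data = new Holder<>();
        if (!db.keyMayExist(cfh, key, data)) {
            return null; // definitely absent, no read needed
        }
        // The probe may have pinned the value already; otherwise confirm with a get.
        return data.getValue() != null ? data.getValue() : db.get(cfh, key);
    }
}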
@@ -242,11 +244,12 @@ public class LLLocalDictionary implements LLDictionary {
public Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues) {
return entries
.window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.publishOn(dbScheduler)
.flatMap(Flux::collectList)
.flatMap(entriesWindow -> this
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
.publishOn(dbScheduler)
.concatWith(Mono.fromCallable(() -> {
//System.out.println(Thread.currentThread()+"\tTest");
var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
@@ -311,11 +314,26 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin());
} else {
return getRangeMulti(snapshot, range);
}
return Flux.defer(() -> {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin());
} else {
return getRangeMulti(snapshot, range);
}
});
}
@Override
public Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
int prefixLength) {
return Flux.defer(() -> {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin()).map(List::of);
} else {
return getRangeMultiGrouped(snapshot, range, prefixLength);
}
});
}
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
@@ -326,67 +344,138 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
var iter = db.newIterator(cfh, resolveSnapshot(snapshot));
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
return Flux
.<Entry<byte[], byte[]>>push(sink -> {
//System.out.println(Thread.currentThread() + "\tPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] key;
while (rocksIterator.isValid()) {
key = rocksIterator.key();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(Map.entry(key, rocksIterator.value()));
rocksIterator.next();
}
} finally {
//System.out.println(Thread.currentThread() + "\tFinish Read rande item");
sink.complete();
}
return iter;
})
.subscribeOn(dbScheduler)
.flatMapMany(rocksIterator -> Flux
.<Entry<byte[], byte[]>>fromIterable(() -> {
VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
VariableWrapper<byte[]> nextValue = new VariableWrapper<>(null);
return new Iterator<>() {
@Override
public boolean hasNext() {
assert nextKey.var == null;
assert nextValue.var == null;
if (!rocksIterator.isValid()) {
nextKey.var = null;
nextValue.var = null;
return false;
}
var key = rocksIterator.key();
var value = rocksIterator.value();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
nextKey.var = null;
nextValue.var = null;
return false;
}
nextKey.var = key;
nextValue.var = value;
return true;
}
.subscribeOn(dbScheduler);
}
@Override
public Entry<byte[], byte[]> next() {
var key = nextKey.var;
var val = nextValue.var;
assert key != null;
assert val != null;
nextKey.var = null;
nextValue.var = null;
return Map.entry(key, val);
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return Flux
.<List<Entry<byte[], byte[]>>>push(sink -> {
//System.out.println(Thread.currentThread() + "\tPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] firstGroupKey = null;
List<Entry<byte[], byte[]>> currentGroupValues = new ArrayList<>();
byte[] key;
while (rocksIterator.isValid()) {
key = rocksIterator.key();
if (firstGroupKey == null) { // Fix first value
firstGroupKey = key;
}
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
currentGroupValues.add(Map.entry(key, rocksIterator.value()));
} else {
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
};
})
.doFinally(signalType -> rocksIterator.close())
.subscribeOn(dbScheduler)
);
firstGroupKey = key;
currentGroupValues = new ArrayList<>();
}
rocksIterator.next();
}
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
} finally {
//System.out.println(Thread.currentThread() + "\tFinish Read rande item");
sink.complete();
}
})
.subscribeOn(dbScheduler);
}
@Override
public Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
if (range.isSingle()) {
return getRangeKeysSingle(snapshot, range.getMin());
} else {
return getRangeKeysMulti(snapshot, range);
}
return Flux.defer(() -> {
if (range.isSingle()) {
//System.out.println(Thread.currentThread() + "getRangeKeys single");
return getRangeKeysSingle(snapshot, range.getMin()).doOnTerminate(() -> {}/*System.out.println(Thread.currentThread() + "getRangeKeys single end")*/);
} else {
//System.out.println(Thread.currentThread() + "getRangeKeys multi");
return getRangeKeysMulti(snapshot, range);
}
});
}
@Override
public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return Flux
.<List<byte[]>>push(sink -> {
//System.out.println(Thread.currentThread() + "\tPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] firstGroupKey = null;
List<byte[]> currentGroupValues = new ArrayList<>();
byte[] key;
while (rocksIterator.isValid()) {
key = rocksIterator.key();
if (firstGroupKey == null) { // Fix first value
firstGroupKey = key;
}
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
currentGroupValues.add(key);
} else {
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
firstGroupKey = key;
currentGroupValues = new ArrayList<>();
currentGroupValues.add(key);
}
rocksIterator.next();
}
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
} finally {
//System.out.println(Thread.currentThread() + "\tFinish Read rande item");
sink.complete();
}
})
.subscribeOn(dbScheduler);
}
private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
@@ -398,105 +487,90 @@
}
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
var iter = db.newIterator(cfh, resolveSnapshot(snapshot));
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
return Flux
.<byte[]>push(sink -> {
//System.out.println(Thread.currentThread() + "\tkPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] key;
sink.onRequest(l -> {}/*System.out.println(Thread.currentThread() + "\tkRequested " + l)*/);
while (rocksIterator.isValid()) {
key = rocksIterator.key();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
//System.out.println(Thread.currentThread() + "\tkRead rande item");
sink.next(key);
rocksIterator.next();
}
} finally {
//System.out.println(Thread.currentThread() + "\tkFinish Read rande item");
sink.complete();
}
return iter;
//System.out.println(Thread.currentThread() + "\tkFinish end Read rande item");
})
.subscribeOn(dbScheduler)
.flatMapMany(rocksIterator -> Flux
.<byte[]>fromIterable(() -> {
VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
return new Iterator<>() {
@Override
public boolean hasNext() {
assert nextKey.var == null;
if (!rocksIterator.isValid()) {
nextKey.var = null;
return false;
}
var key = rocksIterator.key();
var value = rocksIterator.value();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
nextKey.var = null;
return false;
}
nextKey.var = key;
return true;
}
@Override
public byte[] next() {
var key = nextKey.var;
assert key != null;
nextKey.var = null;
return key;
}
};
})
.doFinally(signalType -> rocksIterator.close())
.subscribeOn(dbScheduler)
);
.subscribeOn(dbScheduler);
}
@Override
public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
Flux<Entry<byte[], byte[]>> entries,
boolean getOldValues) {
if (range.isAll()) {
return clear().thenMany(Flux.empty());
} else {
return Mono
.fromCallable(() -> new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
))
.subscribeOn(dbScheduler)
.flatMapMany(writeBatch -> Mono
.fromCallable(() -> {
synchronized (writeBatch) {
if (range.hasMin() && range.hasMax()) {
writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
writeBatch.delete(cfh, range.getMax());
} else if (range.hasMax()) {
writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
writeBatch.delete(cfh, range.getMax());
} else {
try (var it = db.newIterator(cfh, getReadOptions(null))) {
it.seekToLast();
if (it.isValid()) {
writeBatch.deleteRange(cfh, range.getMin(), it.key());
writeBatch.delete(cfh, it.key());
return Flux.defer(() -> {
if (range.isAll()) {
return clear().thenMany(Flux.empty());
} else {
return Mono
.fromCallable(() -> new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
))
.subscribeOn(dbScheduler)
.flatMapMany(writeBatch -> Mono
.fromCallable(() -> {
synchronized (writeBatch) {
if (range.hasMin() && range.hasMax()) {
writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
writeBatch.delete(cfh, range.getMax());
} else if (range.hasMax()) {
writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
writeBatch.delete(cfh, range.getMax());
} else {
try (var it = db.newIterator(cfh, getReadOptions(null))) {
it.seekToLast();
if (it.isValid()) {
writeBatch.deleteRange(cfh, range.getMin(), it.key());
writeBatch.delete(cfh, it.key());
}
}
}
}
}
return null;
})
.subscribeOn(dbScheduler)
.thenMany(entries)
.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.concatWith(Mono.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.writeToDbAndClose();
writeBatch.close();
}
return null;
}).subscribeOn(dbScheduler))
.doFinally(signalType -> {
synchronized (writeBatch) {
writeBatch.close();
}
}))
.onErrorMap(IOException::new);
}
return null;
})
.subscribeOn(dbScheduler)
.thenMany(entries)
.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.concatWith(Mono.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.writeToDbAndClose();
writeBatch.close();
}
return null;
}).subscribeOn(dbScheduler))
.doFinally(signalType -> {
synchronized (writeBatch) {
writeBatch.close();
}
}))
.onErrorMap(IOException::new);
}
});
}
public Mono<Void> clear() {

LLLocalLuceneIndex.java

@@ -319,24 +319,32 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
streamSearcher.search(indexSearcher,
query,
limit,
null,
ScoreMode.COMPLETE,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
luceneScheduler.schedule(() -> {
try {
streamSearcher.search(indexSearcher,
query,
limit,
null,
ScoreMode.COMPLETE,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
topKeysSink.tryEmitComplete();
} catch (IOException e) {
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
}
});
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(luceneScheduler)
@@ -374,24 +382,32 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
streamSearcher.search(indexSearcher,
query,
limit,
luceneSort,
luceneScoreMode,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
luceneScheduler.schedule(() -> {
try {
streamSearcher.search(indexSearcher,
query,
limit,
luceneSort,
luceneScoreMode,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
topKeysSink.tryEmitComplete();
} catch (IOException e) {
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
}
});
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(luceneScheduler)
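Both search variants now hand the blocking streamSearcher.search call to luceneScheduler and terminate the sinks explicitly: tryEmitComplete on success, tryEmitError on IOException, so subscribers of topKeysSink.asFlux() always receive a terminal signal. A reduced sketch of that bridge (Schedulers.single() stands in for luceneScheduler, plain strings for hits):

import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class SinkBridgeDemo {
    public static void main(String[] args) {
        Scheduler searchScheduler = Schedulers.single();
        Sinks.Many<String> hits = Sinks.many().unicast().onBackpressureBuffer();
        searchScheduler.schedule(() -> {
            try {
                // stands in for the callbacks fired by the blocking search
                hits.tryEmitNext("doc-1");
                hits.tryEmitNext("doc-2");
                hits.tryEmitComplete(); // the explicit completion this commit adds
            } catch (Exception e) {
                hits.tryEmitError(e); // ...and the explicit error propagation
            }
        });
        System.out.println(hits.asFlux().collectList().block());
        searchScheduler.dispose();
    }
}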