From dbca36b3aafa11858b72496fdbd9df6ffb5ca59b Mon Sep 17 00:00:00 2001
From: Andrea Cavalli
Date: Tue, 2 Feb 2021 00:09:46 +0100
Subject: [PATCH] Update Example.java, LLDictionary.java, and 6 more files...

---
 .../it.cavallium.dbengine.client/Example.java |  69 ++-
 .../dbengine/database/LLDictionary.java       |  30 +-
 .../collections/DatabaseMapDictionary.java    |  12 +-
 .../DatabaseMapDictionaryDeep.java            |  48 +-
 .../collections/DatabaseStageMap.java         |  30 +-
 .../collections/SubStageGetterSingle.java     |  25 +-
 .../database/disk/LLLocalDictionary.java      | 452 ++++++++++--------
 .../database/disk/LLLocalLuceneIndex.java     |  88 ++--
 8 files changed, 455 insertions(+), 299 deletions(-)

diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java
index 77d7a8d..2393deb 100644
--- a/src/example/java/it.cavallium.dbengine.client/Example.java
+++ b/src/example/java/it.cavallium.dbengine.client/Example.java
@@ -5,8 +5,8 @@ import it.cavallium.dbengine.database.Column;
 import it.cavallium.dbengine.database.LLKeyValueDatabase;
 import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
 import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
-import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength;
 import it.cavallium.dbengine.database.collections.Serializer;
+import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength;
 import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
 import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
 import java.io.IOException;
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -32,7 +33,7 @@ import reactor.util.function.Tuples;
 public class Example {

     private static final boolean printPreviousValue = false;
-    private static final int numRepeats = 100;
+    private static final int numRepeats = 1000;
     private static final int batchSize = 10000;

     public static void main(String[] args) throws InterruptedException {
@@ -52,8 +53,8 @@ public class Example {
          */

-        testPutMulti()
-                .then(rangeTestPutMulti())
+        rangeTestPutMultiProgressive()
+                .then(rangeTestPutMultiSame())
                 .subscribeOn(Schedulers.parallel())
                 .blockOptional();
     }
@@ -145,11 +146,14 @@ public class Example {
                 tempDb()
                         .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
                         .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
-                tuple -> Mono
-                        .defer(() -> tuple.getT2().putMulti(putMultiFlux)
-                        ),
+                tuple -> Mono.defer(() -> tuple.getT2().putMulti(putMultiFlux)),
                 numRepeats,
-                tuple -> tuple.getT1().close());
+                tuple -> Mono
+                        .fromRunnable(() -> System.out.println("Calculating size"))
+                        .then(tuple.getT2().size(null, false))
+                        .doOnNext(s -> System.out.println("Size after: " + s))
+                        .then(tuple.getT1().close())
+                );
     }

     private static Mono<Void> rangeTestAtPut() {
@@ -227,23 +231,56 @@ public class Example {
                 tuple -> tuple.getT1().close());
     }

-    private static Mono<Void> rangeTestPutMulti() {
+    private static Mono<Void> rangeTestPutMultiSame() {
         var ser = SerializerFixedBinaryLength.noop(4);
         var vser = Serializer.noop();
         HashMap<byte[], byte[]> keysToPut = new HashMap<>();
         for (int i = 0; i < batchSize; i++) {
             keysToPut.put(Ints.toByteArray(i * 3), Ints.toByteArray(i * 11));
         }
-        var putMultiFlux = Flux.fromIterable(keysToPut.entrySet());
         return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
                 tempDb()
                         .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
                         .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
                 tuple -> Mono
-                        .defer(() -> tuple.getT2().putMulti(putMultiFlux)
+                        .defer(() -> tuple.getT2().putMulti(Flux.fromIterable(keysToPut.entrySet()))
                         ),
                 numRepeats,
-                tuple -> tuple.getT1().close());
+                tuple -> Mono
+                        .fromRunnable(() -> System.out.println("Calculating size"))
+                        .then(tuple.getT2().size(null, false))
+                        .doOnNext(s -> System.out.println("Size after: " + s))
+                        .then(tuple.getT1().close())
+                );
+    }
+
+    private static Mono<Void> rangeTestPutMultiProgressive() {
+        var ser = SerializerFixedBinaryLength.noop(4);
+        var vser = Serializer.noop();
+        AtomicInteger ai = new AtomicInteger(0);
+        return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
+                tempDb()
+                        .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
+                        .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
+                tuple -> Mono
+                        .defer(() -> {
+                            var aiv = ai.incrementAndGet();
+                            HashMap<byte[], byte[]> keysToPut = new HashMap<>();
+                            for (int i = 0; i < batchSize; i++) {
+                                keysToPut.put(
+                                        Ints.toByteArray(i * 3 + (batchSize * aiv)),
+                                        Ints.toByteArray(i * 11 + (batchSize * aiv))
+                                );
+                            }
+                            return tuple.getT2().putMulti(Flux.fromIterable(keysToPut.entrySet()));
+                        }),
+                numRepeats,
+                tuple -> Mono
+                        .fromRunnable(() -> System.out.println("Calculating size"))
+                        .then(tuple.getT2().size(null, false))
+                        .doOnNext(s -> System.out.println("Size after: " + s))
+                        .then(tuple.getT1().close())
+                );
     }

     private static Mono<LLKeyValueDatabase> tempDb() {
@@ -282,18 +319,20 @@ public class Example {
         Duration WAIT_TIME_END = Duration.ofSeconds(5);
         return Mono
                 .delay(WAIT_TIME)
+                .doOnSuccess(s -> {
+                    System.out.println("----------------------------------------------------------------------");
+                    System.out.println(name);
+                })
                 .then(Mono.fromRunnable(() -> instantInit.tryEmitValue(now())))
                 .then(setup)
                 .doOnSuccess(s -> instantInitTest.tryEmitValue(now()))
-                .flatMap(a ->Mono.defer(() -> test.apply(a)).repeat(numRepeats)
+                .flatMap(a -> Mono.defer(() -> test.apply(a)).repeat(numRepeats - 1)
                         .then()
                         .doOnSuccess(s -> instantEndTest.tryEmitValue(now()))
                         .then(close.apply(a)))
                 .doOnSuccess(s -> instantEnd.tryEmitValue(now()))
                 .then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono()))
                 .doOnSuccess(tuple -> {
-                    System.out.println("----------------------------------------------------------------------");
-                    System.out.println(name);
                     System.out.println(
                             "\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format((numRepeats * batchSize)) + " times:");
                     System.out.println("\t - Test time: " + DecimalFormat
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
index ca810b6..2a5c0b4 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
@@ -1,5 +1,6 @@
 package it.cavallium.dbengine.database;

+import java.util.List;
 import java.util.Map.Entry;
 import java.util.function.Function;
 import org.jetbrains.annotations.Nullable;
@@ -22,21 +23,30 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {

     Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range);

+    Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+            LLRange range, int prefixLength);
+
     Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range);

+    Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
+
     Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);

     default Mono<Void> replaceRange(LLRange range, boolean canKeysChange, Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) {
-        Flux<Entry<byte[], byte[]>> replacedFlux = this.getRange(null, range).flatMap(entriesReplacer);
-        if (canKeysChange) {
-            return this
-                    .setRange(range, replacedFlux, false)
-                    .then();
-        } else {
-            return this
-                    .putMulti(replacedFlux, false)
-                    .then();
-        }
+        return Mono.defer(() -> {
+            if (canKeysChange) {
+                return this
+                        .setRange(range, this
+                                .getRange(null, range)
+                                .flatMap(entriesReplacer), false)
+                        .then();
+            } else {
+                return this
+                        .putMulti(this
+                                .getRange(null, range)
+                                .flatMap(entriesReplacer), false)
+                        .then();
+            }
+        });
     }

     Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range);
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
index 23c1e1d..9a684f9 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
@@ -73,8 +73,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>>

     private Entry<byte[], U> stripPrefix(Entry<byte[], U> entry) {
-        byte[] keySuffix = stripPrefix(entry.getKey());
-        return Map.entry(keySuffix, entry.getValue());
+        return Map.entry(stripPrefix(entry.getKey()), entry.getValue());
     }

     @Override
@@ -89,7 +88,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>>
     public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
-        return dictionary.sizeRange(resolveSnapshot(snapshot), range, true);
+        return dictionary.sizeRange(resolveSnapshot(snapshot), range, fast);
     }

     @Override
@@ -174,9 +173,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>>
     @Override
     public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
-        var serializedEntries = entries
-                .map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())));
-        return dictionary.setRange(range, serializedEntries, true)
+        return dictionary
+                .setRange(range,
+                        entries.map(entry ->
+                                Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), true)
                 .map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
     }

diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
index e844507..8bb8d98 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
@@ -9,11 +9,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import org.jetbrains.annotations.Nullable;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.GroupedFlux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuples;

 // todo: implement optimized methods
+@SuppressWarnings("Convert2MethodRef")
 public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {

     public static final byte[] EMPTY_BYTES = new byte[0];
@@ -196,34 +196,40 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>>
     @Override
     public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
         byte[] keySuffixData = serializeSuffix(keySuffix);
-        Flux<byte[]> rangeKeys = this
-                .dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)
-                );
         return this.subStageGetter
-                .subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys);
-    }
-
-    @Override
-    public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
-        Flux<GroupedFlux<byte[], byte[]>> groupedFlux = dictionary
-                .getRangeKeys(resolveSnapshot(snapshot), range)
-                .groupBy(this::removeExtFromFullKey);
-        return groupedFlux
-                .flatMap(rangeKeys -> this.subStageGetter
-                        .subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys)
-                        .map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us))
+                .subStage(dictionary,
+                        snapshot,
+                        toKeyWithoutExt(keySuffixData),
+                        this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData))
                 );
     }

     @Override
-    public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
-        var newValues = entries
-                .flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
-                .flatMap(tuple -> tuple.getT1().set(tuple.getT2()));
+    public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
+        return dictionary
+                .getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
+                .flatMap(rangeKeys -> {
+                    //System.out.println(Thread.currentThread() + "\tkReceived range key flux");
+                    byte[] groupKeyWithoutExt = removeExtFromFullKey(rangeKeys.get(0));
+                    byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
+                    return this.subStageGetter
+                            .subStage(dictionary, snapshot, groupKeyWithoutExt, Flux.fromIterable(rangeKeys))
+                            //.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tObtained stage for a key"))
+                            .map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
+                            //.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tMapped stage for a key"));
+                }
+                );
+        //.doOnNext(s -> System.out.println(Thread.currentThread() + "\tNext stage"))
+    }

+    @Override
+    public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
         return getAllStages(null)
                 .flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
-                .concatWith(newValues.then(Mono.empty()));
+                .concatWith(entries
+                        .flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
+                        .flatMap(tuple -> tuple.getT1().set(tuple.getT2()))
+                        .then(Mono.empty()));
     }

     //todo: temporary wrapper. convert the whole class to buffers
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
index ffbd3e1..8229dc3 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
@@ -30,7 +30,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseStageEntry<Map<T, U>>
     }

     default Mono<U> putValueAndGetPrevious(T key, U value) {
-        return at(null, key).flatMap(v -> v.setAndGetPrevious(value));
+        return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value));
     }

     default Mono<Boolean> putValueAndGetStatus(T key, U value) {
@@ -76,21 +76,19 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseStageEntry<Map<T, U>>
     }

     default Mono<Void> replaceAllValues(boolean canKeysChange, Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer) {
-        Flux<Entry<T, U>> replacedFlux = this
-                .getAllValues(null)
-                .flatMap(entriesReplacer);
-        if (canKeysChange) {
-            return this
-                    .setAllValues(replacedFlux)
-                    .then();
-        } else {
-            return replacedFlux
-                    .flatMap(replacedEntry -> this
-                            .at(null, replacedEntry.getKey())
-                            .map(entry -> entry.set(replacedEntry.getValue()))
-                    )
-                    .then();
-        }
+        return Mono.defer(() -> {
+            if (canKeysChange) {
+                return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then();
+            } else {
+                return this
+                        .getAllValues(null)
+                        .flatMap(entriesReplacer)
+                        .flatMap(replacedEntry -> this
+                                .at(null, replacedEntry.getKey())
+                                .map(entry -> entry.set(replacedEntry.getValue())))
+                        .then();
+            }
+        });
     }

     default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java
index 0ea29ed..989042c 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java
@@ -20,12 +20,25 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>>
             byte[] keyPrefix,
             Flux<byte[]> keyFlux) {
-        return keyFlux.singleOrEmpty().flatMap(key -> Mono.fromCallable(() -> {
-            if (!Arrays.equals(keyPrefix, key)) {
-                throw new IndexOutOfBoundsException("Found more than one element!");
-            }
-            return null;
-        })).thenReturn(new DatabaseSingle<>(dictionary, keyPrefix, serializer));
+        //System.out.println(Thread.currentThread() + "subStageGetterSingle1");
+        return keyFlux
+                .singleOrEmpty()
+                .flatMap(key -> Mono
+                        .<DatabaseStageEntry<T>>fromCallable(() -> {
+                            //System.out.println(Thread.currentThread() + "subStageGetterSingle2");
+                            if (!Arrays.equals(keyPrefix, key)) {
+                                throw new IndexOutOfBoundsException("Found more than one element!");
+                            }
+                            return null;
+                        })
+                )
+                .then(Mono.fromSupplier(() -> {
+                    //System.out.println(Thread.currentThread() + "subStageGetterSingle3");
+                    return new DatabaseSingle<T>(dictionary,
+                            keyPrefix,
+                            serializer
+                    );
+                }));
     }

     //todo: temporary wrapper. convert the whole class to buffers
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index 0e3ea3b..c974fc2 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -9,7 +9,6 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,7 +26,6 @@ import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 import org.warp.commonutils.concurrency.atomicity.NotAtomic;
-import org.warp.commonutils.type.VariableWrapper;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -159,55 +157,59 @@ public class LLLocalDictionary implements LLDictionary {

     @Override
     public Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) {
-        Mono<byte[]> response = getPrevValue(key, resultType);
-        return Mono
-                .fromCallable(() -> {
-                    db.put(cfh, key, value);
-                    return null;
-                })
-                .onErrorMap(IOException::new)
-                .subscribeOn(dbScheduler)
-                .then(response);
+        return getPrevValue(key, resultType)
+                .concatWith(Mono
+                        .fromCallable(() -> {
+                            db.put(cfh, key, value);
+                            return null;
+                        })
+                        .onErrorMap(IOException::new)
+                        .subscribeOn(dbScheduler)
+                        .then(Mono.empty())
+                ).singleOrEmpty();
     }

     @Override
     public Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType) {
-        Mono<byte[]> response = getPrevValue(key, resultType);
-        return Mono
-                .fromCallable(() -> {
-                    db.delete(cfh, key);
-                    return null;
-                })
-                .onErrorMap(IOException::new)
-                .subscribeOn(dbScheduler)
-                .then(response);
+        return getPrevValue(key, resultType)
+                .concatWith(Mono
+                        .fromCallable(() -> {
+                            db.delete(cfh, key);
+                            return null;
+                        })
+                        .onErrorMap(IOException::new)
+                        .subscribeOn(dbScheduler)
+                        .then(Mono.empty())
+                ).singleOrEmpty();
     }

     private Mono<byte[]> getPrevValue(byte[] key, LLDictionaryResultType resultType) {
-        switch (resultType) {
-            case VALUE_CHANGED:
-                return containsKey(null, key).single().map(LLUtils::booleanToResponse);
-            case PREVIOUS_VALUE:
-                return Mono
-                        .fromCallable(() -> {
-                            var data = new Holder<byte[]>();
-                            if (db.keyMayExist(cfh, key, data)) {
-                                if (data.getValue() != null) {
-                                    return data.getValue();
-                                } else {
-                                    return db.get(cfh, key);
-                                }
-                            } else {
-                                return null;
-                            }
-                        })
-                        .onErrorMap(IOException::new)
-                        .subscribeOn(dbScheduler);
-            case VOID:
-                return Mono.empty();
-            default:
-                return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
-        }
+        return Mono.defer(() -> {
+            switch (resultType) {
+                case VALUE_CHANGED:
+                    return containsKey(null, key).single().map(LLUtils::booleanToResponse);
+                case PREVIOUS_VALUE:
+                    return Mono
+                            .fromCallable(() -> {
+                                var data = new Holder<byte[]>();
+                                if (db.keyMayExist(cfh, key, data)) {
+                                    if (data.getValue() != null) {
+                                        return data.getValue();
+                                    } else {
+                                        return db.get(cfh, key);
+                                    }
+                                } else {
+                                    return null;
+                                }
+                            })
+                            .onErrorMap(IOException::new)
+                            .subscribeOn(dbScheduler);
+                case VOID:
+                    return Mono.empty();
+                default:
+                    return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
+            }
+        });
     }

     @Override
@@ -242,11 +244,12 @@ public class LLLocalDictionary implements LLDictionary {
     public Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues) {
         return entries
                .window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
-                .publishOn(dbScheduler)
                 .flatMap(Flux::collectList)
                 .flatMap(entriesWindow -> this
                         .getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
+                        .publishOn(dbScheduler)
                         .concatWith(Mono.fromCallable(() -> {
+                            //System.out.println(Thread.currentThread()+"\tTest");
                             var batch = new CappedWriteBatch(db,
                                     CAPPED_WRITE_BATCH_CAP,
                                     RESERVED_WRITE_BATCH_SIZE,
@@ -311,11 +314,26 @@ public class LLLocalDictionary implements LLDictionary {

     @Override
     public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
-        if (range.isSingle()) {
-            return getRangeSingle(snapshot, range.getMin());
-        } else {
-            return getRangeMulti(snapshot, range);
-        }
+        return Flux.defer(() -> {
+            if (range.isSingle()) {
+                return getRangeSingle(snapshot, range.getMin());
+            } else {
+                return getRangeMulti(snapshot, range);
+            }
+        });
+    }
+
+    @Override
+    public Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+            LLRange range,
+            int prefixLength) {
+        return Flux.defer(() -> {
+            if (range.isSingle()) {
+                return getRangeSingle(snapshot, range.getMin()).map(List::of);
+            } else {
+                return getRangeMultiGrouped(snapshot, range, prefixLength);
+            }
+        });
     }

     private Flux<Entry<byte[], byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
@@ -326,67 +344,139 @@ public class LLLocalDictionary implements LLDictionary {
     }

     private Flux<Entry<byte[], byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
-        return Mono
-                .fromCallable(() -> {
-                    var iter = db.newIterator(cfh, resolveSnapshot(snapshot));
-                    if (range.hasMin()) {
-                        iter.seek(range.getMin());
-                    } else {
-                        iter.seekToFirst();
+        return Flux
+                .<Entry<byte[], byte[]>>push(sink -> {
+                    //System.out.println(Thread.currentThread() + "\tPreparing Read range item");
+                    try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+                        if (range.hasMin()) {
+                            rocksIterator.seek(range.getMin());
+                        } else {
+                            rocksIterator.seekToFirst();
+                        }
+                        byte[] key;
+                        while (rocksIterator.isValid()) {
+                            key = rocksIterator.key();
+                            if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
+                                break;
+                            }
+                            //System.out.println(Thread.currentThread() + "\tRead range item");
+                            sink.next(Map.entry(key, rocksIterator.value()));
+                            rocksIterator.next();
+                        }
+                    } finally {
+                        //System.out.println(Thread.currentThread() + "\tFinish Read range item");
+                        sink.complete();
                     }
-                    return iter;
                 })
-                .subscribeOn(dbScheduler)
-                .flatMapMany(rocksIterator -> Flux
-                        .<Entry<byte[], byte[]>>fromIterable(() -> {
-                            VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
-                            VariableWrapper<byte[]> nextValue = new VariableWrapper<>(null);
-                            return new Iterator<>() {
-                                @Override
-                                public boolean hasNext() {
-                                    assert nextKey.var == null;
-                                    assert nextValue.var == null;
-                                    if (!rocksIterator.isValid()) {
-                                        nextKey.var = null;
-                                        nextValue.var = null;
-                                        return false;
-                                    }
-                                    var key = rocksIterator.key();
-                                    var value = rocksIterator.value();
-                                    if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
-                                        nextKey.var = null;
-                                        nextValue.var = null;
-                                        return false;
-                                    }
-                                    nextKey.var = key;
-                                    nextValue.var = value;
-                                    return true;
-                                }

-                                @Override
-                                public Entry<byte[], byte[]> next() {
-                                    var key = nextKey.var;
-                                    var val = nextValue.var;
-                                    assert key != null;
-                                    assert val != null;
-                                    nextKey.var = null;
-                                    nextValue.var = null;
-                                    return Map.entry(key, val);
-                                }
-                            };
-                        })
-                        .doFinally(signalType -> rocksIterator.close())
-                        .subscribeOn(dbScheduler)
-                );
+                .subscribeOn(dbScheduler);
+    }
+
+    private Flux<List<Entry<byte[], byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
+        return Flux
+                .<List<Entry<byte[], byte[]>>>push(sink -> {
+                    //System.out.println(Thread.currentThread() + "\tPreparing Read range item");
+                    try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+                        if (range.hasMin()) {
+                            rocksIterator.seek(range.getMin());
+                        } else {
+                            rocksIterator.seekToFirst();
+                        }
+                        byte[] firstGroupKey = null;
+                        List<Entry<byte[], byte[]>> currentGroupValues = new ArrayList<>();
+
+                        byte[] key;
+                        while (rocksIterator.isValid()) {
+                            key = rocksIterator.key();
+                            if (firstGroupKey == null) { // Fix first value
+                                firstGroupKey = key;
+                            }
+                            if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
+                                break;
+                            }
+                            if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
+                                currentGroupValues.add(Map.entry(key, rocksIterator.value()));
+                            } else {
+                                if (!currentGroupValues.isEmpty()) {
+                                    //System.out.println(Thread.currentThread() + "\tRead range item");
+                                    sink.next(currentGroupValues);
+                                }
+                                firstGroupKey = key;
+                                currentGroupValues = new ArrayList<>();
+                                currentGroupValues.add(Map.entry(key, rocksIterator.value()));
+                            }
+                            rocksIterator.next();
+                        }
+                        if (!currentGroupValues.isEmpty()) {
+                            //System.out.println(Thread.currentThread() + "\tRead range item");
+                            sink.next(currentGroupValues);
+                        }
+                    } finally {
+                        //System.out.println(Thread.currentThread() + "\tFinish Read range item");
+                        sink.complete();
+                    }
+                })
+                .subscribeOn(dbScheduler);
     }

     @Override
     public Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
-        if (range.isSingle()) {
-            return getRangeKeysSingle(snapshot, range.getMin());
-        } else {
-            return getRangeKeysMulti(snapshot, range);
-        }
+        return Flux.defer(() -> {
+            if (range.isSingle()) {
+                //System.out.println(Thread.currentThread() + "getRangeKeys single");
+                return getRangeKeysSingle(snapshot, range.getMin()).doOnTerminate(() -> {}/*System.out.println(Thread.currentThread() + "getRangeKeys single end")*/);
+            } else {
+                //System.out.println(Thread.currentThread() + "getRangeKeys multi");
+                return getRangeKeysMulti(snapshot, range);
+            }
+        });
+    }
+
+    @Override
+    public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
+        return Flux
+                .<List<byte[]>>push(sink -> {
+                    //System.out.println(Thread.currentThread() + "\tPreparing Read range item");
+                    try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+                        if (range.hasMin()) {
+                            rocksIterator.seek(range.getMin());
+                        } else {
+                            rocksIterator.seekToFirst();
+                        }
+                        byte[] firstGroupKey = null;
+                        List<byte[]> currentGroupValues = new ArrayList<>();
+
+                        byte[] key;
+                        while (rocksIterator.isValid()) {
+                            key = rocksIterator.key();
+                            if (firstGroupKey == null) { // Fix first value
+                                firstGroupKey = key;
+                            }
+                            if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
+                                break;
+                            }
+                            if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
+                                currentGroupValues.add(key);
+                            } else {
+                                if (!currentGroupValues.isEmpty()) {
+                                    //System.out.println(Thread.currentThread() + "\tRead range item");
+                                    sink.next(currentGroupValues);
+                                }
+                                firstGroupKey = key;
+                                currentGroupValues = new ArrayList<>();
+                                currentGroupValues.add(key);
+                            }
+                            rocksIterator.next();
+                        }
+                        if (!currentGroupValues.isEmpty()) {
+                            //System.out.println(Thread.currentThread() + "\tRead range item");
+                            sink.next(currentGroupValues);
+                        }
+                    } finally {
+                        //System.out.println(Thread.currentThread() + "\tFinish Read range item");
+                        sink.complete();
+                    }
+                })
+                .subscribeOn(dbScheduler);
     }

     private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
@@ -398,105 +488,90 @@ public class LLLocalDictionary implements LLDictionary {
     }

     private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
-        return Mono
-                .fromCallable(() -> {
-                    var iter = db.newIterator(cfh, resolveSnapshot(snapshot));
-                    if (range.hasMin()) {
-                        iter.seek(range.getMin());
-                    } else {
-                        iter.seekToFirst();
+        return Flux
+                .push(sink -> {
+                    //System.out.println(Thread.currentThread() + "\tkPreparing Read range item");
+                    try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+                        if (range.hasMin()) {
+                            rocksIterator.seek(range.getMin());
+                        } else {
+                            rocksIterator.seekToFirst();
+                        }
+                        byte[] key;
+                        sink.onRequest(l -> {}/*System.out.println(Thread.currentThread() + "\tkRequested " + l)*/);
+                        while (rocksIterator.isValid()) {
+                            key = rocksIterator.key();
+                            if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
+                                break;
+                            }
+                            //System.out.println(Thread.currentThread() + "\tkRead range item");
+                            sink.next(key);
+                            rocksIterator.next();
+                        }
+                    } finally {
+                        //System.out.println(Thread.currentThread() + "\tkFinish Read range item");
+                        sink.complete();
                     }
-                    return iter;
+                    //System.out.println(Thread.currentThread() + "\tkFinish end Read range item");
                 })
-                .subscribeOn(dbScheduler)
-                .flatMapMany(rocksIterator -> Flux
-                        .fromIterable(() -> {
-                            VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
-                            return new Iterator<>() {
-                                @Override
-                                public boolean hasNext() {
-                                    assert nextKey.var == null;
-                                    if (!rocksIterator.isValid()) {
-                                        nextKey.var = null;
-                                        return false;
-                                    }
-                                    var key = rocksIterator.key();
-                                    var value = rocksIterator.value();
-                                    if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
-                                        nextKey.var = null;
-                                        return false;
-                                    }
-                                    nextKey.var = key;
-                                    return true;
-                                }
-
-                                @Override
-                                public byte[] next() {
-                                    var key = nextKey.var;
-                                    assert key != null;
-                                    nextKey.var = null;
-                                    return key;
-                                }
-                            };
-                        })
-                        .doFinally(signalType -> rocksIterator.close())
-                        .subscribeOn(dbScheduler)
-                );
+                .subscribeOn(dbScheduler);
     }

     @Override
     public Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues) {
-        if (range.isAll()) {
-            return clear().thenMany(Flux.empty());
-        } else {
-            return Mono
-                    .fromCallable(() -> new CappedWriteBatch(db,
-                            CAPPED_WRITE_BATCH_CAP,
-                            RESERVED_WRITE_BATCH_SIZE,
-                            MAX_WRITE_BATCH_SIZE,
-                            BATCH_WRITE_OPTIONS
-                    ))
-                    .subscribeOn(dbScheduler)
-                    .flatMapMany(writeBatch -> Mono
-                            .fromCallable(() -> {
-                                synchronized (writeBatch) {
-                                    if (range.hasMin() && range.hasMax()) {
-                                        writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
-                                        writeBatch.delete(cfh, range.getMax());
-                                    } else if (range.hasMax()) {
-                                        writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
-                                        writeBatch.delete(cfh, range.getMax());
-                                    } else {
-                                        try (var it = db.newIterator(cfh, getReadOptions(null))) {
-                                            it.seekToLast();
-                                            if (it.isValid()) {
-                                                writeBatch.deleteRange(cfh, range.getMin(), it.key());
-                                                writeBatch.delete(cfh, it.key());
+        return Flux.defer(() -> {
+            if (range.isAll()) {
+                return clear().thenMany(Flux.empty());
+            } else {
+                return Mono
+                        .fromCallable(() -> new CappedWriteBatch(db,
+                                CAPPED_WRITE_BATCH_CAP,
+                                RESERVED_WRITE_BATCH_SIZE,
+                                MAX_WRITE_BATCH_SIZE,
+                                BATCH_WRITE_OPTIONS
+                        ))
+                        .subscribeOn(dbScheduler)
+                        .flatMapMany(writeBatch -> Mono
+                                .fromCallable(() -> {
+                                    synchronized (writeBatch) {
+                                        if (range.hasMin() && range.hasMax()) {
+                                            writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
+                                            writeBatch.delete(cfh, range.getMax());
+                                        } else if (range.hasMax()) {
+                                            writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
+                                            writeBatch.delete(cfh, range.getMax());
+                                        } else {
+                                            try (var it = db.newIterator(cfh, getReadOptions(null))) {
+                                                it.seekToLast();
+                                                if (it.isValid()) {
+                                                    writeBatch.deleteRange(cfh, range.getMin(), it.key());
+                                                    writeBatch.delete(cfh, it.key());
+                                                }
                                             }
                                         }
                                     }
-                                }
-                                return null;
-                            })
-                            .subscribeOn(dbScheduler)
-                            .thenMany(entries)
-                            .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
-                            .concatWith(Mono.<Entry<byte[], byte[]>>fromCallable(() -> {
-                                synchronized (writeBatch) {
-                                    writeBatch.writeToDbAndClose();
-                                    writeBatch.close();
-                                }
-                                return null;
-                            }).subscribeOn(dbScheduler))
-                            .doFinally(signalType -> {
-                                synchronized (writeBatch) {
-                                    writeBatch.close();
-                                }
-                            }))
-                    .onErrorMap(IOException::new);
-        }
+                                    return null;
+                                })
+                                .subscribeOn(dbScheduler)
+                                .thenMany(entries)
+                                .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
+                                .concatWith(Mono.<Entry<byte[], byte[]>>fromCallable(() -> {
+                                    synchronized (writeBatch) {
+                                        writeBatch.writeToDbAndClose();
+                                        writeBatch.close();
+                                    }
+                                    return null;
+                                }).subscribeOn(dbScheduler))
+                                .doFinally(signalType -> {
+                                    synchronized (writeBatch) {
+                                        writeBatch.close();
+                                    }
+                                }))
+                        .onErrorMap(IOException::new);
+            }
+        });
     }

     public Mono<Void> clear() {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index 9bfb58c..b7c2e1b 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -319,24 +319,32 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
                         .unicast()
                         .onBackpressureBuffer(new ArrayBlockingQueue<>(1000));

-                streamSearcher.search(indexSearcher,
-                        query,
-                        limit,
-                        null,
-                        ScoreMode.COMPLETE,
-                        keyFieldName,
-                        keyScore -> {
-                            EmitResult result = topKeysSink.tryEmitNext(keyScore);
-                            if (result.isFailure()) {
-                                throw new EmissionException(result);
-                            }
-                        },
-                        totalHitsCount -> {
-                            EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
-                            if (result.isFailure()) {
-                                throw new EmissionException(result);
-                            }
-                        });
+                luceneScheduler.schedule(() -> {
+                    try {
+                        streamSearcher.search(indexSearcher,
+                                query,
+                                limit,
+                                null,
+                                ScoreMode.COMPLETE,
+                                keyFieldName,
+                                keyScore -> {
+                                    EmitResult result = topKeysSink.tryEmitNext(keyScore);
+                                    if (result.isFailure()) {
+                                        throw new EmissionException(result);
+                                    }
+                                },
+                                totalHitsCount -> {
+                                    EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
+                                    if (result.isFailure()) {
+                                        throw new EmissionException(result);
+                                    }
+                                });
+                        topKeysSink.tryEmitComplete();
+                    } catch (IOException e) {
+                        topKeysSink.tryEmitError(e);
+                        totalHitsCountSink.tryEmitError(e);
+                    }
+                });

                 return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
             }).subscribeOn(luceneScheduler)
@@ -374,24 +382,32 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
                         .unicast()
                         .onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));

-                streamSearcher.search(indexSearcher,
-                        query,
-                        limit,
-                        luceneSort,
-                        luceneScoreMode,
-                        keyFieldName,
-                        keyScore -> {
-                            EmitResult result = topKeysSink.tryEmitNext(keyScore);
-                            if (result.isFailure()) {
-                                throw new EmissionException(result);
-                            }
-                        },
-                        totalHitsCount -> {
-                            EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
-                            if (result.isFailure()) {
-                                throw new EmissionException(result);
-                            }
-                        });
+                luceneScheduler.schedule(() -> {
+                    try {
+                        streamSearcher.search(indexSearcher,
+                                query,
+                                limit,
+                                luceneSort,
+                                luceneScoreMode,
+                                keyFieldName,
+                                keyScore -> {
+                                    EmitResult result = topKeysSink.tryEmitNext(keyScore);
+                                    if (result.isFailure()) {
+                                        throw new EmissionException(result);
+                                    }
+                                },
+                                totalHitsCount -> {
+                                    EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
+                                    if (result.isFailure()) {
+                                        throw new EmissionException(result);
+                                    }
+                                });
+                        topKeysSink.tryEmitComplete();
+                    } catch (IOException e) {
+                        topKeysSink.tryEmitError(e);
+                        totalHitsCountSink.tryEmitError(e);
+                    }
+                });

                 return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
             }).subscribeOn(luceneScheduler)
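
Usage note: the grouped-range methods introduced by this patch emit one ready List per run of consecutive keys sharing a prefix, which is what DatabaseMapDictionaryDeep.getAllStages now consumes instead of Flux.groupBy. A minimal sketch of a consumer, assuming an LLDictionary handle named `dict`, a full-range LLRange value named `allKeys`, and the 4-byte prefixes used in Example.java (all three names are illustrative, not part of the patch):

    // Illustrative consumer of the new grouped-keys API (not part of the patch).
    // `dict` is an LLDictionary and `allKeys` a full-range LLRange; both are assumed here.
    static void printGroups(LLDictionary dict, LLRange allKeys) {
        // Each emitted List<byte[]> holds consecutive keys sharing the same 4-byte prefix.
        Flux<List<byte[]>> groups = dict.getRangeKeysGrouped(null, allKeys, 4);
        groups.subscribe(group -> System.out.println(
                "group of " + group.size() + " keys sharing a 4-byte prefix"));
    }

Because grouping happens inside the RocksDB iterator loop, downstream code sees each group as a plain list rather than a nested reactive stream, avoiding the buffering that groupBy incurred in the previous implementation.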