From aa1aa7a6fbaf4ee9791070477d4b8a59b3b10253 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 18 Jul 2021 19:37:24 +0200 Subject: [PATCH] Test more numbers --- .../dbengine/client/CountedStream.java | 8 -- .../dbengine/client/DatabaseOptions.java | 3 +- .../dbengine/client/LuceneIndex.java | 2 +- .../dbengine/client/LuceneIndexImpl.java | 11 ++- .../dbengine/database/LLLuceneIndex.java | 2 +- .../collections/DatabaseStageMap.java | 27 ++++-- .../collections/SubStageGetterHashMap.java | 9 +- .../collections/SubStageGetterHashSet.java | 9 +- .../collections/SubStageGetterMap.java | 8 +- .../collections/SubStageGetterMapDeep.java | 8 +- .../collections/SubStageGetterSet.java | 9 +- .../database/disk/LLLocalDictionary.java | 74 ++++++++++------ .../disk/LLLocalKeyValueDatabase.java | 6 +- .../database/disk/LLLocalLuceneIndex.java | 39 +++++--- .../disk/LLLocalMultiLuceneIndex.java | 4 +- .../memory/BinaryLexicographicList.java | 16 ++++ .../database/memory/LLMemoryDictionary.java | 29 ++++-- .../memory/LLMemoryKeyValueDatabase.java | 18 +++- .../database/memory/LLMemorySingleton.java | 57 ++++++++++++ .../dbengine/lucene/LuceneUtils.java | 4 +- .../ScoredSimpleLuceneShardSearcher.java | 4 +- .../searcher/SimpleLuceneLocalSearcher.java | 88 +++++++++++-------- .../searcher/UnscoredLuceneShardSearcher.java | 4 +- .../it/cavallium/dbengine/DbTestUtils.java | 8 +- .../cavallium/dbengine/OldDatabaseTests.java | 8 +- 25 files changed, 317 insertions(+), 138 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java diff --git a/src/main/java/it/cavallium/dbengine/client/CountedStream.java b/src/main/java/it/cavallium/dbengine/client/CountedStream.java index 8aa3493..7035bb6 100644 --- a/src/main/java/it/cavallium/dbengine/client/CountedStream.java +++ b/src/main/java/it/cavallium/dbengine/client/CountedStream.java @@ -44,12 +44,4 @@ public class CountedStream { public Mono> collectList() { return stream.collectList(); } - - public static Mono> counted(Flux flux) { - var publishedFlux = flux.cache(); - return publishedFlux - .count() - .map(count -> new CountedStream<>(publishedFlux, count)) - .switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0))); - } } diff --git a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java index 814732c..4ed72f8 100644 --- a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java @@ -13,4 +13,5 @@ public record DatabaseOptions(Map extraFlags, boolean useDirectIO, boolean allowMemoryMapping, boolean allowNettyDirect, - boolean useNettyDirect) {} + boolean useNettyDirect, + boolean enableDbAssertionsWhenUsingAssertions) {} diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index cc394e2..67d1558 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -87,7 +87,7 @@ public interface LuceneIndex extends LLSnapshottable { Mono flush(); - Mono refresh(); + Mono refresh(boolean force); private static ValueTransformer getValueGetterTransformer(ValueGetter valueGetter) { return new ValueTransformer() { diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 7390288..4516fe9 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -93,7 +93,7 @@ public class LuceneIndexImpl implements LuceneIndex { private Mono> transformLuceneResultWithValues(LLSearchResultShard llSearchResult, ValueGetter valueGetter) { - return Mono.just(new SearchResult<>(llSearchResult.results().map(signal -> { + return Mono.fromCallable(() -> new SearchResult<>(llSearchResult.results().map(signal -> { var key = signal.key().map(indicizer::getKey); return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score()); }), llSearchResult.totalHitsCount(), llSearchResult.release())); @@ -110,7 +110,10 @@ public class LuceneIndexImpl implements LuceneIndex { Mono.just(tuple3.getT3()), tuple3.getT1() )); - return Mono.just(new SearchResult<>(resultItemsFlux, llSearchResult.totalHitsCount(), llSearchResult.release())); + return Mono.fromCallable(() -> new SearchResult<>(resultItemsFlux, + llSearchResult.totalHitsCount(), + llSearchResult.release() + )); } @Override @@ -214,8 +217,8 @@ public class LuceneIndexImpl implements LuceneIndex { * Refresh index searcher */ @Override - public Mono refresh() { - return luceneIndex.refresh(); + public Mono refresh(boolean force) { + return luceneIndex.refresh(force); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index b24927f..0f38032 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -70,5 +70,5 @@ public interface LLLuceneIndex extends LLSnapshottable { /** * Refresh index searcher */ - Mono refresh(); + Mono refresh(boolean force); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 85b34f7..588b534 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -12,7 +12,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; @@ -115,12 +117,20 @@ public interface DatabaseStageMap> extends Dat return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false); } + /** + * GetMulti must return the elements in sequence! + */ default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { - return keys.flatMapSequential(key -> this - .getValue(snapshot, key, existsAlmostCertainly) - .map(value -> Map.entry(key, value))); + return keys + .flatMapSequential(key -> this + .getValue(snapshot, key, existsAlmostCertainly) + .map(value -> Map.entry(key, value)) + ); } + /** + * GetMulti must return the elements in sequence! + */ default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { return getMulti(snapshot, keys, false); } @@ -271,9 +281,14 @@ public interface DatabaseStageMap> extends Dat @Override public Flux> transform(Flux> keys) { return Flux.defer(() -> { - ConcurrentHashMap extraValues = new ConcurrentHashMap<>(); - return getMulti(snapshot, keys.doOnNext(key -> extraValues.put(key.getT2(), key.getT1())).map(Tuple2::getT2)) - .map(result -> Tuples.of(extraValues.get(result.getKey()), result.getKey(), result.getValue())); + ConcurrentLinkedQueue extraValues = new ConcurrentLinkedQueue<>(); + return getMulti(snapshot, keys.map(key -> { + extraValues.add(key.getT1()); + return key.getT2(); + })).map(result -> { + var extraValue = extraValues.remove(); + return Tuples.of(extraValue, result.getKey(), result.getValue()); + }); }); } }; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index 18fd68b..cc1d09a 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -30,15 +30,18 @@ public class SubStageGetterHashMap implements private final Serializer valueSerializer; private final Function keyHashFunction; private final SerializerFixedBinaryLength keyHashSerializer; + private final boolean enableAssertionsWhenUsingAssertions; public SubStageGetterHashMap(Serializer keySerializer, Serializer valueSerializer, Function keyHashFunction, - SerializerFixedBinaryLength keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer, + boolean enableAssertionsWhenUsingAssertions) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; this.keyHashFunction = keyHashFunction; this.keyHashSerializer = keyHashSerializer; + this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions; } @Override @@ -49,7 +52,7 @@ public class SubStageGetterHashMap implements try { return Mono .defer(() -> { - if (assertsEnabled) { + if (assertsEnabled && enableAssertionsWhenUsingAssertions) { return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); } else { return Mono @@ -86,7 +89,7 @@ public class SubStageGetterHashMap implements @Override public boolean needsDebuggingKeyFlux() { - return assertsEnabled; + return assertsEnabled && enableAssertionsWhenUsingAssertions; } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index b241c71..d82af80 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -30,13 +30,16 @@ public class SubStageGetterHashSet implements private final Serializer keySerializer; private final Function keyHashFunction; private final SerializerFixedBinaryLength keyHashSerializer; + private final boolean enableAssertionsWhenUsingAssertions; public SubStageGetterHashSet(Serializer keySerializer, Function keyHashFunction, - SerializerFixedBinaryLength keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer, + boolean enableAssertionsWhenUsingAssertions) { this.keySerializer = keySerializer; this.keyHashFunction = keyHashFunction; this.keyHashSerializer = keyHashSerializer; + this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions; } @Override @@ -47,7 +50,7 @@ public class SubStageGetterHashSet implements try { return Mono .defer(() -> { - if (assertsEnabled) { + if (assertsEnabled && enableAssertionsWhenUsingAssertions) { return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); } else { return Mono @@ -83,7 +86,7 @@ public class SubStageGetterHashSet implements @Override public boolean needsDebuggingKeyFlux() { - return assertsEnabled; + return assertsEnabled && enableAssertionsWhenUsingAssertions; } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index cf1ecfe..0cf8b94 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -25,11 +25,13 @@ public class SubStageGetterMap implements SubStageGetter, Databa private final SerializerFixedBinaryLength keySerializer; private final Serializer valueSerializer; + private final boolean enableAssertionsWhenUsingAssertions; public SubStageGetterMap(SerializerFixedBinaryLength keySerializer, - Serializer valueSerializer) { + Serializer valueSerializer, boolean enableAssertionsWhenUsingAssertions) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; + this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions; } @Override @@ -40,7 +42,7 @@ public class SubStageGetterMap implements SubStageGetter, Databa try { return Mono .defer(() -> { - if (assertsEnabled) { + if (assertsEnabled && enableAssertionsWhenUsingAssertions) { return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); } else { return Mono @@ -75,7 +77,7 @@ public class SubStageGetterMap implements SubStageGetter, Databa @Override public boolean needsDebuggingKeyFlux() { - return assertsEnabled; + return assertsEnabled && enableAssertionsWhenUsingAssertions; } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 2e5b0f0..97ebae2 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -25,14 +25,16 @@ public class SubStageGetterMapDeep> implements private final SubStageGetter subStageGetter; private final SerializerFixedBinaryLength keySerializer; private final int keyExtLength; + private final boolean enableAssertionsWhenUsingAssertions; public SubStageGetterMapDeep(SubStageGetter subStageGetter, SerializerFixedBinaryLength keySerializer, - int keyExtLength) { + int keyExtLength, boolean enableAssertionsWhenUsingAssertions) { this.subStageGetter = subStageGetter; this.keySerializer = keySerializer; this.keyExtLength = keyExtLength; assert keyExtConsistency(); + this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions; } @@ -54,7 +56,7 @@ public class SubStageGetterMapDeep> implements try { return Mono .defer(() -> { - if (assertsEnabled) { + if (assertsEnabled && enableAssertionsWhenUsingAssertions) { return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); } else { return Mono @@ -90,7 +92,7 @@ public class SubStageGetterMapDeep> implements @Override public boolean needsDebuggingKeyFlux() { - return assertsEnabled; + return assertsEnabled && enableAssertionsWhenUsingAssertions; } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index facf140..a86f15b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -24,9 +24,12 @@ public class SubStageGetterSet implements SubStageGetter, Dat } private final SerializerFixedBinaryLength keySerializer; + private final boolean enableAssertionsWhenUsingAssertions; - public SubStageGetterSet(SerializerFixedBinaryLength keySerializer) { + public SubStageGetterSet(SerializerFixedBinaryLength keySerializer, + boolean enableAssertionsWhenUsingAssertions) { this.keySerializer = keySerializer; + this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions; } @Override @@ -37,7 +40,7 @@ public class SubStageGetterSet implements SubStageGetter, Dat try { return Mono .defer(() -> { - if (assertsEnabled) { + if (assertsEnabled && enableAssertionsWhenUsingAssertions) { return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); } else { return Mono @@ -72,7 +75,7 @@ public class SubStageGetterSet implements SubStageGetter, Dat @Override public boolean needsDebuggingKeyFlux() { - return assertsEnabled; + return assertsEnabled && enableAssertionsWhenUsingAssertions; } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index b4144d1..71732a9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -287,7 +287,9 @@ public class LLLocalDictionary implements LLDictionary { throw new RocksDBException("Key buffer must be direct"); } ByteBuffer keyNioBuffer = LLUtils.toDirect(key); - assert keyNioBuffer.isDirect(); + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + assert keyNioBuffer.isDirect(); + } // Create a direct result buffer because RocksDB works only with direct buffers ByteBuf resultBuf = alloc.directBuffer(LLLocalDictionary.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); try { @@ -297,35 +299,39 @@ public class LLLocalDictionary implements LLDictionary { do { // Create the result nio buffer to pass to RocksDB resultNioBuf = resultBuf.nioBuffer(0, resultBuf.capacity()); - assert keyNioBuffer.isDirect(); - assert resultNioBuf.isDirect(); + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + assert keyNioBuffer.isDirect(); + assert resultNioBuf.isDirect(); + } valueSize = db.get(cfh, Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), keyNioBuffer.position(0), resultNioBuf ); if (valueSize != RocksDB.NOT_FOUND) { - // todo: check if position is equal to data that have been read - // todo: check if limit is equal to value size or data that have been read - assert valueSize <= 0 || resultNioBuf.limit() > 0; + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + // todo: check if position is equal to data that have been read + // todo: check if limit is equal to value size or data that have been read + assert valueSize <= 0 || resultNioBuf.limit() > 0; - // If the locking is enabled the data is safe, so since we are appending data to the end, - // we need to check if it has been appended correctly or it it has been overwritten. - // We must not do this check otherwise because if there is no locking the data can be - // overwritten with a smaller value the next time. - if (updateMode == UpdateMode.ALLOW) { - // Check if read data is larger than previously read data. - // If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer. - assert resultNioBuf.limit() > assertionReadData; - if (ASSERTIONS_ENABLED) { - assertionReadData = resultNioBuf.limit(); + // If the locking is enabled the data is safe, so since we are appending data to the end, + // we need to check if it has been appended correctly or it it has been overwritten. + // We must not do this check otherwise because if there is no locking the data can be + // overwritten with a smaller value the next time. + if (updateMode == UpdateMode.ALLOW) { + // Check if read data is larger than previously read data. + // If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer. + assert resultNioBuf.limit() > assertionReadData; + if (ASSERTIONS_ENABLED) { + assertionReadData = resultNioBuf.limit(); + } } - } - // Check if read data is not bigger than the total value size. - // If it's bigger it means that RocksDB is writing the start of the result into the result - // buffer more than once. - assert resultNioBuf.limit() <= valueSize; + // Check if read data is not bigger than the total value size. + // If it's bigger it means that RocksDB is writing the start of the result into the result + // buffer more than once. + assert resultNioBuf.limit() <= valueSize; + } if (valueSize <= resultNioBuf.limit()) { // Return the result ready to be read @@ -392,13 +398,17 @@ public class LLLocalDictionary implements LLDictionary { if (!value.isDirect()) { throw new RocksDBException("Value buffer must be direct"); } - var keyNioBuffer = LLUtils.toDirect(key); + var keyNioBuffer = LLUtils.toDirect(key); + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { assert keyNioBuffer.isDirect(); + } - var valueNioBuffer = LLUtils.toDirect(value); + var valueNioBuffer = LLUtils.toDirect(value); + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { assert valueNioBuffer.isDirect(); - db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer); + } + db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer); } else { db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value)); } @@ -750,9 +760,11 @@ public class LLLocalDictionary implements LLDictionary { ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); try { newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); - assert prevDataToSendToUpdater == null - || prevDataToSendToUpdater.readerIndex() == 0 - || !prevDataToSendToUpdater.isReadable(); + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + assert prevDataToSendToUpdater == null + || prevDataToSendToUpdater.readerIndex() == 0 + || !prevDataToSendToUpdater.isReadable(); + } } finally { if (prevDataToSendToUpdater != null) { prevDataToSendToUpdater.release(); @@ -892,7 +904,9 @@ public class LLLocalDictionary implements LLDictionary { .single() .map(LLUtils::booleanToResponseByteBuffer) .doAfterTerminate(() -> { - assert key.refCnt() > 0; + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + assert key.refCnt() > 0; + } }); case PREVIOUS_VALUE -> Mono .fromCallable(() -> { @@ -918,7 +932,9 @@ public class LLLocalDictionary implements LLDictionary { try { return dbGet(cfh, null, key.retain(), true); } finally { - assert key.refCnt() > 0; + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + assert key.refCnt() > 0; + } } } } else { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 98c2a64..d2b3bde 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -470,8 +470,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException { ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName))); //noinspection RedundantIfStatement - if (!enableColumnsBug) { - assert Arrays.equals(cfh.getName(), columnName); + if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { + if (!enableColumnsBug) { + assert Arrays.equals(cfh.getName(), columnName); + } } return cfh; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index bb09606..162e334 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -91,7 +91,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene", Integer.MAX_VALUE, - true + false ); // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks private final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic( @@ -99,7 +99,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-searcher", 60, - true + false + ); + // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks + private final Scheduler luceneWriterScheduler = Schedulers.newBoundedElastic( + 4, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "lucene-writer", + 60, + false ); private final String luceneIndexName; @@ -353,12 +361,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.addDocument(LLUtils.toDocument(doc)); return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneWriterScheduler); } @Override @@ -369,13 +378,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneWriterScheduler) ); } @@ -385,12 +395,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.deleteDocuments(LLUtils.toTerm(id)); return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneWriterScheduler); } @Override @@ -398,12 +409,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); } finally { scheduledTasksLifecycle.endScheduledTask(); } return null; - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneWriterScheduler); } @Override @@ -419,6 +431,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); LLDocument value = entry.getValue(); + //noinspection BlockingMethodInNonBlockingContext indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); } return null; @@ -426,7 +439,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { scheduledTasksLifecycle.endScheduledTask(); } }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(luceneWriterScheduler); } @Override @@ -634,14 +647,20 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono refresh() { + public Mono refresh(boolean force) { return Mono .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { if (scheduledTasksLifecycle.isCancelled()) return null; - //noinspection BlockingMethodInNonBlockingContext - searcherManager.maybeRefresh(); + if (force) { + if (scheduledTasksLifecycle.isCancelled()) return null; + //noinspection BlockingMethodInNonBlockingContext + searcherManager.maybeRefreshBlocking(); + } else { + //noinspection BlockingMethodInNonBlockingContext + searcherManager.maybeRefresh(); + } } finally { scheduledTasksLifecycle.endScheduledTask(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index bd5e5c5..5405dd6 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -269,10 +269,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono refresh() { + public Mono refresh(boolean force) { return Flux .fromArray(luceneIndices) - .flatMap(LLLocalLuceneIndex::refresh) + .flatMap(index -> index.refresh(force)) .then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/BinaryLexicographicList.java b/src/main/java/it/cavallium/dbengine/database/memory/BinaryLexicographicList.java index 8e77fd3..6fb0b6a 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/BinaryLexicographicList.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/BinaryLexicographicList.java @@ -252,6 +252,22 @@ public class BinaryLexicographicList implements ByteList { return true; } if (o == null || getClass() != o.getClass()) { + if (o instanceof List) { + int i = 0; + for (Object o1 : ((List) o)) { + if (i >= size()) { + return false; + } + if (!(o1 instanceof Byte)) { + return false; + } + if (this.bytes[i] != (Byte) o1) { + return false; + } + i++; + } + return (size() == i); + } return false; } BinaryLexicographicList bytes1 = (BinaryLexicographicList) o; diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 7c458b0..608a13d 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; @@ -150,7 +151,7 @@ public class LLMemoryDictionary implements LLDictionary { public Mono put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) { try { return Mono - .fromCallable(() -> mainDb.put(k(key),k(value))) + .fromCallable(() -> mainDb.put(k(key), k(value))) .transform(result -> this.transformResult(result, resultType)) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) .doFirst(key::retain) @@ -169,7 +170,23 @@ public class LLMemoryDictionary implements LLDictionary { public Mono> updateAndGetDelta(ByteBuf key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly) { - return null; + return Mono.fromCallable(() -> { + AtomicReference oldRef = new AtomicReference<>(null); + var newValue = mainDb.compute(k(key), (_unused, old) -> { + if (old != null) { + oldRef.set(kk(old)); + } + var v = updater.apply(old != null ? kk(old) : null); + try { + return k(v); + } finally { + if (v != null) { + v.release(); + } + } + }); + return new Delta<>(oldRef.get(), kk(newValue)); + }); } @Override @@ -197,13 +214,13 @@ public class LLMemoryDictionary implements LLDictionary { Flux> keys, boolean existsAlmostCertainly) { return keys - .handle((key, sink) -> { + .flatMapSequential(key -> { try { - var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2())); + ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2())); if (v == null) { - sink.complete(); + return Flux.empty(); } else { - sink.next(Tuples.of(key.getT1(), key.getT2().retain(), kk(v))); + return Flux.just(Tuples.of(key.getT1(), key.getT2().retain(), kk(v))); } } finally { key.getT2().release(); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index bd0c792..7c02f0f 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -34,6 +34,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { private final ConcurrentHashMap>> snapshots = new ConcurrentHashMap<>(); private final ConcurrentHashMap> mainDb; + private final ConcurrentHashMap singletons = new ConcurrentHashMap<>(); public LLMemoryKeyValueDatabase(ByteBufAllocator allocator, String name, List columns) { this.allocator = allocator; @@ -46,8 +47,21 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { } @Override - public Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) { - return Mono.error(new UnsupportedOperationException("Not implemented")); + public Mono getSingleton(byte[] singletonListColumnName, byte[] singletonName, byte[] defaultValue) { + var columnNameString = new String(singletonListColumnName, StandardCharsets.UTF_8); + var dict = singletons.computeIfAbsent(columnNameString, _unused -> new LLMemoryDictionary(allocator, + name, + columnNameString, + UpdateMode.ALLOW, + snapshots, + mainDb + )); + return Mono + .fromCallable(() -> new LLMemorySingleton(dict, singletonName)).flatMap(singleton -> singleton + .get(null) + .switchIfEmpty(singleton.set(defaultValue).then(Mono.empty())) + .thenReturn(singleton) + ); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java new file mode 100644 index 0000000..3355516 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -0,0 +1,57 @@ +package it.cavallium.dbengine.database.memory; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLSingleton; +import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; + +public class LLMemorySingleton implements LLSingleton { + + private final LLMemoryDictionary dict; + private final byte[] singletonName; + + public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) { + this.dict = dict; + this.singletonName = singletonName; + } + + @Override + public String getDatabaseName() { + return dict.getDatabaseName(); + } + + @Override + public Mono get(@Nullable LLSnapshot snapshot) { + var bb = Unpooled.wrappedBuffer(singletonName); + return Mono + .defer(() -> dict.get(snapshot, bb.retain(), false)) + .map(b -> { + try { + return LLUtils.toArray(b); + } finally { + b.release(); + } + }) + .doAfterTerminate(bb::release) + .doFirst(bb::retain); + } + + @Override + public Mono set(byte[] value) { + var bbKey = Unpooled.wrappedBuffer(singletonName); + var bbVal = Unpooled.wrappedBuffer(value); + return Mono + .defer(() -> dict + .put(bbKey.retain(), bbVal.retain(), LLDictionaryResultType.VOID) + ) + .doAfterTerminate(bbKey::release) + .doAfterTerminate(bbVal::release) + .doFirst(bbKey::retain) + .doFirst(bbVal::retain) + .then(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index b716253..952129f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -3,9 +3,11 @@ package it.cavallium.dbengine.lucene; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.client.query.BasicType; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.ValueGetter; @@ -432,6 +434,6 @@ public class LuceneUtils { } public static int totalHitsThreshold() { - return 0; + return 1; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index 64b0a82..3818a00 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -142,8 +142,8 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { return new LuceneSearchResult(result.totalHits.value, firstPageHits - .concatWith(nextHits) - .transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), + .concatWith(nextHits), + //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), release ); }) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 6b622cb..f5081da 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -56,48 +56,58 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { .take(queryParams.limit(), true); - Flux nextHits = Flux.defer(() -> { - if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { - return Flux.empty(); - } - return Flux - .generate( - () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), - (s, sink) -> { - if (s.last() != null && s.remainingLimit() > 0) { - TopDocs pageTopDocs; - try { - TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), - s.currentPageLimit(), - s.last(), - LuceneUtils.totalHitsThreshold() - ); - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(queryParams.query(), collector); - pageTopDocs = collector.topDocs(); - } catch (IOException e) { - sink.error(e); + Flux nextHits; + if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { + nextHits = null; + } else { + nextHits = Flux.defer(() -> { + return Flux + .generate( + () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), + (s, sink) -> { + if (s.last() != null && s.remainingLimit() > 0) { + TopDocs pageTopDocs; + try { + TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), + s.currentPageLimit(), + s.last(), + LuceneUtils.totalHitsThreshold() + ); + //noinspection BlockingMethodInNonBlockingContext + indexSearcher.search(queryParams.query(), collector); + pageTopDocs = collector.topDocs(); + } catch (IOException e) { + sink.error(e); + return EMPTY_STATUS; + } + var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); + sink.next(pageTopDocs); + return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); + } else { + sink.complete(); return EMPTY_STATUS; } - var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); - sink.next(pageTopDocs); - return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); - } else { - sink.complete(); - return EMPTY_STATUS; - } - }, - s -> {} - ) - .subscribeOn(scheduler) - .concatMap(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler) - ); - }); + }, + s -> {} + ) + .subscribeOn(scheduler) + .concatMap(topFieldDoc -> LuceneUtils + .convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler) + ); + }); + } - return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono - .concatWith(nextHits) - .transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), + Flux combinedFlux; + + if (nextHits != null) { + combinedFlux = firstPageMono + .concatWith(nextHits); + } else { + combinedFlux = firstPageMono; + } + + return new LuceneSearchResult(firstPageTopDocs.totalHits.value, combinedFlux, + //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), releaseIndexSearcher ); }) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java index c97e07b..9be27a4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java @@ -131,8 +131,8 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { }); return new LuceneSearchResult(result.totalHits.value, firstPageHits - .concatWith(nextHits) - .transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), + .concatWith(nextHits), + //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), release ); }) diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index ed6e205..911f2a9 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -58,7 +58,7 @@ public class DbTestUtils { .then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath).connect()) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true, true, true) + new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true) )), action, db -> db.close().then(Mono.fromCallable(() -> { @@ -149,7 +149,8 @@ public class DbTestUtils { SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes), key2Bytes, new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes), - Serializer.utf8(DbTestUtils.ALLOCATOR) + Serializer.utf8(DbTestUtils.ALLOCATOR), + true ) ); } @@ -164,7 +165,8 @@ public class DbTestUtils { new SubStageGetterHashMap<>(Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(DbTestUtils.ALLOCATOR), String::hashCode, - SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR) + SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR), + true ) ); } diff --git a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java index 5c3c65e..b32759f 100644 --- a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java +++ b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java @@ -75,7 +75,7 @@ public class OldDatabaseTests { .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, new FixedStringSerializer(3), 4, - new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop()) + new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true) )) .flatMap(collection -> Flux .fromIterable(originalSuperKeys) @@ -135,7 +135,7 @@ public class OldDatabaseTests { .then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect()) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), - new DatabaseOptions(Map.of(), true, false, true, false, true, true, true) + new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true) )); } @@ -159,14 +159,14 @@ public class OldDatabaseTests { .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, new FixedStringSerializer(3), 4, - new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop()) + new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true) )), db .getDictionary("testmap", UpdateMode.DISALLOW) .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, new FixedStringSerializer(6), 7, - new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop()) + new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop(), true) )) ) .single()