diff --git a/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java b/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java index e2db42e..0a0e325 100644 --- a/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java +++ b/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java @@ -62,7 +62,8 @@ public class UninterruptibleScheduler { @Override public void dispose() { - + //todo: check if we need to remove this line + worker.dispose(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 426caa5..6c517a8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1565,7 +1565,7 @@ public class LLLocalDictionary implements LLDictionary { } try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { readOpts.setFillCache(false); - readOpts.setReadaheadSize(32 * 1024); // 32KiB + readOpts.setReadaheadSize(128 * 1024); // 128KiB readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 221552f..a7fb232 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -5,6 +5,8 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE; import static it.cavallium.dbengine.database.LLUtils.toDocument; import static it.cavallium.dbengine.database.LLUtils.toFields; import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; +import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE; +import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE; import com.google.common.collect.Multimap; import io.micrometer.core.instrument.Counter; @@ -82,6 +84,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { */ private static final ReentrantLock shutdownLock = new ReentrantLock(); private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic())); + //todo: remove after https://github.com/reactor/reactor-core/issues/2960 is fixed + private static final Scheduler bulkScheduler = uninterruptibleScheduler(Schedulers.newBoundedElastic( + DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "bulkBoundedElastic", 60, true)); static { LLUtils.initHooks(); @@ -275,7 +280,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono addDocuments(boolean atomic, Flux> documents) { if (!atomic) { return documents - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .publishOn(bulkScheduler) .handle((document, sink) -> { LLUpdateDocument value = document.getValue(); startedDocIndexings.increment(); @@ -297,7 +302,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } else { return documents .collectList() - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .publishOn(bulkScheduler) .handle((documentsList, sink) -> { var count = documentsList.size(); StopWatch stopWatch = StopWatch.createStarted(); @@ -363,7 +368,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono updateDocuments(Flux> documents) { return documents - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .publishOn(bulkScheduler) .handle((document, sink) -> { LLTerm key = document.getKey(); LLUpdateDocument value = document.getValue(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 39bf8ff..1ec7e75 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -33,7 +33,6 @@ import it.unimi.dsi.fastutil.ints.IntList; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,6 +42,9 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; import org.jetbrains.annotations.NotNull; @@ -53,6 +55,12 @@ import reactor.core.scheduler.Schedulers; public class LLLocalMultiLuceneIndex implements LLLuceneIndex { + private static final Logger LOG = LogManager.getLogger(LLLuceneIndex.class); + private static final boolean BYPASS_GROUPBY_BUG = Boolean.parseBoolean(System.getProperty( + "it.cavallium.dbengine.bypassGroupByBug", + "false" + )); + static { LLUtils.initHooks(); } @@ -156,10 +164,43 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono addDocuments(boolean atomic, Flux> documents) { - return documents - .groupBy(term -> getLuceneIndex(term.getKey())) - .flatMap(group -> group.key().addDocuments(atomic, group)) - .then(); + if (BYPASS_GROUPBY_BUG) { + return documents + .buffer(8192) + .flatMap(inputEntries -> { + List>[] sortedEntries = new List[totalShards]; + Mono[] results = new Mono[totalShards]; + + // Sort entries + for(var inputEntry : inputEntries) { + int luceneIndexId = LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards); + if (sortedEntries[luceneIndexId] == null) { + sortedEntries[luceneIndexId] = new ArrayList<>(); + } + sortedEntries[luceneIndexId].add(inputEntry); + } + + // Add documents + int luceneIndexId = 0; + for (List> docs : sortedEntries) { + if (docs != null && !docs.isEmpty()) { + LLLocalLuceneIndex luceneIndex = Objects.requireNonNull(luceneIndicesById[luceneIndexId]); + results[luceneIndexId] = luceneIndex.addDocuments(atomic, Flux.fromIterable(docs)); + } else { + results[luceneIndexId] = Mono.empty(); + } + luceneIndexId++; + } + + return Mono.when(results); + }) + .then(); + } else { + return documents + .groupBy(term -> getLuceneIndex(term.getKey())) + .flatMap(group -> group.key().addDocuments(atomic, group)) + .then(); + } } @Override @@ -174,10 +215,43 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono updateDocuments(Flux> documents) { - return documents - .groupBy(term -> getLuceneIndex(term.getKey())) - .flatMap(group -> group.key().updateDocuments(group)) - .then(); + if (BYPASS_GROUPBY_BUG) { + return documents + .buffer(8192) + .flatMap(inputEntries -> { + List>[] sortedEntries = new List[totalShards]; + Mono[] results = new Mono[totalShards]; + + // Sort entries + for(var inputEntry : inputEntries) { + int luceneIndexId = LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards); + if (sortedEntries[luceneIndexId] == null) { + sortedEntries[luceneIndexId] = new ArrayList<>(); + } + sortedEntries[luceneIndexId].add(inputEntry); + } + + // Add documents + int luceneIndexId = 0; + for (List> docs : sortedEntries) { + if (docs != null && !docs.isEmpty()) { + LLLocalLuceneIndex luceneIndex = Objects.requireNonNull(luceneIndicesById[luceneIndexId]); + results[luceneIndexId] = luceneIndex.updateDocuments(Flux.fromIterable(docs)); + } else { + results[luceneIndexId] = Mono.empty(); + } + luceneIndexId++; + } + + return Mono.when(results); + }) + .then(); + } else { + return documents + .groupBy(term -> getLuceneIndex(term.getKey())) + .flatMap(group -> group.key().updateDocuments(group)) + .then(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index da02793..4622343 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -79,8 +79,8 @@ public abstract class LLLocalReactiveRocksIterator extends return Flux.generate(() -> { var readOptions = new ReadOptions(this.readOptions); if (!rangeShared.hasMin() || !rangeShared.hasMax()) { - //readOptions.setReadaheadSize(32 * 1024); // 32KiB - //readOptions.setFillCache(false); + readOptions.setReadaheadSize(32 * 1024); // 32KiB + readOptions.setFillCache(false); readOptions.setVerifyChecksums(false); } if (logger.isTraceEnabled()) {