diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index 86896bd..647579e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -96,8 +96,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { @Nullable LuceneHacks luceneHacks) { return Mono .fromCallable(() -> { + var env = this.env.get(); if (instancesCount != 1) { - var env = this.env.get(); Objects.requireNonNull(env, "Environment not set"); return new LLLocalMultiLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"), @@ -110,7 +110,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { luceneHacks ); } else { - return new LLLocalLuceneIndex(luceneOptions.inMemory() ? null : basePath.resolve("lucene"), + return new LLLocalLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"), meterRegistry, name, indicizerAnalyzers, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index c2db07d..e0f27a9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -106,7 +106,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private final Phaser activeTasks = new Phaser(1); private final AtomicBoolean closeRequested = new AtomicBoolean(); - public LLLocalLuceneIndex(@Nullable Path luceneBasePath, + public LLLocalLuceneIndex(LLTempLMDBEnv env, + @Nullable Path luceneBasePath, MeterRegistry meterRegistry, String name, IndicizerAnalyzers indicizerAnalyzers, @@ -196,7 +197,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) { localSearcher = luceneHacks.customLocalSearcher().get(); } else { - localSearcher = new AdaptiveLocalSearcher(); + localSearcher = new AdaptiveLocalSearcher(env); } var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index fb25801..d9221e1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -83,7 +83,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } else { instanceName = name + "_" + String.format("%03d", i); } - luceneIndices[i] = new LLLocalLuceneIndex(lucene, + luceneIndices[i] = new LLLocalLuceneIndex(env, + lucene, meterRegistry, instanceName, indicizerAnalyzers, diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index 824ae6a..dbac82d 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -11,9 +11,13 @@ import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; +import java.nio.file.Files; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -24,8 +28,10 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { JMXNettyMonitoringManager.initialize(); } + private final AtomicBoolean connected = new AtomicBoolean(); private final BufferAllocator allocator; private final MeterRegistry meterRegistry; + private final AtomicReference env = new AtomicReference<>(); public LLMemoryDatabaseConnection(BufferAllocator allocator, MeterRegistry meterRegistry) { this.allocator = allocator; @@ -44,7 +50,18 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { @Override public Mono connect() { - return Mono.empty(); + return Mono + .fromCallable(() -> { + if (!connected.compareAndSet(false, true)) { + throw new IllegalStateException("Already connected"); + } + var prev = env.getAndSet(new LLTempLMDBEnv()); + if (prev != null) { + throw new IllegalStateException("Env was already set"); + } + return this; + }) + .subscribeOn(Schedulers.boundedElastic()); } @Override @@ -69,19 +86,31 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { LuceneOptions luceneOptions, @Nullable LuceneHacks luceneHacks) { return Mono - .fromCallable(() -> new LLLocalLuceneIndex(null, - meterRegistry, - name, - indicizerAnalyzers, - indicizerSimilarities, - luceneOptions, - luceneHacks - )) + .fromCallable(() -> { + var env = this.env.get(); + return new LLLocalLuceneIndex(env, + null, + meterRegistry, + name, + indicizerAnalyzers, + indicizerSimilarities, + luceneOptions, + luceneHacks + ); + }) .subscribeOn(Schedulers.boundedElastic()); } @Override public Mono disconnect() { - return Mono.empty(); + return Mono.fromCallable(() -> { + if (connected.compareAndSet(true, false)) { + var env = this.env.get(); + if (env != null) { + env.close(); + } + } + return null; + }).subscribeOn(Schedulers.boundedElastic()); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index e8ce26b..20f46da 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -1,19 +1,33 @@ package it.cavallium.dbengine.lucene.searcher; +import static it.cavallium.dbengine.lucene.searcher.MultiSearcher.MAX_IN_MEMORY_SIZE; + import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import reactor.core.publisher.Mono; public class AdaptiveLocalSearcher implements LocalSearcher { - private static final LocalSearcher localSearcher = new PagedLocalSearcher(); + private static final LocalSearcher localPagedSearcher = new PagedLocalSearcher(); private static final LocalSearcher countSearcher = new CountMultiSearcher(); + private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher(); + + private final UnsortedScoredFullMultiSearcher unsortedScoredFull; + + private final SortedScoredFullMultiSearcher sortedScoredFull; + + public AdaptiveLocalSearcher(LLTempLMDBEnv env) { + unsortedScoredFull = new UnsortedScoredFullMultiSearcher(env); + sortedScoredFull = new SortedScoredFullMultiSearcher(env); + } + @Override public Mono collect(Mono> indexSearcher, LocalQueryParams queryParams, @@ -39,14 +53,37 @@ public class AdaptiveLocalSearcher implements LocalSearcher { return "adaptivelocal"; } + // Remember to change also AdaptiveMultiSearcher public Mono transformedCollect(Mono> indexSearcher, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { + // offset + limit + long realLimit = queryParams.offsetLong() + queryParams.limitLong(); + long maxAllowedInMemoryLimit + = Math.max(MAX_IN_MEMORY_SIZE, (long) queryParams.pageLimits().getPageLimit(0)); + if (queryParams.limitLong() == 0) { return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); + } else if (queryParams.isSorted() || queryParams.needsScores()) { + if (realLimit <= maxAllowedInMemoryLimit) { + return localPagedSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); + } else { + if ((queryParams.isSorted() && !queryParams.isSortedByScore())) { + if (queryParams.limitLong() < MAX_IN_MEMORY_SIZE) { + throw new UnsupportedOperationException("Allowed limit is " + MAX_IN_MEMORY_SIZE + " or greater"); + } + return sortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); + } else { + if (queryParams.limitLong() < MAX_IN_MEMORY_SIZE) { + throw new UnsupportedOperationException("Allowed limit is " + MAX_IN_MEMORY_SIZE + " or greater"); + } + return unsortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); + } + } } else { - return localSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); + // Run large/unbounded searches using the continuous multi searcher + return unsortedUnscoredContinuous.collect(indexSearcher, queryParams, keyFieldName, transformer); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index bf9fab8..d41fca3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -40,6 +40,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher { } } + // Remember to change also AdaptiveLocalSearcher public Mono transformedCollectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java index 30175bb..40c311e 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java @@ -149,7 +149,7 @@ public class TestLuceneIndex { } } } else { - tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher()); + tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher(ENV)); tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV)); } return index; diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java index d76bf29..96977aa 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java @@ -174,7 +174,7 @@ public class TestLuceneSearches { } else { sink.next(new PagedLocalSearcher()); } - sink.next(new AdaptiveLocalSearcher()); + sink.next(new AdaptiveLocalSearcher(ENV)); } sink.complete(); }, OverflowStrategy.BUFFER); @@ -219,7 +219,7 @@ public class TestLuceneSearches { } } } else { - tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher()); + tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher(ENV)); tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV)); } return shards ? multiIndex : localIndex;