diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java index 6030e1f..2baf513 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java @@ -15,6 +15,7 @@ import org.rocksdb.ChecksumType; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.InfoLogLevel; import org.rocksdb.RocksDB; @@ -81,10 +82,13 @@ public class LLTempHugePqEnv implements Closeable { var opts = new ColumnFamilyOptions() .setOptimizeFiltersForHits(true) .setParanoidFileChecks(false) + .setEnableBlobFiles(true) + .setBlobCompressionType(CompressionType.LZ4_COMPRESSION) .optimizeLevelStyleCompaction() .setLevelCompactionDynamicLevelBytes(true) .setTableFormatConfig(new BlockBasedTableConfig() .setOptimizeFiltersForMemory(true) + .setVerifyCompression(false) .setChecksumType(ChecksumType.kNoChecksum)); if (comparator != null) { opts.setComparator(comparator); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java index a6dbaf1..e602b2f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java @@ -88,12 +88,9 @@ public class SnapshotsManager { throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!"); } - indexSnapshot.close(); - var luceneIndexSnapshot = indexSnapshot.getSnapshot(); snapshotter.release(luceneIndexSnapshot); - // Delete unused files after releasing the snapshot - indexWriter.deleteUnusedFiles(); + indexSnapshot.close(); return null; } finally { activeTasks.arriveAndDeregister(); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 4f91110..11743c2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -14,6 +14,9 @@ import reactor.core.scheduler.Schedulers; public class AdaptiveLocalSearcher implements LocalSearcher { + static final boolean FORCE_HUGE_PQ + = Boolean.parseBoolean(System.getProperty("it.cavallium.hugepq.force", "false")); + private static final StandardSearcher standardSearcher = new StandardSearcher(); private static final LocalSearcher scoredPaged = new PagedLocalSearcher(); @@ -34,8 +37,8 @@ public class AdaptiveLocalSearcher implements LocalSearcher { private final SortedScoredFullMultiSearcher sortedScoredFull; public AdaptiveLocalSearcher(LLTempHugePqEnv env, boolean useHugePq, int maxInMemoryResultEntries) { - sortedByScoreFull = useHugePq ? new SortedByScoreFullMultiSearcher(env) : null; - sortedScoredFull = useHugePq ? new SortedScoredFullMultiSearcher(env) : null; + sortedByScoreFull = (FORCE_HUGE_PQ || useHugePq) ? new SortedByScoreFullMultiSearcher(env) : null; + sortedScoredFull = (FORCE_HUGE_PQ || useHugePq) ? new SortedScoredFullMultiSearcher(env) : null; this.maxInMemoryResultEntries = maxInMemoryResultEntries; } @@ -73,16 +76,16 @@ public class AdaptiveLocalSearcher implements LocalSearcher { long maxAllowedInMemoryLimit = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0)); - if (queryParams.limitLong() == 0) { + if (!FORCE_HUGE_PQ && queryParams.limitLong() == 0) { return countSearcher.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); - } else if (realLimit <= maxInMemoryResultEntries) { + } else if (!FORCE_HUGE_PQ && realLimit <= maxInMemoryResultEntries) { return standardSearcher.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); - } else if (queryParams.isSorted()) { - if (realLimit <= maxAllowedInMemoryLimit) { + } else if (FORCE_HUGE_PQ || queryParams.isSorted()) { + if (!FORCE_HUGE_PQ && realLimit <= maxAllowedInMemoryLimit) { return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } else { if (queryParams.isSortedByScore()) { - if (queryParams.limitLong() < maxInMemoryResultEntries) { + if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedByScoreFull != null) { @@ -91,7 +94,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } } else { - if (queryParams.limitLong() < maxInMemoryResultEntries) { + if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedScoredFull != null) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index ebc8aff..eb8354c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher.FORCE_HUGE_PQ; import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; import io.netty5.buffer.api.Send; @@ -35,8 +36,8 @@ public class AdaptiveMultiSearcher implements MultiSearcher { private final SortedScoredFullMultiSearcher sortedScoredFull; public AdaptiveMultiSearcher(LLTempHugePqEnv env, boolean useHugePq, int maxInMemoryResultEntries) { - sortedByScoreFull = useHugePq ? new SortedByScoreFullMultiSearcher(env) : null; - sortedScoredFull = useHugePq ? new SortedScoredFullMultiSearcher(env) : null; + sortedByScoreFull = (FORCE_HUGE_PQ || useHugePq) ? new SortedByScoreFullMultiSearcher(env) : null; + sortedScoredFull = (FORCE_HUGE_PQ || useHugePq) ? new SortedScoredFullMultiSearcher(env) : null; this.maxInMemoryResultEntries = maxInMemoryResultEntries; } @@ -66,16 +67,16 @@ public class AdaptiveMultiSearcher implements MultiSearcher { long maxAllowedInMemoryLimit = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0)); - if (queryParams.limitLong() == 0) { + if (!FORCE_HUGE_PQ && queryParams.limitLong() == 0) { return count.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); - } else if (realLimit <= maxInMemoryResultEntries) { + } else if (!FORCE_HUGE_PQ && realLimit <= maxInMemoryResultEntries) { return standardSearcher.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); - } else if (queryParams.isSorted()) { - if (realLimit <= maxAllowedInMemoryLimit) { + } else if (FORCE_HUGE_PQ || queryParams.isSorted()) { + if (!FORCE_HUGE_PQ && realLimit <= maxAllowedInMemoryLimit) { return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); } else { if (queryParams.isSortedByScore()) { - if (queryParams.limitLong() < maxInMemoryResultEntries) { + if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedByScoreFull != null) { @@ -84,7 +85,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher { return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); } } else { - if (queryParams.limitLong() < maxInMemoryResultEntries) { + if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedScoredFull != null) { diff --git a/src/test/java/it/cavallium/dbengine/TestHugePq.java b/src/test/java/it/cavallium/dbengine/TestHugePq.java index 957b3b7..0440e20 100644 --- a/src/test/java/it/cavallium/dbengine/TestHugePq.java +++ b/src/test/java/it/cavallium/dbengine/TestHugePq.java @@ -9,6 +9,8 @@ import it.cavallium.dbengine.lucene.HugePqPriorityQueue; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -70,6 +72,20 @@ public class TestHugePq { Assertions.assertEquals(0, queue.top()); } + @Test + public void testAddRandomMulti() { + var list = new ArrayList(1000); + for (int i = 0; i < 1000; i++) { + var n = ThreadLocalRandom.current().nextInt(-20, 20); + queue.add(n); + list.add(n); + } + list.sort(Comparator.reverseOrder()); + for (int i = 0; i < 1000; i++) { + Assertions.assertEquals(list.remove(list.size() - 1), queue.pop()); + } + } + @Test public void testAddMultiClear() { for (int i = 0; i < 1000; i++) { diff --git a/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java b/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java index ebc544b..7e02e27 100644 --- a/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java +++ b/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java @@ -11,8 +11,10 @@ import it.cavallium.dbengine.lucene.PriorityQueue; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import org.apache.lucene.search.HitQueue; import org.apache.lucene.search.ScoreDoc; @@ -148,6 +150,22 @@ public class TestHugePqHitQueue { assertEqualsScoreDoc(new TextDescription("top value of %s", testingPriorityQueue), new ScoreDoc(1, 0, -1), testingPriorityQueue.top()); } + @Test + public void testAddMultiRandom() { + var list = new ArrayList(1000); + for (int i = 0; i < 1000; i++) { + var ri = ThreadLocalRandom.current().nextInt(0, 20); + list.add(ri); + var item = new ScoreDoc(ri, ri << 1, ri % 4); + testingPriorityQueue.addUnsafe(item); + } + list.sort(Comparator.reverseOrder()); + for (int i = 0; i < 1000; i++) { + var top = list.remove(list.size() - 1); + assertEqualsScoreDoc(new TextDescription("%d value of %s", i, testingPriorityQueue), new ScoreDoc(top, top << 1, top % 4), testingPriorityQueue.pop()); + } + } + @Test public void testAddMultiClear() { for (int i = 0; i < 1000; i++) {