Add requests timeout

This commit is contained in:
Andrea Cavalli 2021-12-12 23:40:30 +01:00
parent 5157656a2c
commit 907561d93c
21 changed files with 121 additions and 61 deletions

View File

@ -161,6 +161,12 @@
<artifactId>junit-jupiter-params</artifactId> <artifactId>junit-jupiter-params</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.0</version>
<scope>test</scope>
</dependency>
<!-- This will get hamcrest-core automatically --> <!-- This will get hamcrest-core automatically -->
<dependency> <dependency>
<groupId>org.hamcrest</groupId> <groupId>org.hamcrest</groupId>

View File

@ -190,6 +190,7 @@ versions:
minCompetitiveScore: -float minCompetitiveScore: -float
sort: Sort sort: Sort
computePreciseHitsCount: boolean computePreciseHitsCount: boolean
timeoutMilliseconds: long
NoSort: NoSort:
data: { } data: { }
NumericSort: NumericSort:

View File

@ -18,4 +18,5 @@ public record LuceneOptions(Map<String, String> extraFlags,
int indexWriterBufferSize, int indexWriterBufferSize,
boolean applyAllDeletes, boolean applyAllDeletes,
boolean writeAllDeletes, boolean writeAllDeletes,
boolean allowNonVolatileCollection) {} boolean allowNonVolatileCollection,
int maxInMemoryResultEntries) {}

View File

@ -69,7 +69,7 @@ public interface LLLuceneIndex extends LLSnapshottable {
BucketParams bucketParams); BucketParams bucketParams);
default Mono<TotalHitsCount> count(@Nullable LLSnapshot snapshot, Query query) { default Mono<TotalHitsCount> count(@Nullable LLSnapshot snapshot, Query query) {
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false); QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false, Long.MAX_VALUE);
return Mono.from(this.search(snapshot, params, null) return Mono.from(this.search(snapshot, params, null)
.map(llSearchResultShard -> { .map(llSearchResultShard -> {
try (llSearchResultShard) { try (llSearchResultShard) {

View File

@ -196,10 +196,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
var useLMDB = luceneOptions.allowNonVolatileCollection(); var useLMDB = luceneOptions.allowNonVolatileCollection();
var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) { if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) {
localSearcher = luceneHacks.customLocalSearcher().get(); localSearcher = luceneHacks.customLocalSearcher().get();
} else { } else {
localSearcher = new AdaptiveLocalSearcher(env, useLMDB); localSearcher = new AdaptiveLocalSearcher(env, useLMDB, maxInMemoryResultEntries);
} }
var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer); var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);

View File

@ -98,10 +98,11 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
var useLMDB = luceneOptions.allowNonVolatileCollection(); var useLMDB = luceneOptions.allowNonVolatileCollection();
var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) { if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) {
multiSearcher = luceneHacks.customMultiSearcher().get(); multiSearcher = luceneHacks.customMultiSearcher().get();
} else { } else {
multiSearcher = new AdaptiveMultiSearcher(env, useLMDB); multiSearcher = new AdaptiveMultiSearcher(env, useLMDB, maxInMemoryResultEntries);
} }
} }

View File

@ -26,6 +26,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -46,6 +47,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery.Builder; import org.apache.lucene.search.BooleanQuery.Builder;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
@ -53,7 +55,9 @@ import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.similarities.BooleanSimilarity; import org.apache.lucene.search.similarities.BooleanSimilarity;
@ -346,7 +350,8 @@ public class LuceneUtils {
DEFAULT_PAGE_LIMITS, DEFAULT_PAGE_LIMITS,
queryParams.minCompetitiveScore().getNullable(), queryParams.minCompetitiveScore().getNullable(),
QueryParser.toSort(queryParams.sort()), QueryParser.toSort(queryParams.sort()),
queryParams.computePreciseHitsCount() queryParams.computePreciseHitsCount(),
Duration.ofMillis(queryParams.timeoutMilliseconds())
); );
} }
@ -504,7 +509,8 @@ public class LuceneUtils {
DEFAULT_PAGE_LIMITS, DEFAULT_PAGE_LIMITS,
localQueryParams.minCompetitiveScore(), localQueryParams.minCompetitiveScore(),
localQueryParams.sort(), localQueryParams.sort(),
localQueryParams.computePreciseHitsCount() localQueryParams.computePreciseHitsCount(),
localQueryParams.timeout()
); );
} }
MultiMoreLikeThis mlt; MultiMoreLikeThis mlt;
@ -549,8 +555,13 @@ public class LuceneUtils {
DEFAULT_PAGE_LIMITS, DEFAULT_PAGE_LIMITS,
localQueryParams.minCompetitiveScore(), localQueryParams.minCompetitiveScore(),
localQueryParams.sort(), localQueryParams.sort(),
localQueryParams.computePreciseHitsCount() localQueryParams.computePreciseHitsCount(),
localQueryParams.timeout()
); );
}).subscribeOn(Schedulers.boundedElastic())); }).subscribeOn(Schedulers.boundedElastic()));
} }
public static Collector withTimeout(TopDocsCollector<?> collector, Duration timeout) {
return new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeout.toMillis());
}
} }

View File

@ -1,7 +1,5 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.MultiSearcher.MAX_IN_MEMORY_SIZE;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport; import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
@ -22,15 +20,21 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher(); private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher();
/**
* Use in-memory collectors if the expected results count is lower or equal than this limit
*/
private final int maxInMemoryResultEntries;
@Nullable @Nullable
private final UnsortedScoredFullMultiSearcher unsortedScoredFull; private final UnsortedScoredFullMultiSearcher unsortedScoredFull;
@Nullable @Nullable
private final SortedScoredFullMultiSearcher sortedScoredFull; private final SortedScoredFullMultiSearcher sortedScoredFull;
public AdaptiveLocalSearcher(LLTempLMDBEnv env, boolean useLMDB) { public AdaptiveLocalSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) {
unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null; unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null;
sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null; sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null;
this.maxInMemoryResultEntries = maxInMemoryResultEntries;
} }
@Override @Override
@ -66,7 +70,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
// offset + limit // offset + limit
long realLimit = queryParams.offsetLong() + queryParams.limitLong(); long realLimit = queryParams.offsetLong() + queryParams.limitLong();
long maxAllowedInMemoryLimit long maxAllowedInMemoryLimit
= Math.max(MAX_IN_MEMORY_SIZE, (long) queryParams.pageLimits().getPageLimit(0)); = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0));
if (queryParams.limitLong() == 0) { if (queryParams.limitLong() == 0) {
return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
@ -75,8 +79,8 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
} else { } else {
if ((queryParams.isSorted() && !queryParams.isSortedByScore())) { if ((queryParams.isSorted() && !queryParams.isSortedByScore())) {
if (queryParams.limitLong() < MAX_IN_MEMORY_SIZE) { if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + MAX_IN_MEMORY_SIZE + " or greater"); throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
} }
if (sortedScoredFull != null) { if (sortedScoredFull != null) {
return sortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); return sortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer);
@ -84,8 +88,8 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
} }
} else { } else {
if (queryParams.limitLong() < MAX_IN_MEMORY_SIZE) { if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + MAX_IN_MEMORY_SIZE + " or greater"); throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
} }
if (unsortedScoredFull != null) { if (unsortedScoredFull != null) {
return unsortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); return unsortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer);

View File

@ -18,15 +18,21 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher(); private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher();
/**
* Use in-memory collectors if the expected results count is lower or equal than this limit
*/
private final int maxInMemoryResultEntries;
@Nullable @Nullable
private final UnsortedScoredFullMultiSearcher unsortedScoredFull; private final UnsortedScoredFullMultiSearcher unsortedScoredFull;
@Nullable @Nullable
private final SortedScoredFullMultiSearcher sortedScoredFull; private final SortedScoredFullMultiSearcher sortedScoredFull;
public AdaptiveMultiSearcher(LLTempLMDBEnv env, boolean useLMDB) { public AdaptiveMultiSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) {
unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null; unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null;
sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null; sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null;
this.maxInMemoryResultEntries = maxInMemoryResultEntries;
} }
@Override @Override
@ -53,7 +59,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
// offset + limit // offset + limit
long realLimit = queryParams.offsetLong() + queryParams.limitLong(); long realLimit = queryParams.offsetLong() + queryParams.limitLong();
long maxAllowedInMemoryLimit long maxAllowedInMemoryLimit
= Math.max(MultiSearcher.MAX_IN_MEMORY_SIZE, (long) queryParams.pageLimits().getPageLimit(0)); = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0));
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
if (queryParams.limitLong() == 0) { if (queryParams.limitLong() == 0) {
@ -63,8 +69,8 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else { } else {
if ((queryParams.isSorted() && !queryParams.isSortedByScore())) { if ((queryParams.isSorted() && !queryParams.isSortedByScore())) {
if (queryParams.limitLong() < MAX_IN_MEMORY_SIZE) { if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + MAX_IN_MEMORY_SIZE + " or greater"); throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
} }
if (sortedScoredFull != null) { if (sortedScoredFull != null) {
return sortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); return sortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
@ -72,8 +78,8 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} }
} else { } else {
if (queryParams.limitLong() < MAX_IN_MEMORY_SIZE) { if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + MAX_IN_MEMORY_SIZE + " or greater"); throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
} }
if (unsortedScoredFull != null) { if (unsortedScoredFull != null) {
return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);

View File

@ -92,7 +92,8 @@ public class CountMultiSearcher implements MultiSearcher {
queryParams.pageLimits(), queryParams.pageLimits(),
queryParams.minCompetitiveScore(), queryParams.minCompetitiveScore(),
queryParams.sort(), queryParams.sort(),
queryParams.computePreciseHitsCount() queryParams.computePreciseHitsCount(),
queryParams.timeout()
); );
} }
@ -113,12 +114,14 @@ public class CountMultiSearcher implements MultiSearcher {
.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams))); .fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams)));
} }
return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> { return queryParamsMono
.flatMap(queryParams2 -> Mono.fromCallable(() -> {
try (var is = indexSearcher.receive()) { try (var is = indexSearcher.receive()) {
LLUtils.ensureBlocking(); LLUtils.ensureBlocking();
return is.getIndexSearcher().count(queryParams2.query()); return is.getIndexSearcher().count(queryParams2.query());
} }
}).subscribeOn(Schedulers.boundedElastic())); }).subscribeOn(Schedulers.boundedElastic()))
.timeout(queryParams.timeout());
}, },
is -> Mono.empty() is -> Mono.empty()
) )

View File

@ -4,6 +4,7 @@ import static it.cavallium.dbengine.lucene.LuceneUtils.safeLongToInt;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.PageLimits; import it.cavallium.dbengine.lucene.PageLimits;
import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
@ -11,8 +12,8 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
public record LocalQueryParams(@NotNull Query query, int offsetInt, long offsetLong, int limitInt, long limitLong, public record LocalQueryParams(@NotNull Query query, int offsetInt, long offsetLong, int limitInt, long limitLong,
@NotNull PageLimits pageLimits, @NotNull PageLimits pageLimits, @Nullable Float minCompetitiveScore, @Nullable Sort sort,
@Nullable Float minCompetitiveScore, @Nullable Sort sort, boolean computePreciseHitsCount) { boolean computePreciseHitsCount, Duration timeout) {
public LocalQueryParams(@NotNull Query query, public LocalQueryParams(@NotNull Query query,
long offsetLong, long offsetLong,
@ -20,9 +21,10 @@ public record LocalQueryParams(@NotNull Query query, int offsetInt, long offsetL
@NotNull PageLimits pageLimits, @NotNull PageLimits pageLimits,
@Nullable Float minCompetitiveScore, @Nullable Float minCompetitiveScore,
@Nullable Sort sort, @Nullable Sort sort,
boolean computePreciseHitsCount) { boolean computePreciseHitsCount,
Duration timeout) {
this(query, safeLongToInt(offsetLong), offsetLong, safeLongToInt(limitLong), limitLong, pageLimits, this(query, safeLongToInt(offsetLong), offsetLong, safeLongToInt(limitLong), limitLong, pageLimits,
minCompetitiveScore, sort, computePreciseHitsCount); minCompetitiveScore, sort, computePreciseHitsCount, timeout);
} }
public LocalQueryParams(@NotNull Query query, public LocalQueryParams(@NotNull Query query,
@ -31,7 +33,8 @@ public record LocalQueryParams(@NotNull Query query, int offsetInt, long offsetL
@NotNull PageLimits pageLimits, @NotNull PageLimits pageLimits,
@Nullable Float minCompetitiveScore, @Nullable Float minCompetitiveScore,
@Nullable Sort sort, @Nullable Sort sort,
boolean computePreciseHitsCount) { boolean computePreciseHitsCount,
Duration timeout) {
this(query, this(query,
offsetInt, offsetInt,
offsetInt, offsetInt,
@ -40,7 +43,8 @@ public record LocalQueryParams(@NotNull Query query, int offsetInt, long offsetL
pageLimits, pageLimits,
minCompetitiveScore, minCompetitiveScore,
sort, sort,
computePreciseHitsCount computePreciseHitsCount,
timeout
); );
} }

View File

@ -7,11 +7,6 @@ import reactor.core.publisher.Mono;
public interface MultiSearcher extends LocalSearcher { public interface MultiSearcher extends LocalSearcher {
/**
* Use in-memory collectors if the expected results count is lower or equal than this limit
*/
int MAX_IN_MEMORY_SIZE = 8192;
/** /**
* @param indexSearchersMono Lucene index searcher * @param indexSearchersMono Lucene index searcher
* @param queryParams the query parameters * @param queryParams the query parameters

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List; import java.util.List;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector; import org.apache.lucene.search.TopScoreDocCollector;
@ -73,7 +74,7 @@ public class OfficialSearcher implements MultiSearcher {
var collector = sharedManager.newCollector(); var collector = sharedManager.newCollector();
assert queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive(); assert queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive();
shard.search(queryParams.query(), collector); shard.search(queryParams.query(), LuceneUtils.withTimeout(collector, queryParams.timeout()));
return collector; return collector;
})) }))
.collectList() .collectList()

View File

@ -120,7 +120,8 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
queryParams.pageLimits(), queryParams.pageLimits(),
queryParams.minCompetitiveScore(), queryParams.minCompetitiveScore(),
queryParams.sort(), queryParams.sort(),
queryParams.computePreciseHitsCount() queryParams.computePreciseHitsCount(),
queryParams.timeout()
); );
} }

View File

@ -40,6 +40,7 @@ import reactor.core.publisher.Mono;
public class DbTestUtils { public class DbTestUtils {
public static final String BIG_STRING = generateBigString(); public static final String BIG_STRING = generateBigString();
public static final int MAX_IN_MEMORY_RESULT_ENTRIES = 8192;
private static String generateBigString() { private static String generateBigString() {
return "0123456789".repeat(1024); return "0123456789".repeat(1024);

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.MAX_IN_MEMORY_RESULT_ENTRIES;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@ -34,8 +35,20 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
private static final AtomicInteger dbId = new AtomicInteger(0); private static final AtomicInteger dbId = new AtomicInteger(0);
private static final Optional<NRTCachingOptions> NRT = Optional.empty(); private static final Optional<NRTCachingOptions> NRT = Optional.empty();
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5), private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(),
false, true, Optional.empty(), true, NRT, 16 * 1024 * 1024, true, false, true); Duration.ofSeconds(5),
Duration.ofSeconds(5),
false,
true,
Optional.empty(),
true,
NRT,
16 * 1024 * 1024,
true,
false,
true,
MAX_IN_MEMORY_RESULT_ENTRIES
);
@Override @Override
public Mono<TempDb> openTempDb(TestAllocator allocator) { public Mono<TempDb> openTempDb(TestAllocator allocator) {

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.MAX_IN_MEMORY_RESULT_ENTRIES;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TempDb;
import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.DbTestUtils.TestAllocator;
@ -23,7 +25,17 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
private static final Optional<NRTCachingOptions> NRT = Optional.empty(); private static final Optional<NRTCachingOptions> NRT = Optional.empty();
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5), private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5),
false, true, Optional.empty(), true, NRT, 16 * 1024 * 1024, true, false, false); false,
true,
Optional.empty(),
true,
NRT,
16 * 1024 * 1024,
true,
false,
false,
MAX_IN_MEMORY_RESULT_ENTRIES
);
@Override @Override
public Mono<TempDb> openTempDb(TestAllocator allocator) { public Mono<TempDb> openTempDb(TestAllocator allocator) {

View File

@ -10,6 +10,7 @@ import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryMap;
import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDictionary; import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
import static it.cavallium.dbengine.SyncUtils.*; import static it.cavallium.dbengine.SyncUtils.*;
import static org.assertj.core.api.Assertions.*;
import io.net5.buffer.api.internal.ResourceSupport; import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.DbTestUtils.TestAllocator;
@ -765,9 +766,7 @@ public abstract class TestDictionaryMapDeep {
@MethodSource("provideArgumentsSetMulti") @MethodSource("provideArgumentsSetMulti")
public void testSetMultiGetMulti(UpdateMode updateMode, Map<String, Map<String, String>> entries, public void testSetMultiGetMulti(UpdateMode updateMode, Map<String, Map<String, String>> entries,
boolean shouldFail) { boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true); var flux = tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> { .flatMapMany(map -> {
var entriesFlux = Flux.fromIterable(entries.entrySet()); var entriesFlux = Flux.fromIterable(entries.entrySet());
@ -782,16 +781,13 @@ public abstract class TestDictionaryMapDeep {
.filter(k -> k.getValue().isPresent()) .filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.transform(LLUtils::handleDiscard) .transform(LLUtils::handleDiscard)
)); );
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError(); StepVerifier.create(flux).verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); var elements = flux.collect(Collectors.toList()).block();
for (Entry<String, Map<String, String>> ignored : remainingEntries) { assertThat(elements).containsExactlyInAnyOrderElementsOf(entries.entrySet());
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.MAX_IN_MEMORY_RESULT_ENTRIES;
import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.newAllocator;
@ -149,8 +150,8 @@ public class TestLuceneIndex {
} }
} }
} else { } else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher(ENV, true)); tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV, true)); tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
} }
return index; return index;
} }

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.MAX_IN_MEMORY_RESULT_ENTRIES;
import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.newAllocator;
@ -167,14 +168,14 @@ public class TestLuceneSearches {
sink.next(new UnsortedUnscoredStreamingMultiSearcher()); sink.next(new UnsortedUnscoredStreamingMultiSearcher());
} }
} }
sink.next(new AdaptiveMultiSearcher(ENV, true)); sink.next(new AdaptiveMultiSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
} else { } else {
if (info.onlyCount()) { if (info.onlyCount()) {
sink.next(new CountMultiSearcher()); sink.next(new CountMultiSearcher());
} else { } else {
sink.next(new PagedLocalSearcher()); sink.next(new PagedLocalSearcher());
} }
sink.next(new AdaptiveLocalSearcher(ENV, true)); sink.next(new AdaptiveLocalSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
} }
sink.complete(); sink.complete();
}, OverflowStrategy.BUFFER); }, OverflowStrategy.BUFFER);
@ -219,8 +220,8 @@ public class TestLuceneSearches {
} }
} }
} else { } else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher(ENV, true)); tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV, true)); tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
} }
return shards ? multiIndex : localIndex; return shards ? multiIndex : localIndex;
} }

View File

@ -100,7 +100,8 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
queryParams.pageLimits(), queryParams.pageLimits(),
queryParams.minCompetitiveScore(), queryParams.minCompetitiveScore(),
queryParams.sort(), queryParams.sort(),
queryParams.computePreciseHitsCount() queryParams.computePreciseHitsCount(),
queryParams.timeout()
); );
} }