Fix searchers leak, change method references

Replace most method references with lambdas to ease debugging
This commit is contained in:
Andrea Cavalli 2022-07-03 01:32:13 +02:00
parent 409b2985ca
commit 8f47adfc44
29 changed files with 194 additions and 226 deletions

View File

@ -113,7 +113,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
)
.collectList()
.mapNotNull(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
.map(llSearchResult -> mapResults(llSearchResult))
.defaultIfEmpty(Hits.empty())
.doOnDiscard(DiscardingCloseable.class, DiscardingCloseable::close);
}
@ -127,7 +127,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
)
.collectList()
.mapNotNull(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
.map(llSearchResult -> mapResults(llSearchResult))
.defaultIfEmpty(Hits.empty())
.doOnDiscard(DiscardingCloseable.class, DiscardingCloseable::close);
}

View File

@ -19,7 +19,7 @@ public abstract class BufSupplier implements SafeCloseable, DiscardingCloseable,
}
public static BufSupplier ofShared(Buffer supplier) {
return new SimpleBufSupplier(supplier::copy);
return new SimpleBufSupplier(() -> supplier.copy());
}
private static final class SimpleBufSupplier extends BufSupplier {

View File

@ -118,7 +118,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteAll() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::deleteAll).iterator();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(llLuceneIndex -> llLuceneIndex.deleteAll()).iterator();
return Mono.whenDelayError(it);
}
@ -223,7 +223,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
// Generate next snapshot index
.fromCallable(nextSnapshotNumber::getAndIncrement)
.flatMap(snapshotIndex -> luceneIndicesFlux
.flatMapSequential(LLLuceneIndex::takeSnapshot)
.flatMapSequential(llLuceneIndex -> llLuceneIndex.takeSnapshot())
.collectList()
.doOnNext(instancesSnapshotsArray -> registeredSnapshots.put(snapshotIndex, instancesSnapshotsArray))
.thenReturn(new LLSnapshot(snapshotIndex))

View File

@ -233,7 +233,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.get(null)
.concatWith(dictionary.setRange(rangeMono, Flux
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.map(this::serializeEntry), true).then(Mono.empty()))
.map(entry -> serializeEntry(entry)), true).then(Mono.empty()))
.singleOrEmpty();
}
@ -307,7 +307,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater))
.transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize));
.transform(mono -> LLUtils.mapLLDelta(mono, serialized -> valueSerializer.deserialize(serialized)));
}
public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
@ -395,12 +395,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
.map(response -> LLUtils.responseToBoolean(response));
}
@Override
public Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
var mappedKeys = keys.map(this::serializeKeySuffixToKey);
var mappedKeys = keys.map(keySuffix -> serializeKeySuffixToKey(keySuffix));
return dictionary
.getMulti(resolveSnapshot(snapshot), mappedKeys)
.map(valueBufOpt -> {
@ -431,7 +431,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
var serializedEntries = entries.map(this::serializeEntry);
var serializedEntries = entries.map(entry -> serializeEntry(entry));
return dictionary.putMulti(serializedEntries);
}
@ -439,7 +439,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Boolean> updateMulti(Flux<T> keys,
KVSerializationFunction<T, @Nullable U, @Nullable U> updater) {
var sharedKeys = keys.publish().refCount(2);
var serializedKeys = sharedKeys.map(this::serializeKeySuffixToKey);
var serializedKeys = sharedKeys.map(keySuffix -> serializeKeySuffixToKey(keySuffix));
var serializedUpdater = getSerializedUpdater(updater);
return dictionary.updateMulti(sharedKeys, serializedKeys, serializedUpdater);
}
@ -574,18 +574,18 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return Flux.usingWhen(Mono.just(true),
b -> this.getAllValues(null, false),
b -> dictionary.setRange(rangeMono, entries.map(this::serializeEntry), false)
b -> dictionary.setRange(rangeMono, entries.map(entry -> serializeEntry(entry)), false)
);
}
@Override
public Mono<Void> clear() {
return Mono.using(rangeSupplier::get, range -> {
return Mono.using(() -> rangeSupplier.get(), range -> {
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(Mono.fromCallable(range::getSingleUnsafe), LLDictionaryResultType.VOID)
.remove(Mono.fromCallable(() -> range.getSingleUnsafe()), LLDictionaryResultType.VOID)
.doOnNext(LLUtils::finalizeResourceNow)
.then();
} else {

View File

@ -364,7 +364,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
return dictionary
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange)
.flatMapSequential(groupKeyWithoutExt -> this.subStageGetter
.subStage(dictionary, snapshot, Mono.fromCallable(groupKeyWithoutExt::copy))
.subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExt.copy()))
.map(us -> {
T deserializedSuffix;
try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExt)) {
@ -415,18 +415,18 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
@Override
public Mono<Void> clear() {
return Mono.using(rangeSupplier::get, range -> {
return Mono.using(() -> rangeSupplier.get(), range -> {
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(Mono.fromCallable(range::getSingleUnsafe), LLDictionaryResultType.VOID)
.doOnNext(LLUtils::finalizeResourceNow)
.remove(Mono.fromCallable(() -> range.getSingleUnsafe()), LLDictionaryResultType.VOID)
.doOnNext(resource -> LLUtils.finalizeResourceNow(resource))
.then();
} else {
return dictionary.setRange(rangeMono, Flux.empty(), false);
}
}, LLUtils::finalizeResourceNow);
}, resource -> LLUtils.finalizeResourceNow(resource));
}
protected T deserializeSuffix(@NotNull Buffer keySuffix) throws SerializationException {
@ -515,7 +515,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
throw new IllegalArgumentException();
}
var savedProgressKey1Opt = savedProgressKey1.map(Optional::of).defaultIfEmpty(Optional.empty());
var savedProgressKey1Opt = savedProgressKey1.map(value1 -> Optional.of(value1)).defaultIfEmpty(Optional.empty());
return deepMap
.dictionary

View File

@ -153,7 +153,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
@Override
public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot) {
return subDictionary.get(snapshot).map(this::deserializeMap);
return subDictionary.get(snapshot).map(map -> deserializeMap(map));
}
@Override
@ -164,12 +164,15 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
@Override
public Mono<Void> set(Object2ObjectSortedMap<T, U> map) {
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(subDictionary::set);
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(value -> subDictionary.set(value));
}
@Override
public Mono<Boolean> setAndGetChanged(Object2ObjectSortedMap<T, U> map) {
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(subDictionary::setAndGetChanged).single();
return Mono
.fromSupplier(() -> this.serializeMap(map))
.flatMap(value -> subDictionary.setAndGetChanged(value))
.single();
}
@Override
@ -249,15 +252,15 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
public Mono<Object2ObjectSortedMap<T, U>> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) {
return Mono
.fromSupplier(() -> this.serializeMap(value))
.flatMap(subDictionary::setAndGetPrevious)
.map(this::deserializeMap);
.flatMap(value1 -> subDictionary.setAndGetPrevious(value1))
.map(map -> deserializeMap(map));
}
@Override
public Mono<Object2ObjectSortedMap<T, U>> clearAndGetPrevious() {
return subDictionary
.clearAndGetPrevious()
.map(this::deserializeMap);
.map(map -> deserializeMap(map));
}
@Override

View File

@ -167,7 +167,7 @@ public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, Data
} else {
return serializeValue(result);
}
}).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize));
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> serializer.deserialize(serialized)));
}
@Override
@ -181,19 +181,19 @@ public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, Data
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::singleUnsafe), false)
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(single -> LLRange.singleUnsafe(single)), false)
.map(empty -> empty ? 0L : 1L);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::singleUnsafe), true);
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(single -> LLRange.singleUnsafe(single)), true);
}
@Override
public Flux<BadBlock> badBlocks() {
return dictionary.badBlocks(keyMono.map(LLRange::singleUnsafe));
return dictionary.badBlocks(keyMono.map(single -> LLRange.singleUnsafe(single)));
}
@Override

View File

@ -83,12 +83,12 @@ public class DatabaseSingleBucket<K, V, TH>
@Override
public Mono<V> get(@Nullable CompositeSnapshot snapshot) {
return bucketStage.get(snapshot).flatMap(this::extractValueTransformation);
return bucketStage.get(snapshot).flatMap(entries -> extractValueTransformation(entries));
}
@Override
public Mono<V> getOrDefault(@Nullable CompositeSnapshot snapshot, Mono<V> defaultValue) {
return bucketStage.get(snapshot).flatMap(this::extractValueTransformation).switchIfEmpty(defaultValue);
return bucketStage.get(snapshot).flatMap(entries -> extractValueTransformation(entries)).switchIfEmpty(defaultValue);
}
@Override
@ -103,7 +103,7 @@ public class DatabaseSingleBucket<K, V, TH>
@Override
public Mono<Boolean> setAndGetChanged(V value) {
return this.updateAndGetDelta(prev -> value).map(LLUtils::isDeltaChanged);
return this.updateAndGetDelta(prev -> value).map(delta -> LLUtils.isDeltaChanged(delta));
}
@Override
@ -119,7 +119,7 @@ public class DatabaseSingleBucket<K, V, TH>
return this.insertValueOrCreate(oldBucket, newValue);
}
}, updateReturnMode)
.flatMap(this::extractValueTransformation);
.flatMap(entries -> extractValueTransformation(entries));
}
@Override
@ -132,7 +132,7 @@ public class DatabaseSingleBucket<K, V, TH>
} else {
return this.insertValueOrCreate(oldBucket, result);
}
}).transform(mono -> LLUtils.mapDelta(mono, this::extractValue));
}).transform(mono -> LLUtils.mapDelta(mono, entries -> extractValue(entries)));
}
@Override
@ -147,7 +147,7 @@ public class DatabaseSingleBucket<K, V, TH>
@Override
public Mono<Boolean> clearAndGetStatus() {
return this.updateAndGetDelta(prev -> null).map(LLUtils::isDeltaChanged);
return this.updateAndGetDelta(prev -> null).map(delta -> LLUtils.isDeltaChanged(delta));
}
@Override

View File

@ -79,34 +79,34 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
@Override
public Mono<A> get(@Nullable CompositeSnapshot snapshot) {
return serializedSingle.get(snapshot).handle(this::deserializeSink);
return serializedSingle.get(snapshot).handle((value, sink) -> deserializeSink(value, sink));
}
@Override
public Mono<A> getOrDefault(@Nullable CompositeSnapshot snapshot, Mono<A> defaultValue) {
return serializedSingle.get(snapshot).handle(this::deserializeSink).switchIfEmpty(defaultValue);
return serializedSingle.get(snapshot).handle((B value, SynchronousSink<A> sink) -> deserializeSink(value, sink)).switchIfEmpty(defaultValue);
}
@Override
public Mono<Void> set(A value) {
return Mono
.fromCallable(() -> map(value))
.flatMap(serializedSingle::set);
.flatMap(value1 -> serializedSingle.set(value1));
}
@Override
public Mono<A> setAndGetPrevious(A value) {
return Mono
.fromCallable(() -> map(value))
.flatMap(serializedSingle::setAndGetPrevious)
.handle(this::deserializeSink);
.flatMap(value2 -> serializedSingle.setAndGetPrevious(value2))
.handle((value1, sink) -> deserializeSink(value1, sink));
}
@Override
public Mono<Boolean> setAndGetChanged(A value) {
return Mono
.fromCallable(() -> map(value))
.flatMap(serializedSingle::setAndGetChanged)
.flatMap(value1 -> serializedSingle.setAndGetChanged(value1))
.single();
}
@ -120,7 +120,7 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
} else {
return this.map(result);
}
}, updateReturnMode).handle(this::deserializeSink);
}, updateReturnMode).handle((value, sink) -> deserializeSink(value, sink));
}
@Override
@ -132,7 +132,7 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
} else {
return this.map(result);
}
}).transform(mono -> LLUtils.mapDelta(mono, this::unMap));
}).transform(mono -> LLUtils.mapDelta(mono, bytes -> unMap(bytes)));
}
@Override
@ -142,7 +142,7 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
@Override
public Mono<A> clearAndGetPrevious() {
return serializedSingle.clearAndGetPrevious().handle(this::deserializeSink);
return serializedSingle.clearAndGetPrevious().handle((value, sink) -> deserializeSink(value, sink));
}
@Override

View File

@ -168,7 +168,7 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
return serializeValue(result);
}
}
}).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize));
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> serializer.deserialize(serialized)));
}
@Override

View File

@ -67,7 +67,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<Boolean> updateValue(T key, SerializationFunction<@Nullable U, @Nullable U> updater) {
return updateValueAndGetDelta(key, updater).map(LLUtils::isDeltaChanged).single();
return updateValueAndGetDelta(key, updater).map(delta -> LLUtils.isDeltaChanged(delta)).single();
}
default Mono<Delta<U>> updateValueAndGetDelta(T key,
@ -99,7 +99,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<U> removeAndGetPrevious(T key) {
return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::finalizeResource);
return Mono.usingWhen(at(null, key), us -> us.clearAndGetPrevious(), LLUtils::finalizeResource);
}
default Mono<Boolean> removeAndGetStatus(T key) {

View File

@ -72,19 +72,13 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
this.refreshSubscription = Mono
.fromRunnable(() -> {
try {
maybeRefresh();
} catch (Exception ex) {
LOG.error("Failed to refresh the searcher manager", ex);
}
})
.subscribeOn(uninterruptibleScheduler(luceneHeavyTasksScheduler))
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.transform(LLUtils::handleDiscard)
.subscribe();
refreshSubscription = luceneHeavyTasksScheduler.schedulePeriodically(() -> {
try {
maybeRefreshBlocking();
} catch (Exception ex) {
LOG.error("Failed to refresh the searcher manager", ex);
}
}, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS);
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
.expireAfterWrite(queryRefreshDebounceTime)
@ -210,7 +204,7 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS
return activeRefreshes.get();
}
private class MainIndexSearcher extends LLIndexSearcher {
private class MainIndexSearcher extends LLIndexSearcherImpl {
public MainIndexSearcher(IndexSearcher indexSearcher, SearcherManager searcherManager) {
super(indexSearcher, () -> releaseOnCleanup(searcherManager, indexSearcher));
@ -236,7 +230,7 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS
}
}
private class SnapshotIndexSearcher extends LLIndexSearcher {
private class SnapshotIndexSearcher extends LLIndexSearcherImpl {
public SnapshotIndexSearcher(IndexSearcher indexSearcher) {
super(indexSearcher);

View File

@ -18,38 +18,21 @@ public abstract class LLIndexSearcher extends SimpleResource implements Discardi
protected static final Logger LOG = LogManager.getLogger(LLIndexSearcher.class);
protected final IndexSearcher indexSearcher;
public LLIndexSearcher(IndexSearcher indexSearcher) {
public LLIndexSearcher() {
super();
this.indexSearcher = indexSearcher;
}
public LLIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
super(closed);
this.indexSearcher = indexSearcher;
}
public LLIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed, Runnable cleanAction) {
super(closed, cleanAction);
this.indexSearcher = indexSearcher;
}
public LLIndexSearcher(IndexSearcher indexSearcher, Runnable cleanAction) {
public LLIndexSearcher(Runnable cleanAction) {
super(cleanAction);
this.indexSearcher = indexSearcher;
}
public IndexReader getIndexReader() {
ensureOpen();
return indexSearcher.getIndexReader();
}
public IndexSearcher getIndexSearcher() {
ensureOpen();
return indexSearcher;
return getIndexSearcherInternal();
}
protected abstract IndexSearcher getIndexSearcherInternal();
public AtomicBoolean getClosed() {
return super.getClosed();
}

View File

@ -0,0 +1,34 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
public abstract class LLIndexSearcherImpl extends LLIndexSearcher {
protected static final Logger LOG = LogManager.getLogger(LLIndexSearcherImpl.class);
protected final IndexSearcher indexSearcher;
public LLIndexSearcherImpl(IndexSearcher indexSearcher) {
super();
this.indexSearcher = indexSearcher;
}
public LLIndexSearcherImpl(IndexSearcher indexSearcher, Runnable cleanAction) {
super(cleanAction);
this.indexSearcher = indexSearcher;
}
public IndexSearcher getIndexSearcherInternal() {
return indexSearcher;
}
public AtomicBoolean getClosed() {
return super.getClosed();
}
}

View File

@ -1,27 +1,11 @@
package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
public interface LLIndexSearchers extends DiscardingCloseable {
@ -42,8 +26,6 @@ public interface LLIndexSearchers extends DiscardingCloseable {
LLIndexSearcher llShard(int shardIndex);
IndexReader allShards();
class UnshardedIndexSearchers extends SimpleResource implements LLIndexSearchers {
private final LLIndexSearcher indexSearcher;
@ -78,11 +60,6 @@ public interface LLIndexSearchers extends DiscardingCloseable {
return indexSearcher;
}
@Override
public IndexReader allShards() {
return indexSearcher.getIndexReader();
}
public IndexSearcher shard() {
return this.shard(-1);
}
@ -103,18 +80,12 @@ public interface LLIndexSearchers extends DiscardingCloseable {
private final List<IndexSearcher> indexSearchersVals;
public ShardedIndexSearchers(List<LLIndexSearcher> indexSearchers) {
var shardedIndexSearchers = new ArrayList<LLIndexSearcher>(indexSearchers.size());
List<IndexSearcher> shardedIndexSearchersVals = new ArrayList<>(indexSearchers.size());
for (LLIndexSearcher indexSearcher : indexSearchers) {
shardedIndexSearchersVals.add(indexSearcher.getIndexSearcher());
}
shardedIndexSearchersVals = ShardIndexSearcher.create(shardedIndexSearchersVals);
int i = 0;
for (IndexSearcher shardedIndexSearcher : shardedIndexSearchersVals) {
shardedIndexSearchers.add(new WrappedLLIndexSearcher(shardedIndexSearcher, indexSearchers.get(i)));
i++;
}
this.indexSearchers = shardedIndexSearchers;
this.indexSearchers = indexSearchers;
this.indexSearchersVals = shardedIndexSearchersVals;
}
@ -144,54 +115,11 @@ public interface LLIndexSearchers extends DiscardingCloseable {
return indexSearchers.get(shardIndex);
}
@Override
public IndexReader allShards() {
var irs = new IndexReader[indexSearchersVals.size()];
for (int i = 0, s = indexSearchersVals.size(); i < s; i++) {
irs[i] = indexSearchersVals.get(i).getIndexReader();
}
Object2IntOpenHashMap<IndexReader> indexes = new Object2IntOpenHashMap<>();
for (int i = 0; i < irs.length; i++) {
indexes.put(irs[i], i);
}
try {
return new MultiReader(irs, Comparator.comparingInt(indexes::getInt), false);
} catch (IOException ex) {
// This shouldn't happen
throw new UncheckedIOException(ex);
}
}
@Override
protected void onClose() {
for (LLIndexSearcher indexSearcher : indexSearchers) {
indexSearcher.close();
}
}
private static class WrappedLLIndexSearcher extends LLIndexSearcher {
private final LLIndexSearcher parent;
public WrappedLLIndexSearcher(IndexSearcher indexSearcher, LLIndexSearcher parent) {
super(indexSearcher, parent.getClosed());
this.parent = parent;
}
@Override
public IndexSearcher getIndexSearcher() {
return indexSearcher;
}
@Override
public IndexReader getIndexReader() {
return indexSearcher.getIndexReader();
}
@Override
protected void onClose() {
parent.close();
}
}
}
}

View File

@ -721,7 +721,7 @@ public class LLLocalDictionary implements LLDictionary {
return rangeMono.flatMapMany(range -> {
try (range) {
if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(LLRange::getSingleUnsafe);
var rangeSingleMono = rangeMono.map(llRange -> llRange.getSingleUnsafe());
return getRangeSingle(snapshot, rangeSingleMono);
} else {
return getRangeMulti(snapshot, rangeMono, reverse, smallRange);
@ -738,7 +738,7 @@ public class LLLocalDictionary implements LLDictionary {
return rangeMono.flatMapMany(range -> {
try (range) {
if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(LLRange::getSingleUnsafe);
var rangeSingleMono = rangeMono.map(llRange -> llRange.getSingleUnsafe());
return getRangeSingle(snapshot, rangeSingleMono).map(List::of);
} else {
@ -793,7 +793,7 @@ public class LLLocalDictionary implements LLDictionary {
return rangeMono.flatMapMany(range -> {
try {
if (range.isSingle()) {
return this.getRangeKeysSingle(snapshot, rangeMono.map(LLRange::getSingleUnsafe));
return this.getRangeKeysSingle(snapshot, rangeMono.map(llRange -> llRange.getSingleUnsafe()));
} else {
return this.getRangeKeysMulti(snapshot, rangeMono, reverse, smallRange);
}
@ -1064,7 +1064,7 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.seekToFirst();
}
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, buffer -> rocksIterator.key(buffer)).send());
rocksIterator.next();
}
}
@ -1217,8 +1217,8 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.seekToFirst();
}
if (rocksIterator.isValid()) {
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
try (var key = LLUtils.readDirectNioBuffer(alloc, buffer -> rocksIterator.key(buffer))) {
try (var value = LLUtils.readDirectNioBuffer(alloc, buffer -> rocksIterator.value(buffer))) {
return LLEntry.of(key.touch("get-one key"), value.touch("get-one value"));
}
}
@ -1246,7 +1246,7 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.seekToFirst();
}
if (rocksIterator.isValid()) {
return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
return LLUtils.readDirectNioBuffer(alloc, buffer -> rocksIterator.key(buffer));
} else {
return null;
}
@ -1345,7 +1345,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
})
.map(commonPool::submit)
.map(task -> commonPool.submit(task))
.toList();
long count = 0;
for (ForkJoinTask<Long> future : futures) {
@ -1383,8 +1383,8 @@ public class LLLocalDictionary implements LLDictionary {
if (!rocksIterator.isValid()) {
return null;
}
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
Buffer key = LLUtils.readDirectNioBuffer(alloc, buffer -> rocksIterator.key(buffer));
Buffer value = LLUtils.readDirectNioBuffer(alloc, buffer -> rocksIterator.value(buffer));
db.delete(writeOpts, key);
return LLEntry.of(key, value);
}

View File

@ -97,7 +97,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
while (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key);
key = LLUtils.readDirectNioBuffer(db.getAllocator(), buffer -> rocksIterator.key(buffer));
} else {
key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key());
}

View File

@ -546,7 +546,7 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex
var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
Mono<LLIndexSearchers> searchers = searcherManager
.retrieveSearcher(snapshot)
.map(LLIndexSearchers::unsharded);
.map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher));
return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
}

View File

@ -167,7 +167,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
LOG.error("Failed to close an index searcher", ex);
}
})
.map(LLIndexSearchers::of);
.map(indexSearchers -> LLIndexSearchers.of(indexSearchers));
}
@Override
@ -255,7 +255,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
@Override
public Mono<Void> deleteAll() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::deleteAll).iterator();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(llLocalLuceneIndex -> llLocalLuceneIndex.deleteAll()).iterator();
return Mono.whenDelayError(it);
}
@ -390,7 +390,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
// Generate next snapshot index
.fromCallable(nextSnapshotNumber::getAndIncrement)
.flatMap(snapshotIndex -> luceneIndicesFlux
.flatMapSequential(LLLocalLuceneIndex::takeSnapshot)
.flatMapSequential(llLocalLuceneIndex -> llLocalLuceneIndex.takeSnapshot())
.collectList()
.doOnNext(instancesSnapshotsArray -> registeredSnapshots.put(snapshotIndex, instancesSnapshotsArray))
.thenReturn(new LLSnapshot(snapshotIndex))

View File

@ -74,19 +74,13 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
refreshSubscription = Mono
.fromRunnable(() -> {
try {
maybeRefresh();
} catch (Exception ex) {
LOG.error("Failed to refresh the searcher manager", ex);
}
})
.subscribeOn(luceneHeavyTasksScheduler)
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.transform(LLUtils::handleDiscard)
.subscribe();
refreshSubscription = luceneHeavyTasksScheduler.schedulePeriodically(() -> {
try {
maybeRefreshBlocking();
} catch (Exception ex) {
LOG.error("Failed to refresh the searcher manager", ex);
}
}, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS);
this.noSnapshotSearcherMono = retrieveSearcherInternal(null);
}
@ -134,26 +128,18 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS
if (isClosed()) {
return null;
}
activeSearchers.incrementAndGet();
try {
IndexSearcher indexSearcher;
boolean fromSnapshot;
if (snapshotsManager == null || snapshot == null) {
indexSearcher = searcherManager.acquire();
fromSnapshot = false;
return new OnDemandIndexSearcher(searcherManager, similarity);
} else {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
fromSnapshot = true;
activeSearchers.incrementAndGet();
IndexSearcher indexSearcher = snapshotsManager
.resolveSnapshot(snapshot)
.getIndexSearcher(SEARCH_EXECUTOR);
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
return new SnapshotIndexSearcher(indexSearcher);
}
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
LLIndexSearcher llIndexSearcher;
if (fromSnapshot) {
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher);
} else {
llIndexSearcher = new MainIndexSearcher(indexSearcher);
}
return llIndexSearcher;
} catch (Throwable ex) {
activeSearchers.decrementAndGet();
throw ex;
@ -200,7 +186,7 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS
return activeRefreshes.get();
}
private class MainIndexSearcher extends LLIndexSearcher {
private class MainIndexSearcher extends LLIndexSearcherImpl {
public MainIndexSearcher(IndexSearcher indexSearcher) {
super(indexSearcher, () -> releaseOnCleanup(searcherManager, indexSearcher));
@ -226,7 +212,7 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS
}
}
private class SnapshotIndexSearcher extends LLIndexSearcher {
private class SnapshotIndexSearcher extends LLIndexSearcherImpl {
public SnapshotIndexSearcher(IndexSearcher indexSearcher) {
super(indexSearcher);
@ -237,4 +223,51 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS
dropCachedIndexSearcher();
}
}
private class OnDemandIndexSearcher extends LLIndexSearcher {
private final SearcherManager searcherManager;
private final Similarity similarity;
private IndexSearcher indexSearcher = null;
public OnDemandIndexSearcher(SearcherManager searcherManager,
Similarity similarity) {
super();
this.searcherManager = searcherManager;
this.similarity = similarity;
}
@Override
protected IndexSearcher getIndexSearcherInternal() {
if (indexSearcher != null) {
return indexSearcher;
}
synchronized (this) {
try {
var indexSearcher = searcherManager.acquire();
indexSearcher.setSimilarity(similarity);
activeSearchers.incrementAndGet();
this.indexSearcher = indexSearcher;
return indexSearcher;
} catch (IOException e) {
throw new IllegalStateException("Failed to acquire the index searcher", e);
}
}
}
@Override
protected void onClose() {
try {
synchronized (this) {
if (indexSearcher != null) {
dropCachedIndexSearcher();
searcherManager.release(indexSearcher);
}
}
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}
}

View File

@ -49,7 +49,7 @@ public class SnapshotsManager extends SimpleResource {
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(this::takeLuceneSnapshot)
.fromCallable(() -> takeLuceneSnapshot())
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}

View File

@ -34,7 +34,7 @@ public class FastFacetsCollectorManager implements CollectorManager<FacetsCollec
public FacetsCollector reduce(Collection<FacetsCollector> collectors) throws IOException {
return FacetsCollector.wrap(facetsCollectorManager.reduce(collectors
.stream()
.map(FacetsCollector::getLuceneFacetsCollector)
.map(facetsCollector -> facetsCollector.getLuceneFacetsCollector())
.toList()));
}

View File

@ -73,7 +73,7 @@ public class BigCompositeReader<R extends IndexReader> {
public static <T extends IndexReader> Collection<String> getIndexedFields(BigCompositeReader<T> readers) {
return readers.subReadersList
.stream()
.map(IndexReader::getContext)
.map(t -> t.getContext())
.flatMap(l -> l.leaves().stream())
.flatMap((l) -> StreamSupport
.stream(l.reader().getFieldInfos().spliterator(), false)

View File

@ -63,7 +63,8 @@ public class CountMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, Flux.empty(), null);
})
.doOnDiscard(LuceneSearchResult.class, SimpleResource::close), LLUtils::finalizeResource);
.doOnDiscard(LuceneSearchResult.class, luceneSearchResult -> luceneSearchResult.close()),
LLUtils::finalizeResource);
}
@Override

View File

@ -32,7 +32,7 @@ public interface MultiSearcher extends LocalSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
Mono<LLIndexSearchers> searchers = indexSearcherMono.map(LLIndexSearchers::unsharded);
Mono<LLIndexSearchers> searchers = indexSearcherMono.map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher));
return this.collectMulti(searchers, queryParams, keyFieldName, transformer);
}

View File

@ -44,7 +44,7 @@ public class PagedLocalSearcher implements LocalSearcher {
}
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded);
var indexSearchersMono = indexSearcherMono.map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher));
return singleOrClose(indexSearchersMono, indexSearchers -> this
// Search first page results
@ -159,7 +159,7 @@ public class PagedLocalSearcher implements LocalSearcher {
)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel())
.map(PageData::topDocs)
.map(pageData -> pageData.topDocs())
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));

View File

@ -152,7 +152,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
})
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel())
.map(PageData::topDocs)
.map(pageData -> pageData.topDocs())
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));

View File

@ -32,20 +32,9 @@ public abstract class SimpleResource implements SafeCloseable {
this(canClose, null);
}
protected SimpleResource(boolean canClose, @Nullable Runnable cleanerAction) {
this(canClose, new AtomicBoolean(), cleanerAction);
}
protected SimpleResource(AtomicBoolean closed) {
this(true, closed, null);
}
protected SimpleResource(AtomicBoolean closed, @Nullable Runnable cleanerAction) {
this(true, closed, cleanerAction);
}
private SimpleResource(boolean canClose, AtomicBoolean closed, @Nullable Runnable cleanerAction) {
private SimpleResource(boolean canClose, @Nullable Runnable cleanerAction) {
this.canClose = canClose;
var closed = new AtomicBoolean();
this.closed = closed;
if (ENABLE_LEAK_DETECTION && canClose) {

View File

@ -31,7 +31,10 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
var single = requireNonNullElseGet(this.single.get(), this.multi::get);
var single = this.single.get();
if (single == null) {
single = this.multi.get();
}
requireNonNull(single, "LuceneLocalSearcher not set");
return single.collect(indexSearcherMono, queryParams, keyFieldName, transformer);
}