Fix searcher leak

This commit is contained in:
Andrea Cavalli 2022-06-14 17:46:49 +02:00
parent 8e47c15809
commit fb0bd092a4
15 changed files with 126 additions and 49 deletions

View File

@ -1,11 +1,13 @@
package it.cavallium.dbengine.client;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
@ -120,7 +122,9 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
.collectList()
.flatMap(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
.single();
.single()
.doOnDiscard(LLSearchResultShard.class, ResourceSupport::close)
.doOnDiscard(Hits.class, ResourceSupport::close);
}
@Override

View File

@ -16,9 +16,14 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.WritableComponent;
import io.netty5.buffer.api.internal.Statics;
import io.netty5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.client.Hits;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
@ -48,6 +53,7 @@ import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
@ -57,7 +63,6 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@ -648,6 +653,35 @@ public class LLUtils {
return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
}
/**
* Obtain the resource, then run the closure.
* If the closure publisher returns a single element, then the resource is kept open,
* otherwise it is closed.
*/
public static <T extends AutoCloseable, U> Mono<U> singleOrClose(Mono<T> resourceMono,
Function<T, Mono<U>> closure) {
return Mono.usingWhen(resourceMono,
resource -> closure.apply(resource).doOnSuccess(s -> {
if (s == null) {
try {
resource.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}),
resource -> Mono.empty(),
(resource, ex) -> Mono.fromCallable(() -> {
resource.close();
return null;
}),
resource -> Mono.fromCallable(() -> {
resource.close();
return null;
})
);
}
@Deprecated
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}
@ -893,6 +927,22 @@ public class LLUtils {
if (rocksObj.isOwningHandle()) {
rocksObj.close();
}
} else if (next instanceof Hits<?> hits) {
hits.close();
} else if (next instanceof LLIndexSearcher searcher) {
try {
searcher.close();
} catch (IOException e) {
logger.error("Failed to close searcher {}", searcher, e);
}
} else if (next instanceof LLIndexSearchers searchers) {
try {
searchers.close();
} catch (IOException e) {
logger.error("Failed to close searchers {}", searchers, e);
}
} else if (next instanceof LLSearchResultShard shard) {
shard.close();
} else if (next instanceof Optional<?> optional) {
optional.ifPresent(LLUtils::onNextDropped);
} else if (next instanceof Map.Entry<?, ?> entry) {

View File

@ -5,7 +5,6 @@ import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterrupti
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException;
@ -36,7 +35,7 @@ import reactor.core.scheduler.Schedulers;
public class CachedIndexSearcherManager implements IndexSearcherManager {
private static final Logger logger = LogManager.getLogger(CachedIndexSearcherManager.class);
private static final Logger LOG = LogManager.getLogger(CachedIndexSearcherManager.class);
private final ExecutorService searchExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
new ShortNamedThreadFactory("lucene-search")
@ -60,7 +59,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private final Empty<Void> closeRequestedMono = Sinks.empty();
private final Mono<Void> closeMono;
private final Cleaner cleaner = Cleaner.create();
private static final Cleaner CLEANER = Cleaner.create();
public CachedIndexSearcherManager(IndexWriter indexWriter,
SnapshotsManager snapshotsManager,
@ -82,7 +81,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
try {
maybeRefresh();
} catch (Exception ex) {
logger.error("Failed to refresh the searcher manager", ex);
LOG.error("Failed to refresh the searcher manager", ex);
}
})
.subscribeOn(luceneHeavyTasksScheduler)
@ -107,26 +106,26 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
this.closeMono = Mono
.fromRunnable(() -> {
logger.debug("Closing IndexSearcherManager...");
LOG.debug("Closing IndexSearcherManager...");
this.closeRequested.set(true);
this.closeRequestedMono.tryEmitEmpty();
})
.then(refresherClosed.asMono())
.then(Mono.<Void>fromRunnable(() -> {
logger.debug("Closed IndexSearcherManager");
logger.debug("Closing refreshes...");
LOG.debug("Closed IndexSearcherManager");
LOG.debug("Closing refreshes...");
long initTime = System.nanoTime();
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
logger.debug("Closed refreshes...");
logger.debug("Closing active searchers...");
LOG.debug("Closed refreshes...");
LOG.debug("Closing active searchers...");
initTime = System.nanoTime();
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
logger.debug("Closed active searchers");
logger.debug("Stopping searcher executor...");
LOG.debug("Closed active searchers");
LOG.debug("Stopping searcher executor...");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
searchExecutor.shutdown();
@ -136,9 +135,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
searchExecutor.shutdownNow();
}
} catch (InterruptedException e) {
logger.error("Failed to stop executor", e);
LOG.error("Failed to stop executor", e);
}
logger.debug("Stopped searcher executor");
LOG.debug("Stopped searcher executor");
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.cache();
@ -161,23 +160,25 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
var closed = new AtomicBoolean();
LLIndexSearcher llIndexSearcher;
if (fromSnapshot) {
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher);
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
} else {
var released = new AtomicBoolean();
llIndexSearcher = new MainIndexSearcher(indexSearcher, released);
cleaner.register(llIndexSearcher, () -> {
if (released.compareAndSet(false, true)) {
logger.warn("An index searcher was not closed!");
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
}
CLEANER.register(llIndexSearcher, () -> {
if (closed.compareAndSet(false, true)) {
LOG.warn("An index searcher was not closed!");
if (!fromSnapshot) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
logger.error("Failed to release the index searcher", e);
LOG.error("Failed to release the index searcher", e);
}
}
});
}
}
});
return llIndexSearcher;
});
}
@ -235,17 +236,14 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private class MainIndexSearcher extends LLIndexSearcher {
private final AtomicBoolean released;
public MainIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean released) {
super(indexSearcher);
this.released = released;
super(indexSearcher, released);
}
@Override
public void onClose() throws IOException {
dropCachedIndexSearcher();
if (released.compareAndSet(false, true)) {
if (getClosed().compareAndSet(false, true)) {
searcherManager.release(indexSearcher);
}
}
@ -253,8 +251,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private class SnapshotIndexSearcher extends LLIndexSearcher {
public SnapshotIndexSearcher(IndexSearcher indexSearcher) {
super(indexSearcher);
public SnapshotIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
super(indexSearcher, closed);
}
@Override

View File

@ -17,10 +17,16 @@ public abstract class LLIndexSearcher implements Closeable {
protected static final Logger LOG = LogManager.getLogger(LLIndexSearcher.class);
protected final IndexSearcher indexSearcher;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean closed;
public LLIndexSearcher(IndexSearcher indexSearcher) {
this.indexSearcher = indexSearcher;
this.closed = new AtomicBoolean();
}
public LLIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
this.indexSearcher = indexSearcher;
this.closed = closed;
}
public IndexReader getIndexReader() {
@ -33,6 +39,10 @@ public abstract class LLIndexSearcher implements Closeable {
return indexSearcher;
}
public AtomicBoolean getClosed() {
return closed;
}
@Override
public final void close() throws IOException {
if (closed.compareAndSet(false, true)) {

View File

@ -147,14 +147,20 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
private Mono<LLIndexSearchers> getIndexSearchers(LLSnapshot snapshot) {
return luceneIndicesFlux
.index()
return luceneIndicesFlux.index()
// Resolve the snapshot of each shard
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.flatMap(luceneSnapshot -> tuple.getT2().retrieveSearcher(luceneSnapshot.orElse(null)))
)
.collectList()
.doOnDiscard(LLIndexSearcher.class, indexSearcher -> {
try {
indexSearcher.close();
} catch (IOException ex) {
LOG.error("Failed to close an index searcher", ex);
}
})
.map(LLIndexSearchers::of);
}

View File

@ -1,18 +1,16 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import java.io.IOException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
public class AdaptiveLocalSearcher implements LocalSearcher {
@ -46,7 +44,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearcherMono.flatMap(indexSearcher -> {
return singleOrClose(indexSearcherMono, indexSearcher -> {
var indexSearchers = LLIndexSearchers.unsharded(indexSearcher);
if (transformer == NO_REWRITE) {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import io.netty5.buffer.api.Send;
@ -44,7 +45,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
if (transformer == NO_REWRITE) {
return transformedCollectMulti(indexSearchers, queryParams, keyFieldName, transformer);
} else {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -27,7 +28,7 @@ public class CountMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
@ -104,7 +105,7 @@ public class CountMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearcherMono.flatMap(indexSearcher -> {
return singleOrClose(indexSearcherMono, indexSearcher -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
@ -30,7 +32,8 @@ public interface MultiSearcher extends LocalSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
Mono<LLIndexSearchers> searchers = indexSearcherMono.map(LLIndexSearchers::unsharded);
Mono<LLIndexSearchers> searchers = singleOrClose(indexSearcherMono, indexSearcher ->
Mono.just(LLIndexSearchers.unsharded(indexSearcher)));
return this.collectMulti(searchers, queryParams, keyFieldName, transformer);
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
@ -39,7 +40,7 @@ public class PagedLocalSearcher implements LocalSearcher {
GlobalQueryRewrite transformer) {
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
return indexSearcherMono.flatMap(indexSearcher -> {
return singleOrClose(indexSearcherMono, indexSearcher -> {
var indexSearchers = LLIndexSearchers.unsharded(indexSearcher);
Mono<LocalQueryParams> queryParamsMono;

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
@ -37,7 +38,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
@ -35,7 +36,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
@ -35,7 +36,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import static java.util.Objects.requireNonNull;
import io.netty5.buffer.api.Send;
@ -36,7 +37,7 @@ public class StandardSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.singleOrClose;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -31,7 +32,7 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
return indexSearchersMono.flatMap(indexSearchers -> {
return singleOrClose(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);