Leak detection
This commit is contained in:
parent
aee08f3e48
commit
831af1ef81
@ -27,7 +27,7 @@ public final class Hits<T> extends SimpleResource {
|
||||
}
|
||||
|
||||
private Hits(Flux<T> results, TotalHitsCount totalHitsCount, Runnable onClose, boolean canClose) {
|
||||
super(canClose);
|
||||
super(canClose && onClose != null);
|
||||
this.results = results;
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.onClose = onClose;
|
||||
|
@ -7,6 +7,7 @@ import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.utils.SimpleResource;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.time.Duration;
|
||||
@ -33,9 +34,10 @@ import reactor.core.publisher.Sinks.Empty;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
// todo: deduplicate code between Cached and Simple searcher managers
|
||||
public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(CachedIndexSearcherManager.class);
|
||||
private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
|
||||
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
|
||||
Runtime.getRuntime().availableProcessors(),
|
||||
new ShortNamedThreadFactory("lucene-search")
|
||||
@ -43,6 +45,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
);
|
||||
private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
|
||||
|
||||
@Nullable
|
||||
private final SnapshotsManager snapshotsManager;
|
||||
private final Scheduler luceneHeavyTasksScheduler;
|
||||
private final Similarity similarity;
|
||||
@ -59,10 +62,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
private final Empty<Void> closeRequestedMono = Sinks.empty();
|
||||
private final Mono<Void> closeMono;
|
||||
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
public CachedIndexSearcherManager(IndexWriter indexWriter,
|
||||
SnapshotsManager snapshotsManager,
|
||||
@Nullable SnapshotsManager snapshotsManager,
|
||||
Scheduler luceneHeavyTasksScheduler,
|
||||
Similarity similarity,
|
||||
boolean applyAllDeletes,
|
||||
@ -76,7 +77,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
|
||||
|
||||
Empty<Void> refresherClosed = Sinks.empty();
|
||||
Mono
|
||||
var refreshSubscription = Mono
|
||||
.fromRunnable(() -> {
|
||||
try {
|
||||
maybeRefresh();
|
||||
@ -109,6 +110,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
LOG.debug("Closing IndexSearcherManager...");
|
||||
this.closeRequested.set(true);
|
||||
this.closeRequestedMono.tryEmitEmpty();
|
||||
refreshSubscription.dispose();
|
||||
})
|
||||
.then(refresherClosed.asMono())
|
||||
.then(Mono.<Void>fromRunnable(() -> {
|
||||
@ -152,7 +154,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
try {
|
||||
IndexSearcher indexSearcher;
|
||||
boolean fromSnapshot;
|
||||
if (snapshot == null) {
|
||||
if (snapshotsManager == null || snapshot == null) {
|
||||
indexSearcher = searcherManager.acquire();
|
||||
fromSnapshot = false;
|
||||
} else {
|
||||
@ -169,7 +171,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
} else {
|
||||
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
|
||||
}
|
||||
CLEANER.register(llIndexSearcher, () -> {
|
||||
SimpleResource.CLEANER.register(llIndexSearcher, () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
LOG.warn("An index searcher was not closed!");
|
||||
if (!fromSnapshot) {
|
||||
@ -186,7 +188,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||
activeSearchers.decrementAndGet();
|
||||
throw ex;
|
||||
}
|
||||
});
|
||||
})
|
||||
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
|
||||
.publishOn(Schedulers.parallel());
|
||||
}
|
||||
|
||||
private void dropCachedIndexSearcher() {
|
||||
|
@ -100,6 +100,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
));
|
||||
private static final Scheduler bulkScheduler = luceneWriteScheduler;
|
||||
|
||||
private static final boolean ENABLE_SNAPSHOTS
|
||||
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.lucene.snapshot.enable", "true"));
|
||||
|
||||
private static final boolean CACHE_SEARCHER_MANAGER
|
||||
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.lucene.cachedsearchermanager.enable", "true"));
|
||||
|
||||
private static final LLSnapshot DUMMY_SNAPSHOT = new LLSnapshot(-1);
|
||||
|
||||
static {
|
||||
LLUtils.initHooks();
|
||||
}
|
||||
@ -169,7 +177,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);
|
||||
IndexDeletionPolicy deletionPolicy;
|
||||
deletionPolicy = requireNonNull(indexWriterConfig.getIndexDeletionPolicy());
|
||||
deletionPolicy = new SnapshotDeletionPolicy(deletionPolicy);
|
||||
if (ENABLE_SNAPSHOTS) {
|
||||
deletionPolicy = new SnapshotDeletionPolicy(deletionPolicy);
|
||||
}
|
||||
indexWriterConfig.setIndexDeletionPolicy(deletionPolicy);
|
||||
indexWriterConfig.setCommitOnClose(true);
|
||||
int writerSchedulerMaxThreadCount;
|
||||
@ -213,15 +223,30 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
indexWriterConfig.setSimilarity(getLuceneSimilarity());
|
||||
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
|
||||
this.snapshotsManager = new SnapshotsManager(indexWriter, (SnapshotDeletionPolicy) deletionPolicy);
|
||||
var searcherManager = new CachedIndexSearcherManager(indexWriter,
|
||||
snapshotsManager,
|
||||
luceneHeavyTasksScheduler,
|
||||
getLuceneSimilarity(),
|
||||
luceneOptions.applyAllDeletes().orElse(true),
|
||||
luceneOptions.writeAllDeletes().orElse(false),
|
||||
luceneOptions.queryRefreshDebounceTime()
|
||||
);
|
||||
if (ENABLE_SNAPSHOTS) {
|
||||
this.snapshotsManager = new SnapshotsManager(indexWriter, (SnapshotDeletionPolicy) deletionPolicy);
|
||||
} else {
|
||||
this.snapshotsManager = null;
|
||||
}
|
||||
SimpleIndexSearcherManager searcherManager;
|
||||
if (CACHE_SEARCHER_MANAGER) {
|
||||
searcherManager = new SimpleIndexSearcherManager(indexWriter,
|
||||
snapshotsManager,
|
||||
luceneHeavyTasksScheduler,
|
||||
getLuceneSimilarity(),
|
||||
luceneOptions.applyAllDeletes().orElse(true),
|
||||
luceneOptions.writeAllDeletes().orElse(false),
|
||||
luceneOptions.queryRefreshDebounceTime()
|
||||
);
|
||||
} else {
|
||||
searcherManager = new SimpleIndexSearcherManager(indexWriter,
|
||||
snapshotsManager,
|
||||
luceneHeavyTasksScheduler,
|
||||
getLuceneSimilarity(),
|
||||
luceneOptions.applyAllDeletes().orElse(true),
|
||||
luceneOptions.writeAllDeletes().orElse(false),
|
||||
luceneOptions.queryRefreshDebounceTime());
|
||||
}
|
||||
this.searcherManager = searcherManager;
|
||||
|
||||
this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", clusterName);
|
||||
@ -243,12 +268,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
meterRegistry.gauge("index.searcher.refreshes.active.count",
|
||||
List.of(Tag.of("index.name", clusterName)),
|
||||
searcherManager,
|
||||
CachedIndexSearcherManager::getActiveRefreshes
|
||||
SimpleIndexSearcherManager::getActiveRefreshes
|
||||
);
|
||||
meterRegistry.gauge("index.searcher.searchers.active.count",
|
||||
List.of(Tag.of("index.name", clusterName)),
|
||||
searcherManager,
|
||||
CachedIndexSearcherManager::getActiveSearchers
|
||||
SimpleIndexSearcherManager::getActiveSearchers
|
||||
);
|
||||
|
||||
// Start scheduled tasks
|
||||
@ -268,6 +293,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
@Override
|
||||
public Mono<LLSnapshot> takeSnapshot() {
|
||||
if (snapshotsManager == null) {
|
||||
return Mono.just(DUMMY_SNAPSHOT);
|
||||
}
|
||||
return snapshotsManager.takeSnapshot().elapsed().map(elapsed -> {
|
||||
snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS);
|
||||
return elapsed.getT2();
|
||||
@ -293,6 +321,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
@Override
|
||||
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
|
||||
if (snapshotsManager == null) {
|
||||
if (snapshot != null && !Objects.equals(snapshot, DUMMY_SNAPSHOT)) {
|
||||
return Mono.error(new IllegalStateException("Can't release snapshot " + snapshot));
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
return snapshotsManager
|
||||
.releaseSnapshot(snapshot)
|
||||
.elapsed()
|
||||
@ -693,6 +727,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
if (closeRequested.get()) {
|
||||
return 0d;
|
||||
}
|
||||
if (snapshotsManager == null) return 0d;
|
||||
return snapshotsManager.getSnapshotsCount();
|
||||
} finally {
|
||||
shutdownLock.unlock();
|
||||
|
@ -0,0 +1,260 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.utils.SimpleResource;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.apache.lucene.search.similarities.Similarity;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Sinks;
|
||||
import reactor.core.publisher.Sinks.Empty;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
// todo: deduplicate code between Cached and Simple searcher managers
|
||||
public class SimpleIndexSearcherManager implements IndexSearcherManager {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
|
||||
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
|
||||
Runtime.getRuntime().availableProcessors(),
|
||||
new ShortNamedThreadFactory("lucene-search")
|
||||
.setDaemon(true).withGroup(new ThreadGroup("lucene-search"))
|
||||
);
|
||||
private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
|
||||
|
||||
@Nullable
|
||||
private final SnapshotsManager snapshotsManager;
|
||||
private final Scheduler luceneHeavyTasksScheduler;
|
||||
private final Similarity similarity;
|
||||
private final SearcherManager searcherManager;
|
||||
private final Duration queryRefreshDebounceTime;
|
||||
|
||||
private Mono<LLIndexSearcher> noSnapshotSearcherMono;
|
||||
|
||||
private final AtomicLong activeSearchers = new AtomicLong(0);
|
||||
private final AtomicLong activeRefreshes = new AtomicLong(0);
|
||||
|
||||
private final AtomicBoolean closeRequested = new AtomicBoolean();
|
||||
private final Empty<Void> closeRequestedMono = Sinks.empty();
|
||||
private final Mono<Void> closeMono;
|
||||
|
||||
public SimpleIndexSearcherManager(IndexWriter indexWriter,
|
||||
@Nullable SnapshotsManager snapshotsManager,
|
||||
Scheduler luceneHeavyTasksScheduler,
|
||||
Similarity similarity,
|
||||
boolean applyAllDeletes,
|
||||
boolean writeAllDeletes,
|
||||
Duration queryRefreshDebounceTime) throws IOException {
|
||||
this.snapshotsManager = snapshotsManager;
|
||||
this.luceneHeavyTasksScheduler = luceneHeavyTasksScheduler;
|
||||
this.similarity = similarity;
|
||||
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
|
||||
|
||||
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
|
||||
|
||||
Empty<Void> refresherClosed = Sinks.empty();
|
||||
var refreshSubscription = Mono
|
||||
.fromRunnable(() -> {
|
||||
try {
|
||||
maybeRefresh();
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed to refresh the searcher manager", ex);
|
||||
}
|
||||
})
|
||||
.subscribeOn(luceneHeavyTasksScheduler)
|
||||
.publishOn(Schedulers.parallel())
|
||||
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
|
||||
.takeUntilOther(closeRequestedMono.asMono())
|
||||
.doAfterTerminate(refresherClosed::tryEmitEmpty)
|
||||
.transform(LLUtils::handleDiscard)
|
||||
.subscribe();
|
||||
|
||||
this.closeMono = Mono
|
||||
.fromRunnable(() -> {
|
||||
LOG.debug("Closing IndexSearcherManager...");
|
||||
this.closeRequested.set(true);
|
||||
this.closeRequestedMono.tryEmitEmpty();
|
||||
refreshSubscription.dispose();
|
||||
})
|
||||
.then(refresherClosed.asMono())
|
||||
.then(Mono.<Void>fromRunnable(() -> {
|
||||
LOG.debug("Closed IndexSearcherManager");
|
||||
LOG.debug("Closing refreshes...");
|
||||
long initTime = System.nanoTime();
|
||||
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
|
||||
LockSupport.parkNanos(50000000);
|
||||
}
|
||||
LOG.debug("Closed refreshes...");
|
||||
LOG.debug("Closing active searchers...");
|
||||
initTime = System.nanoTime();
|
||||
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
|
||||
LockSupport.parkNanos(50000000);
|
||||
}
|
||||
LOG.debug("Closed active searchers");
|
||||
LOG.debug("Stopping searcher executor...");
|
||||
SEARCH_EXECUTOR.shutdown();
|
||||
try {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
|
||||
SEARCH_EXECUTOR.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Failed to stop executor", e);
|
||||
}
|
||||
LOG.debug("Stopped searcher executor");
|
||||
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
|
||||
.publishOn(Schedulers.parallel())
|
||||
.cache();
|
||||
this.noSnapshotSearcherMono = retrieveSearcherInternal(null);
|
||||
}
|
||||
|
||||
private void dropCachedIndexSearcher() {
|
||||
// This shouldn't happen more than once per searcher.
|
||||
activeSearchers.decrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maybeRefreshBlocking() throws IOException {
|
||||
try {
|
||||
activeRefreshes.incrementAndGet();
|
||||
searcherManager.maybeRefreshBlocking();
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
||||
} finally {
|
||||
activeRefreshes.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maybeRefresh() throws IOException {
|
||||
try {
|
||||
activeRefreshes.incrementAndGet();
|
||||
searcherManager.maybeRefresh();
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
||||
} finally {
|
||||
activeRefreshes.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLIndexSearcher> retrieveSearcher(@Nullable LLSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return noSnapshotSearcherMono;
|
||||
} else {
|
||||
return retrieveSearcherInternal(snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<LLIndexSearcher> retrieveSearcherInternal(@Nullable LLSnapshot snapshot) {
|
||||
return Mono.fromCallable(() -> {
|
||||
if (closeRequested.get()) {
|
||||
return null;
|
||||
}
|
||||
activeSearchers.incrementAndGet();
|
||||
try {
|
||||
IndexSearcher indexSearcher;
|
||||
boolean fromSnapshot;
|
||||
if (snapshotsManager == null || snapshot == null) {
|
||||
indexSearcher = searcherManager.acquire();
|
||||
fromSnapshot = false;
|
||||
} else {
|
||||
//noinspection resource
|
||||
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
|
||||
fromSnapshot = true;
|
||||
}
|
||||
indexSearcher.setSimilarity(similarity);
|
||||
assert indexSearcher.getIndexReader().getRefCount() > 0;
|
||||
var closed = new AtomicBoolean();
|
||||
LLIndexSearcher llIndexSearcher;
|
||||
if (fromSnapshot) {
|
||||
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
|
||||
} else {
|
||||
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
|
||||
}
|
||||
SimpleResource.CLEANER.register(llIndexSearcher, () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
LOG.warn("An index searcher was not closed!");
|
||||
if (!fromSnapshot) {
|
||||
try {
|
||||
searcherManager.release(indexSearcher);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to release the index searcher", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return llIndexSearcher;
|
||||
} catch (Throwable ex) {
|
||||
activeSearchers.decrementAndGet();
|
||||
throw ex;
|
||||
}
|
||||
})
|
||||
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
|
||||
.publishOn(Schedulers.parallel());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> close() {
|
||||
return closeMono;
|
||||
}
|
||||
|
||||
public long getActiveSearchers() {
|
||||
return activeSearchers.get();
|
||||
}
|
||||
|
||||
public long getActiveRefreshes() {
|
||||
return activeRefreshes.get();
|
||||
}
|
||||
|
||||
private class MainIndexSearcher extends LLIndexSearcher {
|
||||
|
||||
public MainIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean released) {
|
||||
super(indexSearcher, released);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose() throws IOException {
|
||||
dropCachedIndexSearcher();
|
||||
if (getClosed().compareAndSet(false, true)) {
|
||||
searcherManager.release(indexSearcher);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class SnapshotIndexSearcher extends LLIndexSearcher {
|
||||
|
||||
public SnapshotIndexSearcher(IndexSearcher indexSearcher,
|
||||
AtomicBoolean closed) {
|
||||
super(indexSearcher, closed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose() throws IOException {
|
||||
dropCachedIndexSearcher();
|
||||
}
|
||||
}
|
||||
}
|
@ -268,7 +268,7 @@ public class DecimalBucketMultiCollectorManager implements CollectorMultiManager
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buckets reduce(List<Buckets> reducedBucketsList) throws IOException {
|
||||
public Buckets reduce(List<Buckets> reducedBucketsList) {
|
||||
List<DoubleArrayList> seriesReducedValues = new ArrayList<>();
|
||||
double[] reducedTotals = newBuckets();
|
||||
for (var seriesBuckets : reducedBucketsList) {
|
||||
|
@ -0,0 +1,30 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.CoreSubscriber;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class CountedFlux<T> extends Flux<T> {
|
||||
|
||||
private final TotalHitsCount totalHitsCount;
|
||||
private final Flux<? extends T> flux;
|
||||
|
||||
private CountedFlux(TotalHitsCount totalHitsCount, Flux<? extends T> flux) {
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.flux = flux;
|
||||
}
|
||||
|
||||
public static <T> CountedFlux<T> of(TotalHitsCount totalHitsCount, Flux<? extends T> flux) {
|
||||
return new CountedFlux<>(totalHitsCount, flux);
|
||||
}
|
||||
|
||||
public TotalHitsCount totalHitsCount() {
|
||||
return totalHitsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(@NotNull CoreSubscriber<? super T> actual) {
|
||||
flux.subscribe(actual);
|
||||
}
|
||||
}
|
@ -1,5 +1,7 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
|
||||
|
||||
import io.netty5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||
@ -30,40 +32,31 @@ public class DecimalBucketMultiSearcher {
|
||||
.search(indexSearchers.shards(), bucketParams, queries, normalizationQuery)
|
||||
// Ensure that one result is always returned
|
||||
.single(), indexSearchers -> Mono.fromCallable(() -> {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
indexSearchers.close();
|
||||
return null;
|
||||
}).subscribeOn(Schedulers.boundedElastic()));
|
||||
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))).publishOn(Schedulers.parallel());
|
||||
}
|
||||
|
||||
private Mono<Buckets> search(Iterable<IndexSearcher> indexSearchers,
|
||||
BucketParams bucketParams,
|
||||
@NotNull List<Query> queries,
|
||||
@Nullable Query normalizationQuery) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return new DecimalBucketMultiCollectorManager(bucketParams.min(),
|
||||
bucketParams.max(),
|
||||
bucketParams.buckets(),
|
||||
bucketParams.bucketFieldName(),
|
||||
bucketParams.valueSource(),
|
||||
queries,
|
||||
normalizationQuery,
|
||||
bucketParams.collectionRate(),
|
||||
bucketParams.sampleSize()
|
||||
);
|
||||
})
|
||||
.flatMap(cmm -> Flux
|
||||
.fromIterable(indexSearchers)
|
||||
.flatMap(shard -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return cmm.search(shard);
|
||||
}))
|
||||
.collectList()
|
||||
.flatMap(results -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return cmm.reduce(results);
|
||||
}))
|
||||
);
|
||||
return Mono.defer(() -> {
|
||||
var cmm = new DecimalBucketMultiCollectorManager(bucketParams.min(),
|
||||
bucketParams.max(),
|
||||
bucketParams.buckets(),
|
||||
bucketParams.bucketFieldName(),
|
||||
bucketParams.valueSource(),
|
||||
queries,
|
||||
normalizationQuery,
|
||||
bucketParams.collectionRate(),
|
||||
bucketParams.sampleSize()
|
||||
);
|
||||
return Flux.fromIterable(indexSearchers).flatMap(shard -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return cmm.search(shard);
|
||||
})).collectList().flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -15,9 +15,9 @@ public final class LuceneSearchResult extends SimpleResource {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(LuceneSearchResult.class);
|
||||
|
||||
private TotalHitsCount totalHitsCount;
|
||||
private Flux<LLKeyScore> results;
|
||||
private Runnable onClose;
|
||||
private final TotalHitsCount totalHitsCount;
|
||||
private final Flux<LLKeyScore> results;
|
||||
private final Runnable onClose;
|
||||
|
||||
public LuceneSearchResult(TotalHitsCount totalHitsCount, Flux<LLKeyScore> results, Runnable onClose) {
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
|
@ -1,19 +1,49 @@
|
||||
package it.cavallium.dbengine.utils;
|
||||
|
||||
import it.cavallium.dbengine.MetricUtils;
|
||||
import it.cavallium.dbengine.database.SafeCloseable;
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public abstract class SimpleResource implements SafeCloseable {
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
protected static final boolean ENABLE_LEAK_DETECTION
|
||||
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.leakdetection.enable", "true"));
|
||||
protected static final boolean ADVANCED_LEAK_DETECTION
|
||||
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.leakdetection.advanced", "false"));
|
||||
private static final Logger LOG = LogManager.getLogger(SimpleResource.class);
|
||||
public static final Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
private final AtomicBoolean closed;
|
||||
private final boolean canClose;
|
||||
|
||||
public SimpleResource() {
|
||||
canClose = true;
|
||||
this(true);
|
||||
}
|
||||
|
||||
protected SimpleResource(boolean canClose) {
|
||||
this.canClose = canClose;
|
||||
var closed = new AtomicBoolean();
|
||||
this.closed = closed;
|
||||
|
||||
if (ENABLE_LEAK_DETECTION && canClose) {
|
||||
var resourceClass = this.getClass();
|
||||
Exception initializationStackTrace;
|
||||
if (ADVANCED_LEAK_DETECTION) {
|
||||
var stackTrace = Thread.currentThread().getStackTrace();
|
||||
initializationStackTrace = new Exception("Initialization point");
|
||||
initializationStackTrace.setStackTrace(stackTrace);
|
||||
} else {
|
||||
initializationStackTrace = null;
|
||||
}
|
||||
CLEANER.register(this, () -> {
|
||||
if (!closed.get()) {
|
||||
LOG.error("Resource leak of type {}", resourceClass, initializationStackTrace);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user