diff --git a/pom.xml b/pom.xml
index 1b99e7d..8e51b87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@
io.projectreactor
reactor-bom
- 2020.0.19
+ 2020.0.20
pom
import
@@ -102,7 +102,7 @@
reactor-tools
original
runtime
- 3.4.18
+ 3.4.19
com.google.guava
@@ -157,12 +157,28 @@
junit-jupiter-params
${junit.jupiter.version}
test
+
+
+ org.mockito
+ mockito-core
+
+
org.assertj
assertj-core
3.23.1
test
+
+
+ org.mockito
+ mockito-core
+
+
+ net.bytebuddy
+ byte-buddy
+
+
@@ -579,6 +595,25 @@
+
+ net.bytebuddy
+ byte-buddy-maven-plugin
+ 1.12.13-20220629
+
+
+
+ transform
+
+
+
+
+
+
+ reactor.tools.agent.ReactorDebugByteBuddyPlugin
+
+
+
+
@@ -613,18 +648,6 @@
-
- net.bytebuddy
- byte-buddy-maven-plugin
- 1.12.10
-
-
-
- reactor.tools.agent.ReactorDebugByteBuddyPlugin
-
-
-
-
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
index af3bf24..494741d 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
@@ -66,7 +66,7 @@ public interface LuceneIndex extends LLSnapshottable {
boolean isLowMemoryMode();
- Mono close();
+ void close();
Mono flush();
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
index bbdc107..5af1eff 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
@@ -109,9 +109,10 @@ public class LuceneIndexImpl implements LuceneIndex {
mltDocumentFields
)
.collectList()
- .flatMap(shards -> mergeResults(queryParams, shards))
+ .mapNotNull(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
- .single();
+ .defaultIfEmpty(Hits.empty())
+ .doOnDiscard(SimpleResource.class, SimpleResource::close);
}
@Override
@@ -122,11 +123,10 @@ public class LuceneIndexImpl implements LuceneIndex {
indicizer.getKeyFieldName()
)
.collectList()
- .flatMap(shards -> mergeResults(queryParams, shards))
+ .mapNotNull(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
.defaultIfEmpty(Hits.empty())
- .doOnDiscard(LLSearchResultShard.class, SimpleResource::close)
- .doOnDiscard(Hits.class, SimpleResource::close);
+ .doOnDiscard(SimpleResource.class, SimpleResource::close);
}
@Override
@@ -162,8 +162,8 @@ public class LuceneIndexImpl implements LuceneIndex {
}
@Override
- public Mono close() {
- return luceneIndex.close();
+ public void close() {
+ luceneIndex.close();
}
/**
@@ -202,41 +202,40 @@ public class LuceneIndexImpl implements LuceneIndex {
return luceneIndex.releaseSnapshot(snapshot);
}
- private static Mono mergeResults(ClientQueryParams queryParams,
+ @Nullable
+ private static LLSearchResultShard mergeResults(ClientQueryParams queryParams,
List shards) {
if (shards.size() == 0) {
- return Mono.empty();
+ return null;
} else if (shards.size() == 1) {
- return Mono.just(shards.get(0));
+ return shards.get(0);
}
- return Mono.fromCallable(() -> {
- TotalHitsCount count = null;
- ObjectArrayList> results = new ObjectArrayList<>(shards.size());
- ObjectArrayList resources = new ObjectArrayList<>(shards.size());
- for (LLSearchResultShard shard : shards) {
- if (count == null) {
- count = shard.totalHitsCount();
- } else {
- count = LuceneUtils.sum(count, shard.totalHitsCount());
- }
- var maxLimit = queryParams.offset() + queryParams.limit();
- results.add(shard.results().take(maxLimit, true));
- resources.add(shard);
- }
- Objects.requireNonNull(count);
- Flux resultsFlux;
- if (results.size() == 0) {
- resultsFlux = Flux.empty();
- } else if (results.size() == 1) {
- resultsFlux = results.get(0);
+ TotalHitsCount count = null;
+ ObjectArrayList> results = new ObjectArrayList<>(shards.size());
+ ObjectArrayList resources = new ObjectArrayList<>(shards.size());
+ for (LLSearchResultShard shard : shards) {
+ if (count == null) {
+ count = shard.totalHitsCount();
} else {
- resultsFlux = Flux.merge(results);
+ count = LuceneUtils.sum(count, shard.totalHitsCount());
+ }
+ var maxLimit = queryParams.offset() + queryParams.limit();
+ results.add(shard.results().take(maxLimit, true));
+ resources.add(shard);
+ }
+ Objects.requireNonNull(count);
+ Flux resultsFlux;
+ if (results.size() == 0) {
+ resultsFlux = Flux.empty();
+ } else if (results.size() == 1) {
+ resultsFlux = results.get(0);
+ } else {
+ resultsFlux = Flux.merge(results);
+ }
+ return new LLSearchResultShard(resultsFlux, count, () -> {
+ for (var resource : resources) {
+ resource.close();
}
- return new LLSearchResultShard(resultsFlux, count, () -> {
- for (var resource : resources) {
- resource.close();
- }
- });
});
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
index a457f43..7f66eba 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
@@ -6,6 +6,7 @@ import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.util.Objects;
import java.util.StringJoiner;
import org.apache.logging.log4j.LogManager;
@@ -13,7 +14,7 @@ import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-public class LLEntry implements SafeCloseable {
+public class LLEntry extends SimpleResource implements SafeCloseable {
private static final Logger logger = LogManager.getLogger(LLEntry.class);
private Buffer key;
@@ -91,7 +92,7 @@ public class LLEntry implements SafeCloseable {
}
@Override
- public void close() {
+ protected void onClose() {
try {
if (key != null && key.isAccessible()) {
key.close();
diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
index 955798e..02f148a 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
@@ -15,7 +15,7 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public interface LLLuceneIndex extends LLSnapshottable {
+public interface LLLuceneIndex extends LLSnapshottable, SafeCloseable {
String getLuceneIndexName();
@@ -74,8 +74,6 @@ public interface LLLuceneIndex extends LLSnapshottable {
boolean isLowMemoryMode();
- Mono close();
-
/**
* Flush writes to disk.
* This does not commit, it syncs the data to the disk
diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
index c6814dc..33d8b8e 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
@@ -193,9 +193,9 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono close() {
- Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::close).iterator();
- return Mono.whenDelayError(it);
+ public void close() {
+ Iterable> it = () -> luceneIndicesSet.stream().map(e -> Mono.fromRunnable(e::close)).iterator();
+ Mono.whenDelayError(it).block();
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index e70ef72..a139826 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -25,6 +25,7 @@ import it.cavallium.dbengine.lucene.RandomSortField;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -947,13 +948,13 @@ public class LLUtils {
} else if (next instanceof LLIndexSearcher searcher) {
try {
searcher.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
logger.error("Failed to close searcher {}", searcher, e);
}
} else if (next instanceof LLIndexSearchers searchers) {
try {
searchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
logger.error("Failed to close searchers {}", searchers, e);
}
} else if (next instanceof Optional> optional) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java
index 4790caf..d1a4e4e 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java
@@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.ref.Cleaner;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
+import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
@@ -35,7 +37,7 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
// todo: deduplicate code between Cached and Simple searcher managers
-public class CachedIndexSearcherManager implements IndexSearcherManager {
+public class CachedIndexSearcherManager extends SimpleResource implements IndexSearcherManager {
private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
@@ -57,10 +59,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private final LoadingCache> cachedSnapshotSearchers;
private final Mono cachedMainSearcher;
-
- private final AtomicBoolean closeRequested = new AtomicBoolean();
- private final Empty closeRequestedMono = Sinks.empty();
- private final Mono closeMono;
+ private final Disposable refreshSubscription;
public CachedIndexSearcherManager(IndexWriter indexWriter,
@Nullable SnapshotsManager snapshotsManager,
@@ -76,8 +75,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
- Empty refresherClosed = Sinks.empty();
- var refreshSubscription = Mono
+ this.refreshSubscription = Mono
.fromRunnable(() -> {
try {
maybeRefresh();
@@ -85,11 +83,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
LOG.error("Failed to refresh the searcher manager", ex);
}
})
- .subscribeOn(luceneHeavyTasksScheduler)
+ .subscribeOn(uninterruptibleScheduler(luceneHeavyTasksScheduler))
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
- .takeUntilOther(closeRequestedMono.asMono())
- .doAfterTerminate(refresherClosed::tryEmitEmpty)
.transform(LLUtils::handleDiscard)
.subscribe();
@@ -104,91 +100,52 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
});
this.cachedMainSearcher = this.generateCachedSearcher(null);
-
- this.closeMono = Mono
- .fromRunnable(() -> {
- LOG.debug("Closing IndexSearcherManager...");
- this.closeRequested.set(true);
- this.closeRequestedMono.tryEmitEmpty();
- refreshSubscription.dispose();
- })
- .then(refresherClosed.asMono())
- .then(Mono.fromRunnable(() -> {
- LOG.debug("Closed IndexSearcherManager");
- LOG.debug("Closing refreshes...");
- long initTime = System.nanoTime();
- while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- LOG.debug("Closed refreshes...");
- LOG.debug("Closing active searchers...");
- initTime = System.nanoTime();
- while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- LOG.debug("Closed active searchers");
- LOG.debug("Stopping searcher executor...");
- cachedSnapshotSearchers.invalidateAll();
- cachedSnapshotSearchers.cleanUp();
- SEARCH_EXECUTOR.shutdown();
- try {
- //noinspection BlockingMethodInNonBlockingContext
- if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
- SEARCH_EXECUTOR.shutdownNow();
- }
- } catch (InterruptedException e) {
- LOG.error("Failed to stop executor", e);
- }
- LOG.debug("Stopped searcher executor");
- }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
- .publishOn(Schedulers.parallel())
- .cache();
}
private Mono generateCachedSearcher(@Nullable LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
- if (closeRequested.get()) {
- return null;
- }
- activeSearchers.incrementAndGet();
- try {
- IndexSearcher indexSearcher;
- boolean fromSnapshot;
- if (snapshotsManager == null || snapshot == null) {
- indexSearcher = searcherManager.acquire();
- fromSnapshot = false;
- } else {
- //noinspection resource
- indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
- fromSnapshot = true;
- }
- indexSearcher.setSimilarity(similarity);
- assert indexSearcher.getIndexReader().getRefCount() > 0;
- var closed = new AtomicBoolean();
- LLIndexSearcher llIndexSearcher;
- if (fromSnapshot) {
- llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
- } else {
- llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
- }
- SimpleResource.CLEANER.register(llIndexSearcher, () -> {
- if (closed.compareAndSet(false, true)) {
- LOG.warn("An index searcher was not closed!");
- if (!fromSnapshot) {
- try {
- searcherManager.release(indexSearcher);
- } catch (IOException e) {
- LOG.error("Failed to release the index searcher", e);
- }
- }
+ if (isClosed()) {
+ return null;
}
- });
- return llIndexSearcher;
- } catch (Throwable ex) {
- activeSearchers.decrementAndGet();
- throw ex;
- }
- })
+ activeSearchers.incrementAndGet();
+ try {
+ IndexSearcher indexSearcher;
+ boolean fromSnapshot;
+ if (snapshotsManager == null || snapshot == null) {
+ indexSearcher = searcherManager.acquire();
+ fromSnapshot = false;
+ } else {
+ //noinspection resource
+ indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
+ fromSnapshot = true;
+ }
+ indexSearcher.setSimilarity(similarity);
+ assert indexSearcher.getIndexReader().getRefCount() > 0;
+ var closed = new AtomicBoolean();
+ LLIndexSearcher llIndexSearcher;
+ if (fromSnapshot) {
+ llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
+ } else {
+ llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
+ }
+ SimpleResource.CLEANER.register(llIndexSearcher, () -> {
+ if (closed.compareAndSet(false, true)) {
+ LOG.warn("An index searcher was not closed!");
+ if (!fromSnapshot) {
+ try {
+ searcherManager.release(indexSearcher);
+ } catch (IOException e) {
+ LOG.error("Failed to release the index searcher", e);
+ }
+ }
+ }
+ });
+ return llIndexSearcher;
+ } catch (Throwable ex) {
+ activeSearchers.decrementAndGet();
+ throw ex;
+ }
+ })
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}
@@ -232,8 +189,34 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
@Override
- public Mono close() {
- return closeMono;
+ protected void onClose() {
+ LOG.debug("Closing IndexSearcherManager...");
+ refreshSubscription.dispose();
+ LOG.debug("Closed IndexSearcherManager");
+ LOG.debug("Closing refreshes...");
+ long initTime = System.nanoTime();
+ while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
+ LockSupport.parkNanos(50000000);
+ }
+ LOG.debug("Closed refreshes...");
+ LOG.debug("Closing active searchers...");
+ initTime = System.nanoTime();
+ while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
+ LockSupport.parkNanos(50000000);
+ }
+ LOG.debug("Closed active searchers");
+ LOG.debug("Stopping searcher executor...");
+ cachedSnapshotSearchers.invalidateAll();
+ cachedSnapshotSearchers.cleanUp();
+ SEARCH_EXECUTOR.shutdown();
+ try {
+ if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
+ SEARCH_EXECUTOR.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Failed to stop executor", e);
+ }
+ LOG.debug("Stopped searcher executor");
}
public long getActiveSearchers() {
@@ -251,10 +234,12 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
@Override
- public void onClose() throws IOException {
+ public void onClose() {
dropCachedIndexSearcher();
- if (getClosed().compareAndSet(false, true)) {
+ try {
searcherManager.release(indexSearcher);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
}
}
}
@@ -267,7 +252,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
@Override
- public void onClose() throws IOException {
+ public void onClose() {
dropCachedIndexSearcher();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java
index 0d91d5f..eae59b3 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java
@@ -5,10 +5,12 @@ import static it.cavallium.dbengine.database.disk.LLTempHugePqEnv.getColumnOptio
import com.google.common.primitives.Ints;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.netty5.buffer.api.BufferAllocator;
+import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
@@ -20,7 +22,7 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-public class HugePqEnv implements Closeable {
+public class HugePqEnv extends SimpleResource {
private final RocksDB db;
private final ArrayList defaultCfh;
@@ -32,14 +34,14 @@ public class HugePqEnv implements Closeable {
}
@Override
- public void close() throws IOException {
+ protected void onClose() {
for (var cfh : defaultCfh) {
db.destroyColumnFamilyHandle(cfh);
}
try {
db.closeE();
} catch (RocksDBException e) {
- throw new IOException(e);
+ throw new IllegalStateException(e);
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java
index c5973b3..ac17d74 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java
@@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLSnapshot;
+import it.cavallium.dbengine.database.SafeCloseable;
import java.io.IOException;
import java.util.function.Function;
import org.apache.lucene.search.IndexSearcher;
@@ -9,13 +10,11 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public interface IndexSearcherManager {
+public interface IndexSearcherManager extends SafeCloseable {
void maybeRefreshBlocking() throws IOException;
void maybeRefresh() throws IOException;
Mono retrieveSearcher(@Nullable LLSnapshot snapshot);
-
- Mono close();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java
index 707638f..ca2b816 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java
@@ -4,6 +4,7 @@ import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.SafeCloseable;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -12,43 +13,33 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
-public abstract class LLIndexSearcher implements Closeable {
+public abstract class LLIndexSearcher extends SimpleResource {
protected static final Logger LOG = LogManager.getLogger(LLIndexSearcher.class);
protected final IndexSearcher indexSearcher;
- private final AtomicBoolean closed;
public LLIndexSearcher(IndexSearcher indexSearcher) {
+ super();
this.indexSearcher = indexSearcher;
- this.closed = new AtomicBoolean();
}
public LLIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
+ super(closed);
this.indexSearcher = indexSearcher;
- this.closed = closed;
}
public IndexReader getIndexReader() {
- if (closed.get()) throw new IllegalStateException("Closed");
+ ensureOpen();
return indexSearcher.getIndexReader();
}
public IndexSearcher getIndexSearcher() {
- if (closed.get()) throw new IllegalStateException("Closed");
+ ensureOpen();
return indexSearcher;
}
public AtomicBoolean getClosed() {
- return closed;
+ return super.getClosed();
}
-
- @Override
- public final void close() throws IOException {
- if (closed.compareAndSet(false, true)) {
- onClose();
- }
- }
-
- protected abstract void onClose() throws IOException;
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java
index 5fb6c72..19d36a8 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java
@@ -5,7 +5,9 @@ import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
+import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
+import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.Closeable;
import java.io.IOException;
@@ -21,7 +23,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
-public interface LLIndexSearchers extends Closeable {
+public interface LLIndexSearchers extends SafeCloseable {
static LLIndexSearchers of(List indexSearchers) {
return new ShardedIndexSearchers(indexSearchers);
@@ -41,7 +43,7 @@ public interface LLIndexSearchers extends Closeable {
IndexReader allShards();
- class UnshardedIndexSearchers implements LLIndexSearchers {
+ class UnshardedIndexSearchers extends SimpleResource implements LLIndexSearchers {
private final LLIndexSearcher indexSearcher;
@@ -89,12 +91,12 @@ public interface LLIndexSearchers extends Closeable {
}
@Override
- public void close() throws IOException {
+ protected void onClose() {
indexSearcher.close();
}
}
- class ShardedIndexSearchers implements LLIndexSearchers {
+ class ShardedIndexSearchers extends SimpleResource implements LLIndexSearchers {
private final List indexSearchers;
private final List indexSearchersVals;
@@ -160,7 +162,7 @@ public interface LLIndexSearchers extends Closeable {
}
@Override
- public void close() throws IOException {
+ protected void onClose() {
for (LLIndexSearcher indexSearcher : indexSearchers) {
indexSearcher.close();
}
@@ -186,8 +188,8 @@ public interface LLIndexSearchers extends Closeable {
}
@Override
- protected void onClose() throws IOException {
- parent.onClose();
+ protected void onClose() {
+ parent.close();
}
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index 38a4d45..0efbd09 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -40,7 +40,9 @@ import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -76,7 +78,7 @@ import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
-public class LLLocalLuceneIndex implements LLLuceneIndex {
+public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex {
protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class);
@@ -135,7 +137,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final boolean lowMemory;
private final Phaser activeTasks = new Phaser(1);
- private final AtomicBoolean closeRequested = new AtomicBoolean();
public LLLocalLuceneIndex(LLTempHugePqEnv env,
MeterRegistry meterRegistry,
@@ -304,7 +305,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private Mono ensureOpen(Mono mono) {
return Mono.fromCallable(() -> {
- if (closeRequested.get()) {
+ if (isClosed()) {
throw new IllegalStateException("Lucene index is closed");
} else {
return null;
@@ -538,39 +539,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono close() {
- return Mono
- .fromCallable(() -> {
- logger.debug("Waiting IndexWriter tasks...");
- activeTasks.arriveAndAwaitAdvance();
- logger.debug("IndexWriter tasks ended");
- return null;
- })
- .subscribeOn(luceneHeavyTasksScheduler)
- .then(searcherManager.close())
- .then(Mono.fromCallable(() -> {
- shutdownLock.lock();
- try {
- logger.debug("Closing IndexWriter...");
- indexWriter.close();
- directory.close();
- logger.debug("IndexWriter closed");
- } finally {
- shutdownLock.unlock();
- }
- return null;
- }).subscribeOn(luceneHeavyTasksScheduler))
-
- // Avoid closing multiple times
- .transformDeferred(mono -> {
- if (this.closeRequested.compareAndSet(false, true)) {
- logger.trace("Set closeRequested to true. Further update/write calls will result in an error");
- return mono;
- } else {
- logger.debug("Tried to close more than once");
- return Mono.empty();
- }
- });
+ protected void onClose() {
+ logger.debug("Waiting IndexWriter tasks...");
+ activeTasks.arriveAndAwaitAdvance();
+ logger.debug("IndexWriter tasks ended");
+ shutdownLock.lock();
+ try {
+ logger.debug("Closing searcher manager...");
+ searcherManager.close();
+ logger.debug("Searcher manager closed");
+ logger.debug("Closing IndexWriter...");
+ indexWriter.close();
+ directory.close();
+ logger.debug("IndexWriter closed");
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ } finally {
+ shutdownLock.unlock();
+ }
}
@Override
@@ -580,7 +566,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return null;
}
flushTime.recordCallable(() -> {
@@ -603,7 +589,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return null;
}
var mergeScheduler = indexWriter.getConfig().getMergeScheduler();
@@ -626,7 +612,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return null;
}
indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);
@@ -653,7 +639,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return null;
}
refreshTime.recordCallable(() -> {
@@ -681,7 +667,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public void scheduledCommit() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return;
}
commitTime.recordCallable(() -> {
@@ -702,7 +688,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public void scheduledMerge() { // Do not use. Merges are done automatically by merge policies
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return;
}
mergeTime.recordCallable(() -> {
@@ -724,7 +710,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getSnapshotsCount() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
if (snapshotsManager == null) return 0d;
@@ -737,7 +723,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterFlushingBytes() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
return indexWriter.getFlushingBytes();
@@ -749,7 +735,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterMaxCompletedSequenceNumber() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
return indexWriter.getMaxCompletedSequenceNumber();
@@ -761,7 +747,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterPendingNumDocs() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
return indexWriter.getPendingNumDocs();
@@ -773,7 +759,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterMergingSegmentsSize() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
return indexWriter.getMergingSegments().size();
@@ -785,7 +771,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getDirectoryPendingDeletionsCount() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
return indexWriter.getDirectory().getPendingDeletions().size();
@@ -799,7 +785,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getDocCount() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
var docStats = indexWriter.getDocStats();
@@ -816,7 +802,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getMaxDoc() {
shutdownLock.lock();
try {
- if (closeRequested.get()) {
+ if (isClosed()) {
return 0d;
}
var docStats = indexWriter.getDocStats();
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
index b2cddf0..09ede71 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
@@ -29,9 +29,11 @@ import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
+import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.ints.IntList;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -54,7 +56,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
-public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
+public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneIndex {
private static final Logger LOG = LogManager.getLogger(LLLuceneIndex.class);
private static final boolean BYPASS_GROUPBY_BUG = Boolean.parseBoolean(System.getProperty(
@@ -157,7 +159,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.doOnDiscard(LLIndexSearcher.class, indexSearcher -> {
try {
indexSearcher.close();
- } catch (IOException ex) {
+ } catch (UncheckedIOException ex) {
LOG.error("Failed to close an index searcher", ex);
}
})
@@ -314,10 +316,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono close() {
- Iterable> it = () -> luceneIndicesSet.stream().map(LLLocalLuceneIndex::close).iterator();
+ protected void onClose() {
+ Iterable> it = () -> luceneIndicesSet
+ .stream()
+ .map(part -> Mono
+ .fromRunnable(part::close)
+ .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
+ .publishOn(Schedulers.parallel())
+ )
+ .iterator();
var indicesCloseMono = Mono.whenDelayError(it);
- return indicesCloseMono
+ indicesCloseMono
.then(Mono.fromCallable(() -> {
if (multiSearcher instanceof Closeable closeable) {
//noinspection BlockingMethodInNonBlockingContext
@@ -326,7 +335,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
return null;
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
- .then();
+ .then()
+ .block();
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java
index e52daa3..a160425 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java
@@ -1,7 +1,9 @@
package it.cavallium.dbengine.database.disk;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import org.apache.lucene.index.DirectoryReader;
@@ -9,7 +11,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.IndexSearcher;
import org.jetbrains.annotations.Nullable;
-public class LuceneIndexSnapshot implements Closeable {
+public class LuceneIndexSnapshot extends SimpleResource {
private final IndexCommit snapshot;
private boolean initialized;
@@ -57,11 +59,16 @@ public class LuceneIndexSnapshot implements Closeable {
}
}
- public synchronized void close() throws IOException {
+ @Override
+ protected synchronized void onClose() {
closed = true;
if (initialized && !failed) {
- indexReader.close();
+ try {
+ indexReader.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
indexSearcher = null;
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java
index 187239a..c8fe28d 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java
@@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.ref.Cleaner;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
+import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
@@ -35,7 +37,7 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
// todo: deduplicate code between Cached and Simple searcher managers
-public class SimpleIndexSearcherManager implements IndexSearcherManager {
+public class SimpleIndexSearcherManager extends SimpleResource implements IndexSearcherManager {
private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
@@ -56,10 +58,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
private final AtomicLong activeSearchers = new AtomicLong(0);
private final AtomicLong activeRefreshes = new AtomicLong(0);
-
- private final AtomicBoolean closeRequested = new AtomicBoolean();
- private final Empty closeRequestedMono = Sinks.empty();
- private final Mono closeMono;
+ private final Disposable refreshSubscription;
public SimpleIndexSearcherManager(IndexWriter indexWriter,
@Nullable SnapshotsManager snapshotsManager,
@@ -75,8 +74,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
- Empty refresherClosed = Sinks.empty();
- var refreshSubscription = Mono
+ refreshSubscription = Mono
.fromRunnable(() -> {
try {
maybeRefresh();
@@ -87,47 +85,9 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
.subscribeOn(luceneHeavyTasksScheduler)
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
- .takeUntilOther(closeRequestedMono.asMono())
- .doAfterTerminate(refresherClosed::tryEmitEmpty)
.transform(LLUtils::handleDiscard)
.subscribe();
- this.closeMono = Mono
- .fromRunnable(() -> {
- LOG.debug("Closing IndexSearcherManager...");
- this.closeRequested.set(true);
- this.closeRequestedMono.tryEmitEmpty();
- refreshSubscription.dispose();
- })
- .then(refresherClosed.asMono())
- .then(Mono.fromRunnable(() -> {
- LOG.debug("Closed IndexSearcherManager");
- LOG.debug("Closing refreshes...");
- long initTime = System.nanoTime();
- while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- LOG.debug("Closed refreshes...");
- LOG.debug("Closing active searchers...");
- initTime = System.nanoTime();
- while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- LOG.debug("Closed active searchers");
- LOG.debug("Stopping searcher executor...");
- SEARCH_EXECUTOR.shutdown();
- try {
- //noinspection BlockingMethodInNonBlockingContext
- if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
- SEARCH_EXECUTOR.shutdownNow();
- }
- } catch (InterruptedException e) {
- LOG.error("Failed to stop executor", e);
- }
- LOG.debug("Stopped searcher executor");
- }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
- .publishOn(Schedulers.parallel())
- .cache();
this.noSnapshotSearcherMono = retrieveSearcherInternal(null);
}
@@ -171,7 +131,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
private Mono retrieveSearcherInternal(@Nullable LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
- if (closeRequested.get()) {
+ if (isClosed()) {
return null;
}
activeSearchers.incrementAndGet();
@@ -218,8 +178,32 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
}
@Override
- public Mono close() {
- return closeMono;
+ protected void onClose() {
+ LOG.debug("Closing IndexSearcherManager...");
+ refreshSubscription.dispose();
+ LOG.debug("Closed IndexSearcherManager");
+ LOG.debug("Closing refreshes...");
+ long initTime = System.nanoTime();
+ while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
+ LockSupport.parkNanos(50000000);
+ }
+ LOG.debug("Closed refreshes...");
+ LOG.debug("Closing active searchers...");
+ initTime = System.nanoTime();
+ while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
+ LockSupport.parkNanos(50000000);
+ }
+ LOG.debug("Closed active searchers");
+ LOG.debug("Stopping searcher executor...");
+ SEARCH_EXECUTOR.shutdown();
+ try {
+ if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
+ SEARCH_EXECUTOR.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Failed to stop executor", e);
+ }
+ LOG.debug("Stopped searcher executor");
}
public long getActiveSearchers() {
@@ -237,10 +221,12 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
}
@Override
- public void onClose() throws IOException {
+ public void onClose() {
dropCachedIndexSearcher();
- if (getClosed().compareAndSet(false, true)) {
+ try {
searcherManager.release(indexSearcher);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
}
}
}
@@ -253,7 +239,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
}
@Override
- public void onClose() throws IOException {
+ public void onClose() {
dropCachedIndexSearcher();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java
index 2526748..cc0b031 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java
@@ -3,7 +3,9 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import it.cavallium.dbengine.database.LLSnapshot;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -16,7 +18,7 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
-public class SnapshotsManager {
+public class SnapshotsManager extends SimpleResource {
private final IndexWriter indexWriter;
private final SnapshotDeletionPolicy snapshotter;
@@ -70,7 +72,7 @@ public class SnapshotsManager {
if (prevSnapshot != null) {
try {
prevSnapshot.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
throw new IllegalStateException("Can't close snapshot", e);
}
}
@@ -105,7 +107,8 @@ public class SnapshotsManager {
return Math.max(snapshots.size(), snapshotter.getSnapshotCount());
}
- public void close() {
+ @Override
+ protected void onClose() {
if (!activeTasks.isTerminated()) {
activeTasks.arriveAndAwaitAdvance();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java
index 8a8a869..2d56f94 100644
--- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java
@@ -509,8 +509,8 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
- public Mono close() {
- return sendRequest(new CloseLuceneIndex(id)).then();
+ public void close() {
+ sendRequest(new CloseLuceneIndex(id)).then().block();
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java
index 13af4b7..164855a 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java
@@ -4,6 +4,7 @@ import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC
import static org.apache.lucene.search.TotalHits.Relation.*;
import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.util.Comparator;
import org.apache.lucene.search.FieldComparator;
@@ -31,27 +32,7 @@ public interface FullDocs extends ResourceIterable {
static FullDocs merge(@Nullable Sort sort, FullDocs[] fullDocs) {
ResourceIterable mergedIterable = mergeResourceIterable(sort, fullDocs);
TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
- FullDocs docs = new FullDocs<>() {
- @Override
- public void close() {
- mergedIterable.close();
- }
-
- @Override
- public Flux iterate() {
- return mergedIterable.iterate();
- }
-
- @Override
- public Flux iterate(long skips) {
- return mergedIterable.iterate(skips);
- }
-
- @Override
- public TotalHits totalHits() {
- return mergedTotalHits;
- }
- };
+ FullDocs docs = new MergedFullDocs<>(mergedIterable, mergedTotalHits);
if (sort != null) {
return new FullFieldDocs<>(docs, sort.getSort());
} else {
@@ -76,88 +57,7 @@ public interface FullDocs extends ResourceIterable {
static ResourceIterable mergeResourceIterable(
@Nullable Sort sort,
FullDocs[] fullDocs) {
- return new ResourceIterable<>() {
- @Override
- public void close() {
- for (FullDocs fullDoc : fullDocs) {
- fullDoc.close();
- }
- }
-
- @Override
- public Flux iterate() {
- @SuppressWarnings("unchecked") Flux[] iterables = new Flux[fullDocs.length];
-
- for (int i = 0; i < fullDocs.length; i++) {
- var singleFullDocs = fullDocs[i].iterate();
- iterables[i] = singleFullDocs;
- }
-
- Comparator comp;
- if (sort == null) {
- // Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
-
- comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
- } else {
- // Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
-
- SortField[] sortFields = sort.getSort();
- var comparators = new FieldComparator[sortFields.length];
- var reverseMul = new int[sortFields.length];
-
- for (int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
- SortField sortField = sortFields[compIDX];
- comparators[compIDX] = sortField.getComparator(1, compIDX == 0);
- reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
- }
-
- comp = (first, second) -> {
- assert first != second;
-
- LLFieldDoc firstFD = (LLFieldDoc) first;
- LLFieldDoc secondFD = (LLFieldDoc) second;
-
- for (int compIDX = 0; compIDX < comparators.length; ++compIDX) {
- //noinspection rawtypes
- FieldComparator fieldComp = comparators[compIDX];
- //noinspection unchecked
- int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX),
- secondFD.fields().get(compIDX)
- );
- if (cmp != 0) {
- return cmp;
- }
- }
-
- return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
- };
- }
-
- @SuppressWarnings("unchecked") Flux[] fluxes = new Flux[fullDocs.length];
- for (int i = 0; i < iterables.length; i++) {
- var shardIndex = i;
- fluxes[i] = iterables[i].map(shard -> {
- if (shard instanceof LLScoreDoc scoreDoc) {
- //noinspection unchecked
- return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
- } else if (shard instanceof LLFieldDoc fieldDoc) {
- //noinspection unchecked
- return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
- } else if (shard instanceof LLSlotDoc slotDoc) {
- //noinspection unchecked
- return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
- } else {
- throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
- }
- });
- if (fullDocs[i].totalHits().relation == EQUAL_TO) {
- fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
- }
- }
-
- return Flux.mergeComparing(comp, fluxes);
- }
- };
+ return new MergedResourceIterable<>(fullDocs, sort);
}
static TotalHits mergeTotalHits(FullDocs[] fullDocs) {
@@ -173,4 +73,127 @@ public interface FullDocs extends ResourceIterable {
}
return new TotalHits(totalCount, totalRelation);
}
+
+ class MergedResourceIterable extends SimpleResource implements ResourceIterable {
+
+ private final FullDocs[] fullDocs;
+ private final @Nullable Sort sort;
+
+ public MergedResourceIterable(FullDocs[] fullDocs, @Nullable Sort sort) {
+ this.fullDocs = fullDocs;
+ this.sort = sort;
+ }
+
+ @Override
+ protected void onClose() {
+ for (FullDocs fullDoc : fullDocs) {
+ fullDoc.close();
+ }
+ }
+
+ @Override
+ public Flux iterate() {
+ @SuppressWarnings("unchecked") Flux[] iterables = new Flux[fullDocs.length];
+
+ for (int i = 0; i < fullDocs.length; i++) {
+ var singleFullDocs = fullDocs[i].iterate();
+ iterables[i] = singleFullDocs;
+ }
+
+ Comparator comp;
+ if (sort == null) {
+ // Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
+
+ comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
+ } else {
+ // Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
+
+ SortField[] sortFields = sort.getSort();
+ var comparators = new FieldComparator[sortFields.length];
+ var reverseMul = new int[sortFields.length];
+
+ for (int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
+ SortField sortField = sortFields[compIDX];
+ comparators[compIDX] = sortField.getComparator(1, compIDX == 0);
+ reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
+ }
+
+ comp = (first, second) -> {
+ assert first != second;
+
+ LLFieldDoc firstFD = (LLFieldDoc) first;
+ LLFieldDoc secondFD = (LLFieldDoc) second;
+
+ for (int compIDX = 0; compIDX < comparators.length; ++compIDX) {
+ //noinspection rawtypes
+ FieldComparator fieldComp = comparators[compIDX];
+ //noinspection unchecked
+ int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX),
+ secondFD.fields().get(compIDX)
+ );
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
+ };
+ }
+
+ @SuppressWarnings("unchecked") Flux[] fluxes = new Flux[fullDocs.length];
+ for (int i = 0; i < iterables.length; i++) {
+ var shardIndex = i;
+ fluxes[i] = iterables[i].map(shard -> {
+ if (shard instanceof LLScoreDoc scoreDoc) {
+ //noinspection unchecked
+ return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
+ } else if (shard instanceof LLFieldDoc fieldDoc) {
+ //noinspection unchecked
+ return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
+ } else if (shard instanceof LLSlotDoc slotDoc) {
+ //noinspection unchecked
+ return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
+ } else {
+ throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
+ }
+ });
+ if (fullDocs[i].totalHits().relation == EQUAL_TO) {
+ fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
+ }
+ }
+
+ return Flux.mergeComparing(comp, fluxes);
+ }
+ }
+
+ class MergedFullDocs extends SimpleResource implements FullDocs {
+
+ private final ResourceIterable mergedIterable;
+ private final TotalHits mergedTotalHits;
+
+ public MergedFullDocs(ResourceIterable mergedIterable, TotalHits mergedTotalHits) {
+ this.mergedIterable = mergedIterable;
+ this.mergedTotalHits = mergedTotalHits;
+ }
+
+ @Override
+ public void onClose() {
+ mergedIterable.close();
+ }
+
+ @Override
+ public Flux iterate() {
+ return mergedIterable.iterate();
+ }
+
+ @Override
+ public Flux iterate(long skips) {
+ return mergedIterable.iterate(skips);
+ }
+
+ @Override
+ public TotalHits totalHits() {
+ return mergedTotalHits;
+ }
+ }
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java
index d28f857..41138b1 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java
@@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.database.disk.HugePqEnv;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
@@ -13,13 +14,12 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
-public class HugePqArray implements IArray, SafeCloseable {
+public class HugePqArray extends SimpleResource implements IArray, SafeCloseable {
static {
RocksDB.loadLibrary();
}
- private final AtomicBoolean closed = new AtomicBoolean();
private final HugePqCodec valueCodec;
private final LLTempHugePqEnv tempEnv;
private final HugePqEnv env;
@@ -118,11 +118,9 @@ public class HugePqArray implements IArray, SafeCloseable {
}
@Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- ensureThread();
- this.tempEnv.freeDb(hugePqId);
- }
+ public void onClose() {
+ ensureThread();
+ this.tempEnv.freeDb(hugePqId);
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java
index b96720a..940af71 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java
@@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.disk.RocksIterWithReadOpts;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -26,13 +27,13 @@ import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import reactor.core.publisher.Flux;
-public class HugePqPriorityQueue implements PriorityQueue, Reversable>, ReversableResourceIterable {
+public class HugePqPriorityQueue extends SimpleResource
+ implements PriorityQueue, Reversable>, ReversableResourceIterable {
static {
RocksDB.loadLibrary();
}
- private final AtomicBoolean closed = new AtomicBoolean();
private final LLTempHugePqEnv tempEnv;
private final HugePqEnv env;
private final int hugePqId;
@@ -351,12 +352,10 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable implements PriorityQueue, Reversable reverse() {
- return new ReversableResourceIterable<>() {
- @Override
- public void close() {
- HugePqPriorityQueue.this.close();
- }
+ return new ReversedResourceIterable();
+ }
- @Override
- public Flux iterate() {
- return reverseIterate();
- }
+ private class ReversedResourceIterable extends SimpleResource implements ReversableResourceIterable {
- @Override
- public Flux iterate(long skips) {
- return reverseIterate(skips);
- }
+ @Override
+ public void onClose() {
+ HugePqPriorityQueue.this.close();
+ }
- @Override
- public ReversableResourceIterable reverse() {
- return HugePqPriorityQueue.this;
- }
- };
+ @Override
+ public Flux iterate() {
+ return reverseIterate();
+ }
+
+ @Override
+ public Flux iterate(long skips) {
+ return reverseIterate(skips);
+ }
+
+ @Override
+ public ReversableResourceIterable reverse() {
+ return HugePqPriorityQueue.this;
+ }
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java
index a058a8d..d0ba9ef 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java
@@ -3,6 +3,7 @@ package it.cavallium.dbengine.lucene;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -18,7 +19,8 @@ import org.apache.lucene.search.SortField;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ComparatorOptions;
-public class LLSlotDocCodec implements HugePqCodec, FieldValueHitQueue, SafeCloseable {
+public class LLSlotDocCodec extends SimpleResource
+ implements HugePqCodec, FieldValueHitQueue, SafeCloseable {
private final SortField[] fields;
@@ -189,7 +191,7 @@ public class LLSlotDocCodec implements HugePqCodec, FieldValueHitQueu
}
@Override
- public void close() {
+ protected void onClose() {
for (FieldComparator> comparator : this.comparators) {
if (comparator instanceof SafeCloseable closeable) {
closeable.close();
diff --git a/src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java
index 6d11321..4a07594 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java
@@ -1,11 +1,12 @@
package it.cavallium.dbengine.lucene;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
-public class LazyFullDocs implements FullDocs {
+public class LazyFullDocs extends SimpleResource implements FullDocs {
private final ResourceIterable pq;
private final TotalHits totalHits;
@@ -31,7 +32,7 @@ public class LazyFullDocs implements FullDocs {
}
@Override
- public void close() {
+ protected void onClose() {
pq.close();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java
index fe18cdd..1081395 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java
@@ -24,6 +24,7 @@ import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.lucene.ResourceIterable;
import it.cavallium.dbengine.lucene.Reversable;
import it.cavallium.dbengine.lucene.ReversableResourceIterable;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.TopDocs;
@@ -39,7 +40,7 @@ import org.apache.lucene.search.TotalHits;
* all methods, in order to avoid a NullPointerException.
*/
public abstract class FullDocsCollector & Reversable>, INTERNAL extends LLDoc,
- EXTERNAL extends LLDoc> implements Collector, SafeCloseable {
+ EXTERNAL extends LLDoc> extends SimpleResource implements Collector, SafeCloseable {
/**
* The priority queue which holds the top documents. Note that different implementations of
@@ -72,7 +73,7 @@ public abstract class FullDocsCollector & Rev
public abstract ResourceIterable mapResults(ResourceIterable it);
@Override
- public void close() {
+ public void onClose() {
pq.close();
}
}
\ No newline at end of file
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java b/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java
index 22467db..c533caa 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java
@@ -4,13 +4,14 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LLFieldDoc;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
-public class FullFieldDocs implements FullDocs, SafeCloseable {
+public class FullFieldDocs extends SimpleResource implements FullDocs, SafeCloseable {
private final FullDocs fullDocs;
private final SortField[] fields;
@@ -40,7 +41,7 @@ public class FullFieldDocs implements FullDocs, SafeCloseabl
}
@Override
- public void close() {
+ protected void onClose() {
fullDocs.close();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java
index 9d4271c..d1867db 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java
@@ -485,11 +485,11 @@ public abstract class HugePqFullFieldDocCollector extends
}
@Override
- public void close() {
+ public void onClose() {
this.pq.close();
if (this.firstComparator instanceof SafeCloseable closeable) {
closeable.close();
}
- super.close();
+ super.onClose();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java
index 5296cf4..b803c86 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java
@@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
@@ -77,7 +78,7 @@ public class CountMultiSearcher implements MultiSearcher {
resultsToDrop.forEach(LLUtils::finalizeResourceNow);
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});
@@ -122,7 +123,7 @@ public class CountMultiSearcher implements MultiSearcher {
.map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), () -> {
try {
indexSearcher.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
}));
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java
index 647a32d..ca6839c 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java
@@ -53,10 +53,16 @@ public class DecimalBucketMultiSearcher {
bucketParams.collectionRate(),
bucketParams.sampleSize()
);
- return Flux.fromIterable(indexSearchers).flatMap(shard -> Mono.fromCallable(() -> {
- LLUtils.ensureBlocking();
- return cmm.search(shard);
- })).collectList().flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)));
+ return Flux
+ .fromIterable(indexSearchers)
+ .flatMap(shard -> Mono.fromCallable(() -> {
+ LLUtils.ensureBlocking();
+ return cmm.search(shard);
+ }))
+ .collectList()
+ .flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)))
+ .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
+ .publishOn(Schedulers.parallel());
});
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java
index 3fc372c..58fdcd6 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java
@@ -14,6 +14,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TopDocsCollectorMultiManager;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.LogManager;
@@ -69,7 +70,7 @@ public class PagedLocalSearcher implements LocalSearcher {
() -> {
try {
indexSearcher.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error(e);
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java
index a89195a..8c0342d 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java
@@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.PageLimits;
import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,7 +69,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
() -> {
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java
index 333105e..31af292 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java
@@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.hugepq.search.HugePqFullScoreDocCollector;
import java.io.IOException;
+import java.io.UncheckedIOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
@@ -124,7 +125,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
try {
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java
index 35df8d1..092adba 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java
@@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.hugepq.search.HugePqFullFieldDocCollector;
import java.io.IOException;
+import java.io.UncheckedIOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
@@ -118,7 +119,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
data.close();
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java
index bec8356..ad77ea0 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java
@@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -143,7 +144,7 @@ public class StandardSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java
index 4136a4e..9614cff 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java
@@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import it.cavallium.dbengine.lucene.hugepq.search.CustomHitsThresholdChecker;
import org.apache.logging.log4j.LogManager;
@@ -62,7 +63,7 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});
diff --git a/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java b/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java
index 0bc2094..ac88412 100644
--- a/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java
+++ b/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java
@@ -24,8 +24,15 @@ public abstract class SimpleResource implements SafeCloseable {
}
protected SimpleResource(boolean canClose) {
+ this(canClose, new AtomicBoolean());
+ }
+
+ protected SimpleResource(AtomicBoolean closed) {
+ this(true, closed);
+ }
+
+ private SimpleResource(boolean canClose, AtomicBoolean closed) {
this.canClose = canClose;
- var closed = new AtomicBoolean();
this.closed = closed;
if (ENABLE_LEAK_DETECTION && canClose) {
@@ -53,10 +60,14 @@ public abstract class SimpleResource implements SafeCloseable {
}
}
- private boolean isClosed() {
+ protected boolean isClosed() {
return canClose && closed.get();
}
+ protected AtomicBoolean getClosed() {
+ return closed;
+ }
+
protected void ensureOpen() {
if (canClose && closed.get()) {
throw new IllegalStateException("Resource is closed");
diff --git a/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java b/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java
index 42d5362..80b4e8d 100644
--- a/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java
+++ b/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java
@@ -1,6 +1,7 @@
package it.cavallium.dbengine;
import it.cavallium.dbengine.lucene.PriorityQueue;
+import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -8,7 +9,7 @@ import java.util.Objects;
import org.apache.lucene.search.HitQueue;
import reactor.core.publisher.Flux;
-public class PriorityQueueAdaptor implements PriorityQueue {
+public class PriorityQueueAdaptor extends SimpleResource implements PriorityQueue {
private final org.apache.lucene.util.PriorityQueue hitQueue;
@@ -72,7 +73,7 @@ public class PriorityQueueAdaptor implements PriorityQueue {
}
@Override
- public void close() {
+ protected void onClose() {
hitQueue.clear();
hitQueue.updateTop();
}
diff --git a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java
index 711ffbd..754b7b2 100644
--- a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java
+++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java
@@ -18,6 +18,7 @@ import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
@@ -81,7 +82,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
resultsToDrop.forEach(SimpleResource::close);
try {
indexSearchers.close();
- } catch (IOException e) {
+ } catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});