Use resource

This commit is contained in:
Andrea Cavalli 2022-06-30 13:54:55 +02:00
parent 831af1ef81
commit ab93ede348
37 changed files with 510 additions and 463 deletions

51
pom.xml
View File

@ -90,7 +90,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.19</version>
<version>2020.0.20</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@ -102,7 +102,7 @@
<artifactId>reactor-tools</artifactId>
<classifier>original</classifier>
<scope>runtime</scope>
<version>3.4.18</version>
<version>3.4.19</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@ -157,12 +157,28 @@
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.23.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
<exclusion>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- This will get hamcrest-core automatically -->
<dependency>
@ -579,6 +595,25 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-maven-plugin</artifactId>
<version>1.12.13-20220629</version>
<executions>
<execution>
<goals>
<goal>transform</goal>
</goals>
</execution>
</executions>
<configuration>
<transformations>
<transformation>
<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
</transformation>
</transformations>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
@ -613,18 +648,6 @@
</lifecycleMappingMetadata>
</configuration>
</plugin>
<plugin>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-maven-plugin</artifactId>
<version>1.12.10</version>
<configuration>
<transformations>
<transformation>
<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
</transformation>
</transformations>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

View File

@ -66,7 +66,7 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
boolean isLowMemoryMode();
Mono<Void> close();
void close();
Mono<Void> flush();

View File

@ -109,9 +109,10 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
mltDocumentFields
)
.collectList()
.flatMap(shards -> mergeResults(queryParams, shards))
.mapNotNull(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
.single();
.defaultIfEmpty(Hits.empty())
.doOnDiscard(SimpleResource.class, SimpleResource::close);
}
@Override
@ -122,11 +123,10 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
indicizer.getKeyFieldName()
)
.collectList()
.flatMap(shards -> mergeResults(queryParams, shards))
.mapNotNull(shards -> mergeResults(queryParams, shards))
.map(this::mapResults)
.defaultIfEmpty(Hits.empty())
.doOnDiscard(LLSearchResultShard.class, SimpleResource::close)
.doOnDiscard(Hits.class, SimpleResource::close);
.doOnDiscard(SimpleResource.class, SimpleResource::close);
}
@Override
@ -162,8 +162,8 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
@Override
public Mono<Void> close() {
return luceneIndex.close();
public void close() {
luceneIndex.close();
}
/**
@ -202,41 +202,40 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex.releaseSnapshot(snapshot);
}
private static Mono<LLSearchResultShard> mergeResults(ClientQueryParams queryParams,
@Nullable
private static LLSearchResultShard mergeResults(ClientQueryParams queryParams,
List<LLSearchResultShard> shards) {
if (shards.size() == 0) {
return Mono.empty();
return null;
} else if (shards.size() == 1) {
return Mono.just(shards.get(0));
return shards.get(0);
}
return Mono.fromCallable(() -> {
TotalHitsCount count = null;
ObjectArrayList<Flux<LLKeyScore>> results = new ObjectArrayList<>(shards.size());
ObjectArrayList<SimpleResource> resources = new ObjectArrayList<>(shards.size());
for (LLSearchResultShard shard : shards) {
if (count == null) {
count = shard.totalHitsCount();
} else {
count = LuceneUtils.sum(count, shard.totalHitsCount());
}
var maxLimit = queryParams.offset() + queryParams.limit();
results.add(shard.results().take(maxLimit, true));
resources.add(shard);
}
Objects.requireNonNull(count);
Flux<LLKeyScore> resultsFlux;
if (results.size() == 0) {
resultsFlux = Flux.empty();
} else if (results.size() == 1) {
resultsFlux = results.get(0);
TotalHitsCount count = null;
ObjectArrayList<Flux<LLKeyScore>> results = new ObjectArrayList<>(shards.size());
ObjectArrayList<SimpleResource> resources = new ObjectArrayList<>(shards.size());
for (LLSearchResultShard shard : shards) {
if (count == null) {
count = shard.totalHitsCount();
} else {
resultsFlux = Flux.merge(results);
count = LuceneUtils.sum(count, shard.totalHitsCount());
}
var maxLimit = queryParams.offset() + queryParams.limit();
results.add(shard.results().take(maxLimit, true));
resources.add(shard);
}
Objects.requireNonNull(count);
Flux<LLKeyScore> resultsFlux;
if (results.size() == 0) {
resultsFlux = Flux.empty();
} else if (results.size() == 1) {
resultsFlux = results.get(0);
} else {
resultsFlux = Flux.merge(results);
}
return new LLSearchResultShard(resultsFlux, count, () -> {
for (var resource : resources) {
resource.close();
}
return new LLSearchResultShard(resultsFlux, count, () -> {
for (var resource : resources) {
resource.close();
}
});
});
}
}

View File

@ -6,6 +6,7 @@ import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.Objects;
import java.util.StringJoiner;
import org.apache.logging.log4j.LogManager;
@ -13,7 +14,7 @@ import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class LLEntry implements SafeCloseable {
public class LLEntry extends SimpleResource implements SafeCloseable {
private static final Logger logger = LogManager.getLogger(LLEntry.class);
private Buffer key;
@ -91,7 +92,7 @@ public class LLEntry implements SafeCloseable {
}
@Override
public void close() {
protected void onClose() {
try {
if (key != null && key.isAccessible()) {
key.close();

View File

@ -15,7 +15,7 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface LLLuceneIndex extends LLSnapshottable {
public interface LLLuceneIndex extends LLSnapshottable, SafeCloseable {
String getLuceneIndexName();
@ -74,8 +74,6 @@ public interface LLLuceneIndex extends LLSnapshottable {
boolean isLowMemoryMode();
Mono<Void> close();
/**
* Flush writes to disk.
* This does not commit, it syncs the data to the disk

View File

@ -193,9 +193,9 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> close() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::close).iterator();
return Mono.whenDelayError(it);
public void close() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(e -> Mono.<Void>fromRunnable(e::close)).iterator();
Mono.whenDelayError(it).block();
}
@Override

View File

@ -25,6 +25,7 @@ import it.cavallium.dbengine.lucene.RandomSortField;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
@ -947,13 +948,13 @@ public class LLUtils {
} else if (next instanceof LLIndexSearcher searcher) {
try {
searcher.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
logger.error("Failed to close searcher {}", searcher, e);
}
} else if (next instanceof LLIndexSearchers searchers) {
try {
searchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
logger.error("Failed to close searchers {}", searchers, e);
}
} else if (next instanceof Optional<?> optional) {

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Cleaner;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
@ -28,6 +29,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
@ -35,7 +37,7 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
// todo: deduplicate code between Cached and Simple searcher managers
public class CachedIndexSearcherManager implements IndexSearcherManager {
public class CachedIndexSearcherManager extends SimpleResource implements IndexSearcherManager {
private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
@ -57,10 +59,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private final LoadingCache<LLSnapshot, Mono<LLIndexSearcher>> cachedSnapshotSearchers;
private final Mono<LLIndexSearcher> cachedMainSearcher;
private final AtomicBoolean closeRequested = new AtomicBoolean();
private final Empty<Void> closeRequestedMono = Sinks.empty();
private final Mono<Void> closeMono;
private final Disposable refreshSubscription;
public CachedIndexSearcherManager(IndexWriter indexWriter,
@Nullable SnapshotsManager snapshotsManager,
@ -76,8 +75,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
Empty<Void> refresherClosed = Sinks.empty();
var refreshSubscription = Mono
this.refreshSubscription = Mono
.fromRunnable(() -> {
try {
maybeRefresh();
@ -85,11 +83,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
LOG.error("Failed to refresh the searcher manager", ex);
}
})
.subscribeOn(luceneHeavyTasksScheduler)
.subscribeOn(uninterruptibleScheduler(luceneHeavyTasksScheduler))
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.takeUntilOther(closeRequestedMono.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
.transform(LLUtils::handleDiscard)
.subscribe();
@ -104,91 +100,52 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
});
this.cachedMainSearcher = this.generateCachedSearcher(null);
this.closeMono = Mono
.fromRunnable(() -> {
LOG.debug("Closing IndexSearcherManager...");
this.closeRequested.set(true);
this.closeRequestedMono.tryEmitEmpty();
refreshSubscription.dispose();
})
.then(refresherClosed.asMono())
.then(Mono.<Void>fromRunnable(() -> {
LOG.debug("Closed IndexSearcherManager");
LOG.debug("Closing refreshes...");
long initTime = System.nanoTime();
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed refreshes...");
LOG.debug("Closing active searchers...");
initTime = System.nanoTime();
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed active searchers");
LOG.debug("Stopping searcher executor...");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
SEARCH_EXECUTOR.shutdown();
try {
//noinspection BlockingMethodInNonBlockingContext
if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
SEARCH_EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Failed to stop executor", e);
}
LOG.debug("Stopped searcher executor");
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.cache();
}
private Mono<LLIndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
if (closeRequested.get()) {
return null;
}
activeSearchers.incrementAndGet();
try {
IndexSearcher indexSearcher;
boolean fromSnapshot;
if (snapshotsManager == null || snapshot == null) {
indexSearcher = searcherManager.acquire();
fromSnapshot = false;
} else {
//noinspection resource
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
fromSnapshot = true;
}
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
var closed = new AtomicBoolean();
LLIndexSearcher llIndexSearcher;
if (fromSnapshot) {
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
} else {
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
}
SimpleResource.CLEANER.register(llIndexSearcher, () -> {
if (closed.compareAndSet(false, true)) {
LOG.warn("An index searcher was not closed!");
if (!fromSnapshot) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
LOG.error("Failed to release the index searcher", e);
}
}
if (isClosed()) {
return null;
}
});
return llIndexSearcher;
} catch (Throwable ex) {
activeSearchers.decrementAndGet();
throw ex;
}
})
activeSearchers.incrementAndGet();
try {
IndexSearcher indexSearcher;
boolean fromSnapshot;
if (snapshotsManager == null || snapshot == null) {
indexSearcher = searcherManager.acquire();
fromSnapshot = false;
} else {
//noinspection resource
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
fromSnapshot = true;
}
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
var closed = new AtomicBoolean();
LLIndexSearcher llIndexSearcher;
if (fromSnapshot) {
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
} else {
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
}
SimpleResource.CLEANER.register(llIndexSearcher, () -> {
if (closed.compareAndSet(false, true)) {
LOG.warn("An index searcher was not closed!");
if (!fromSnapshot) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
LOG.error("Failed to release the index searcher", e);
}
}
}
});
return llIndexSearcher;
} catch (Throwable ex) {
activeSearchers.decrementAndGet();
throw ex;
}
})
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}
@ -232,8 +189,34 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
@Override
public Mono<Void> close() {
return closeMono;
protected void onClose() {
LOG.debug("Closing IndexSearcherManager...");
refreshSubscription.dispose();
LOG.debug("Closed IndexSearcherManager");
LOG.debug("Closing refreshes...");
long initTime = System.nanoTime();
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed refreshes...");
LOG.debug("Closing active searchers...");
initTime = System.nanoTime();
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed active searchers");
LOG.debug("Stopping searcher executor...");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
SEARCH_EXECUTOR.shutdown();
try {
if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
SEARCH_EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Failed to stop executor", e);
}
LOG.debug("Stopped searcher executor");
}
public long getActiveSearchers() {
@ -251,10 +234,12 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
@Override
public void onClose() throws IOException {
public void onClose() {
dropCachedIndexSearcher();
if (getClosed().compareAndSet(false, true)) {
try {
searcherManager.release(indexSearcher);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}
@ -267,7 +252,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
}
@Override
public void onClose() throws IOException {
public void onClose() {
dropCachedIndexSearcher();
}
}

View File

@ -5,10 +5,12 @@ import static it.cavallium.dbengine.database.disk.LLTempHugePqEnv.getColumnOptio
import com.google.common.primitives.Ints;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.netty5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
@ -20,7 +22,7 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
public class HugePqEnv implements Closeable {
public class HugePqEnv extends SimpleResource {
private final RocksDB db;
private final ArrayList<ColumnFamilyHandle> defaultCfh;
@ -32,14 +34,14 @@ public class HugePqEnv implements Closeable {
}
@Override
public void close() throws IOException {
protected void onClose() {
for (var cfh : defaultCfh) {
db.destroyColumnFamilyHandle(cfh);
}
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
throw new IllegalStateException(e);
}
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.IOException;
import java.util.function.Function;
import org.apache.lucene.search.IndexSearcher;
@ -9,13 +10,11 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface IndexSearcherManager {
public interface IndexSearcherManager extends SafeCloseable {
void maybeRefreshBlocking() throws IOException;
void maybeRefresh() throws IOException;
Mono<LLIndexSearcher> retrieveSearcher(@Nullable LLSnapshot snapshot);
Mono<Void> close();
}

View File

@ -4,6 +4,7 @@ import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -12,43 +13,33 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
public abstract class LLIndexSearcher implements Closeable {
public abstract class LLIndexSearcher extends SimpleResource {
protected static final Logger LOG = LogManager.getLogger(LLIndexSearcher.class);
protected final IndexSearcher indexSearcher;
private final AtomicBoolean closed;
public LLIndexSearcher(IndexSearcher indexSearcher) {
super();
this.indexSearcher = indexSearcher;
this.closed = new AtomicBoolean();
}
public LLIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
super(closed);
this.indexSearcher = indexSearcher;
this.closed = closed;
}
public IndexReader getIndexReader() {
if (closed.get()) throw new IllegalStateException("Closed");
ensureOpen();
return indexSearcher.getIndexReader();
}
public IndexSearcher getIndexSearcher() {
if (closed.get()) throw new IllegalStateException("Closed");
ensureOpen();
return indexSearcher;
}
public AtomicBoolean getClosed() {
return closed;
return super.getClosed();
}
@Override
public final void close() throws IOException {
if (closed.compareAndSet(false, true)) {
onClose();
}
}
protected abstract void onClose() throws IOException;
}

View File

@ -5,7 +5,9 @@ import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.Closeable;
import java.io.IOException;
@ -21,7 +23,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
public interface LLIndexSearchers extends Closeable {
public interface LLIndexSearchers extends SafeCloseable {
static LLIndexSearchers of(List<LLIndexSearcher> indexSearchers) {
return new ShardedIndexSearchers(indexSearchers);
@ -41,7 +43,7 @@ public interface LLIndexSearchers extends Closeable {
IndexReader allShards();
class UnshardedIndexSearchers implements LLIndexSearchers {
class UnshardedIndexSearchers extends SimpleResource implements LLIndexSearchers {
private final LLIndexSearcher indexSearcher;
@ -89,12 +91,12 @@ public interface LLIndexSearchers extends Closeable {
}
@Override
public void close() throws IOException {
protected void onClose() {
indexSearcher.close();
}
}
class ShardedIndexSearchers implements LLIndexSearchers {
class ShardedIndexSearchers extends SimpleResource implements LLIndexSearchers {
private final List<LLIndexSearcher> indexSearchers;
private final List<IndexSearcher> indexSearchersVals;
@ -160,7 +162,7 @@ public interface LLIndexSearchers extends Closeable {
}
@Override
public void close() throws IOException {
protected void onClose() {
for (LLIndexSearcher indexSearcher : indexSearchers) {
indexSearcher.close();
}
@ -186,8 +188,8 @@ public interface LLIndexSearchers extends Closeable {
}
@Override
protected void onClose() throws IOException {
parent.onClose();
protected void onClose() {
parent.close();
}
}
}

View File

@ -40,7 +40,9 @@ import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@ -76,7 +78,7 @@ import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class LLLocalLuceneIndex implements LLLuceneIndex {
public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex {
protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class);
@ -135,7 +137,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final boolean lowMemory;
private final Phaser activeTasks = new Phaser(1);
private final AtomicBoolean closeRequested = new AtomicBoolean();
public LLLocalLuceneIndex(LLTempHugePqEnv env,
MeterRegistry meterRegistry,
@ -304,7 +305,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private <V> Mono<V> ensureOpen(Mono<V> mono) {
return Mono.<Void>fromCallable(() -> {
if (closeRequested.get()) {
if (isClosed()) {
throw new IllegalStateException("Lucene index is closed");
} else {
return null;
@ -538,39 +539,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> close() {
return Mono
.<Void>fromCallable(() -> {
logger.debug("Waiting IndexWriter tasks...");
activeTasks.arriveAndAwaitAdvance();
logger.debug("IndexWriter tasks ended");
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
.then(searcherManager.close())
.then(Mono.<Void>fromCallable(() -> {
shutdownLock.lock();
try {
logger.debug("Closing IndexWriter...");
indexWriter.close();
directory.close();
logger.debug("IndexWriter closed");
} finally {
shutdownLock.unlock();
}
return null;
}).subscribeOn(luceneHeavyTasksScheduler))
// Avoid closing multiple times
.transformDeferred(mono -> {
if (this.closeRequested.compareAndSet(false, true)) {
logger.trace("Set closeRequested to true. Further update/write calls will result in an error");
return mono;
} else {
logger.debug("Tried to close more than once");
return Mono.empty();
}
});
protected void onClose() {
logger.debug("Waiting IndexWriter tasks...");
activeTasks.arriveAndAwaitAdvance();
logger.debug("IndexWriter tasks ended");
shutdownLock.lock();
try {
logger.debug("Closing searcher manager...");
searcherManager.close();
logger.debug("Searcher manager closed");
logger.debug("Closing IndexWriter...");
indexWriter.close();
directory.close();
logger.debug("IndexWriter closed");
} catch (IOException ex) {
throw new UncheckedIOException(ex);
} finally {
shutdownLock.unlock();
}
}
@Override
@ -580,7 +566,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return null;
}
flushTime.recordCallable(() -> {
@ -603,7 +589,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return null;
}
var mergeScheduler = indexWriter.getConfig().getMergeScheduler();
@ -626,7 +612,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return null;
}
indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);
@ -653,7 +639,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return null;
}
refreshTime.recordCallable(() -> {
@ -681,7 +667,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public void scheduledCommit() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return;
}
commitTime.recordCallable(() -> {
@ -702,7 +688,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public void scheduledMerge() { // Do not use. Merges are done automatically by merge policies
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return;
}
mergeTime.recordCallable(() -> {
@ -724,7 +710,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getSnapshotsCount() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
if (snapshotsManager == null) return 0d;
@ -737,7 +723,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterFlushingBytes() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
return indexWriter.getFlushingBytes();
@ -749,7 +735,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterMaxCompletedSequenceNumber() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
return indexWriter.getMaxCompletedSequenceNumber();
@ -761,7 +747,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterPendingNumDocs() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
return indexWriter.getPendingNumDocs();
@ -773,7 +759,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getIndexWriterMergingSegmentsSize() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
return indexWriter.getMergingSegments().size();
@ -785,7 +771,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getDirectoryPendingDeletionsCount() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
return indexWriter.getDirectory().getPendingDeletions().size();
@ -799,7 +785,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getDocCount() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
var docStats = indexWriter.getDocStats();
@ -816,7 +802,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private double getMaxDoc() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
if (isClosed()) {
return 0d;
}
var docStats = indexWriter.getDocStats();

View File

@ -29,9 +29,11 @@ import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.ints.IntList;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -54,7 +56,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneIndex {
private static final Logger LOG = LogManager.getLogger(LLLuceneIndex.class);
private static final boolean BYPASS_GROUPBY_BUG = Boolean.parseBoolean(System.getProperty(
@ -157,7 +159,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.doOnDiscard(LLIndexSearcher.class, indexSearcher -> {
try {
indexSearcher.close();
} catch (IOException ex) {
} catch (UncheckedIOException ex) {
LOG.error("Failed to close an index searcher", ex);
}
})
@ -314,10 +316,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> close() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLocalLuceneIndex::close).iterator();
protected void onClose() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet
.stream()
.map(part -> Mono
.<Void>fromRunnable(part::close)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel())
)
.iterator();
var indicesCloseMono = Mono.whenDelayError(it);
return indicesCloseMono
indicesCloseMono
.then(Mono.fromCallable(() -> {
if (multiSearcher instanceof Closeable closeable) {
//noinspection BlockingMethodInNonBlockingContext
@ -326,7 +335,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
return null;
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.then();
.then()
.block();
}
@Override

View File

@ -1,7 +1,9 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import org.apache.lucene.index.DirectoryReader;
@ -9,7 +11,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.IndexSearcher;
import org.jetbrains.annotations.Nullable;
public class LuceneIndexSnapshot implements Closeable {
public class LuceneIndexSnapshot extends SimpleResource {
private final IndexCommit snapshot;
private boolean initialized;
@ -57,11 +59,16 @@ public class LuceneIndexSnapshot implements Closeable {
}
}
public synchronized void close() throws IOException {
@Override
protected synchronized void onClose() {
closed = true;
if (initialized && !failed) {
indexReader.close();
try {
indexReader.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
indexSearcher = null;
}
}

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Cleaner;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
@ -28,6 +29,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
@ -35,7 +37,7 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
// todo: deduplicate code between Cached and Simple searcher managers
public class SimpleIndexSearcherManager implements IndexSearcherManager {
public class SimpleIndexSearcherManager extends SimpleResource implements IndexSearcherManager {
private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
@ -56,10 +58,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
private final AtomicLong activeSearchers = new AtomicLong(0);
private final AtomicLong activeRefreshes = new AtomicLong(0);
private final AtomicBoolean closeRequested = new AtomicBoolean();
private final Empty<Void> closeRequestedMono = Sinks.empty();
private final Mono<Void> closeMono;
private final Disposable refreshSubscription;
public SimpleIndexSearcherManager(IndexWriter indexWriter,
@Nullable SnapshotsManager snapshotsManager,
@ -75,8 +74,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
Empty<Void> refresherClosed = Sinks.empty();
var refreshSubscription = Mono
refreshSubscription = Mono
.fromRunnable(() -> {
try {
maybeRefresh();
@ -87,47 +85,9 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
.subscribeOn(luceneHeavyTasksScheduler)
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.takeUntilOther(closeRequestedMono.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
.transform(LLUtils::handleDiscard)
.subscribe();
this.closeMono = Mono
.fromRunnable(() -> {
LOG.debug("Closing IndexSearcherManager...");
this.closeRequested.set(true);
this.closeRequestedMono.tryEmitEmpty();
refreshSubscription.dispose();
})
.then(refresherClosed.asMono())
.then(Mono.<Void>fromRunnable(() -> {
LOG.debug("Closed IndexSearcherManager");
LOG.debug("Closing refreshes...");
long initTime = System.nanoTime();
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed refreshes...");
LOG.debug("Closing active searchers...");
initTime = System.nanoTime();
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed active searchers");
LOG.debug("Stopping searcher executor...");
SEARCH_EXECUTOR.shutdown();
try {
//noinspection BlockingMethodInNonBlockingContext
if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
SEARCH_EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Failed to stop executor", e);
}
LOG.debug("Stopped searcher executor");
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.cache();
this.noSnapshotSearcherMono = retrieveSearcherInternal(null);
}
@ -171,7 +131,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
private Mono<LLIndexSearcher> retrieveSearcherInternal(@Nullable LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
if (closeRequested.get()) {
if (isClosed()) {
return null;
}
activeSearchers.incrementAndGet();
@ -218,8 +178,32 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
}
@Override
public Mono<Void> close() {
return closeMono;
protected void onClose() {
LOG.debug("Closing IndexSearcherManager...");
refreshSubscription.dispose();
LOG.debug("Closed IndexSearcherManager");
LOG.debug("Closing refreshes...");
long initTime = System.nanoTime();
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed refreshes...");
LOG.debug("Closing active searchers...");
initTime = System.nanoTime();
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
LOG.debug("Closed active searchers");
LOG.debug("Stopping searcher executor...");
SEARCH_EXECUTOR.shutdown();
try {
if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
SEARCH_EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Failed to stop executor", e);
}
LOG.debug("Stopped searcher executor");
}
public long getActiveSearchers() {
@ -237,10 +221,12 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
}
@Override
public void onClose() throws IOException {
public void onClose() {
dropCachedIndexSearcher();
if (getClosed().compareAndSet(false, true)) {
try {
searcherManager.release(indexSearcher);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}
@ -253,7 +239,7 @@ public class SimpleIndexSearcherManager implements IndexSearcherManager {
}
@Override
public void onClose() throws IOException {
public void onClose() {
dropCachedIndexSearcher();
}
}

View File

@ -3,7 +3,9 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@ -16,7 +18,7 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class SnapshotsManager {
public class SnapshotsManager extends SimpleResource {
private final IndexWriter indexWriter;
private final SnapshotDeletionPolicy snapshotter;
@ -70,7 +72,7 @@ public class SnapshotsManager {
if (prevSnapshot != null) {
try {
prevSnapshot.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
throw new IllegalStateException("Can't close snapshot", e);
}
}
@ -105,7 +107,8 @@ public class SnapshotsManager {
return Math.max(snapshots.size(), snapshotter.getSnapshotCount());
}
public void close() {
@Override
protected void onClose() {
if (!activeTasks.isTerminated()) {
activeTasks.arriveAndAwaitAdvance();
}

View File

@ -509,8 +509,8 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
public Mono<Void> close() {
return sendRequest(new CloseLuceneIndex(id)).then();
public void close() {
sendRequest(new CloseLuceneIndex(id)).then().block();
}
@Override

View File

@ -4,6 +4,7 @@ import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC
import static org.apache.lucene.search.TotalHits.Relation.*;
import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.util.Comparator;
import org.apache.lucene.search.FieldComparator;
@ -31,27 +32,7 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
static <T extends LLDoc> FullDocs<T> merge(@Nullable Sort sort, FullDocs<T>[] fullDocs) {
ResourceIterable<T> mergedIterable = mergeResourceIterable(sort, fullDocs);
TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
FullDocs<T> docs = new FullDocs<>() {
@Override
public void close() {
mergedIterable.close();
}
@Override
public Flux<T> iterate() {
return mergedIterable.iterate();
}
@Override
public Flux<T> iterate(long skips) {
return mergedIterable.iterate(skips);
}
@Override
public TotalHits totalHits() {
return mergedTotalHits;
}
};
FullDocs<T> docs = new MergedFullDocs<>(mergedIterable, mergedTotalHits);
if (sort != null) {
return new FullFieldDocs<>(docs, sort.getSort());
} else {
@ -76,88 +57,7 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
static <T extends LLDoc> ResourceIterable<T> mergeResourceIterable(
@Nullable Sort sort,
FullDocs<T>[] fullDocs) {
return new ResourceIterable<>() {
@Override
public void close() {
for (FullDocs<T> fullDoc : fullDocs) {
fullDoc.close();
}
}
@Override
public Flux<T> iterate() {
@SuppressWarnings("unchecked") Flux<T>[] iterables = new Flux[fullDocs.length];
for (int i = 0; i < fullDocs.length; i++) {
var singleFullDocs = fullDocs[i].iterate();
iterables[i] = singleFullDocs;
}
Comparator<LLDoc> comp;
if (sort == null) {
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
} else {
// Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
SortField[] sortFields = sort.getSort();
var comparators = new FieldComparator[sortFields.length];
var reverseMul = new int[sortFields.length];
for (int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
SortField sortField = sortFields[compIDX];
comparators[compIDX] = sortField.getComparator(1, compIDX == 0);
reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
}
comp = (first, second) -> {
assert first != second;
LLFieldDoc firstFD = (LLFieldDoc) first;
LLFieldDoc secondFD = (LLFieldDoc) second;
for (int compIDX = 0; compIDX < comparators.length; ++compIDX) {
//noinspection rawtypes
FieldComparator fieldComp = comparators[compIDX];
//noinspection unchecked
int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX),
secondFD.fields().get(compIDX)
);
if (cmp != 0) {
return cmp;
}
}
return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
};
}
@SuppressWarnings("unchecked") Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].map(shard -> {
if (shard instanceof LLScoreDoc scoreDoc) {
//noinspection unchecked
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
} else if (shard instanceof LLFieldDoc fieldDoc) {
//noinspection unchecked
return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
} else if (shard instanceof LLSlotDoc slotDoc) {
//noinspection unchecked
return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
} else {
throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
}
});
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
}
}
return Flux.mergeComparing(comp, fluxes);
}
};
return new MergedResourceIterable<>(fullDocs, sort);
}
static <T extends LLDoc> TotalHits mergeTotalHits(FullDocs<T>[] fullDocs) {
@ -173,4 +73,127 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
}
return new TotalHits(totalCount, totalRelation);
}
class MergedResourceIterable<T extends LLDoc> extends SimpleResource implements ResourceIterable<T> {
private final FullDocs<T>[] fullDocs;
private final @Nullable Sort sort;
public MergedResourceIterable(FullDocs<T>[] fullDocs, @Nullable Sort sort) {
this.fullDocs = fullDocs;
this.sort = sort;
}
@Override
protected void onClose() {
for (FullDocs<T> fullDoc : fullDocs) {
fullDoc.close();
}
}
@Override
public Flux<T> iterate() {
@SuppressWarnings("unchecked") Flux<T>[] iterables = new Flux[fullDocs.length];
for (int i = 0; i < fullDocs.length; i++) {
var singleFullDocs = fullDocs[i].iterate();
iterables[i] = singleFullDocs;
}
Comparator<LLDoc> comp;
if (sort == null) {
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
} else {
// Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
SortField[] sortFields = sort.getSort();
var comparators = new FieldComparator[sortFields.length];
var reverseMul = new int[sortFields.length];
for (int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
SortField sortField = sortFields[compIDX];
comparators[compIDX] = sortField.getComparator(1, compIDX == 0);
reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
}
comp = (first, second) -> {
assert first != second;
LLFieldDoc firstFD = (LLFieldDoc) first;
LLFieldDoc secondFD = (LLFieldDoc) second;
for (int compIDX = 0; compIDX < comparators.length; ++compIDX) {
//noinspection rawtypes
FieldComparator fieldComp = comparators[compIDX];
//noinspection unchecked
int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX),
secondFD.fields().get(compIDX)
);
if (cmp != 0) {
return cmp;
}
}
return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
};
}
@SuppressWarnings("unchecked") Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].map(shard -> {
if (shard instanceof LLScoreDoc scoreDoc) {
//noinspection unchecked
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
} else if (shard instanceof LLFieldDoc fieldDoc) {
//noinspection unchecked
return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
} else if (shard instanceof LLSlotDoc slotDoc) {
//noinspection unchecked
return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
} else {
throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
}
});
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
}
}
return Flux.mergeComparing(comp, fluxes);
}
}
class MergedFullDocs<T extends LLDoc> extends SimpleResource implements FullDocs<T> {
private final ResourceIterable<T> mergedIterable;
private final TotalHits mergedTotalHits;
public MergedFullDocs(ResourceIterable<T> mergedIterable, TotalHits mergedTotalHits) {
this.mergedIterable = mergedIterable;
this.mergedTotalHits = mergedTotalHits;
}
@Override
public void onClose() {
mergedIterable.close();
}
@Override
public Flux<T> iterate() {
return mergedIterable.iterate();
}
@Override
public Flux<T> iterate(long skips) {
return mergedIterable.iterate(skips);
}
@Override
public TotalHits totalHits() {
return mergedTotalHits;
}
}
}

View File

@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.database.disk.HugePqEnv;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
@ -13,13 +14,12 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
public class HugePqArray<V> implements IArray<V>, SafeCloseable {
public class HugePqArray<V> extends SimpleResource implements IArray<V>, SafeCloseable {
static {
RocksDB.loadLibrary();
}
private final AtomicBoolean closed = new AtomicBoolean();
private final HugePqCodec<V> valueCodec;
private final LLTempHugePqEnv tempEnv;
private final HugePqEnv env;
@ -118,11 +118,9 @@ public class HugePqArray<V> implements IArray<V>, SafeCloseable {
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
this.tempEnv.freeDb(hugePqId);
}
public void onClose() {
ensureThread();
this.tempEnv.freeDb(hugePqId);
}
@Override

View File

@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.disk.RocksIterWithReadOpts;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -26,13 +27,13 @@ import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import reactor.core.publisher.Flux;
public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
public class HugePqPriorityQueue<T> extends SimpleResource
implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
static {
RocksDB.loadLibrary();
}
private final AtomicBoolean closed = new AtomicBoolean();
private final LLTempHugePqEnv tempEnv;
private final HugePqEnv env;
private final int hugePqId;
@ -351,12 +352,10 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
this.tempEnv.freeDb(hugePqId);
if (this.codec instanceof SafeCloseable closeable) {
closeable.close();
}
protected void onClose() {
this.tempEnv.freeDb(hugePqId);
if (this.codec instanceof SafeCloseable closeable) {
closeable.close();
}
}
@ -369,26 +368,29 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override
public ReversableResourceIterable<T> reverse() {
return new ReversableResourceIterable<>() {
@Override
public void close() {
HugePqPriorityQueue.this.close();
}
return new ReversedResourceIterable();
}
@Override
public Flux<T> iterate() {
return reverseIterate();
}
private class ReversedResourceIterable extends SimpleResource implements ReversableResourceIterable<T> {
@Override
public Flux<T> iterate(long skips) {
return reverseIterate(skips);
}
@Override
public void onClose() {
HugePqPriorityQueue.this.close();
}
@Override
public ReversableResourceIterable<T> reverse() {
return HugePqPriorityQueue.this;
}
};
@Override
public Flux<T> iterate() {
return reverseIterate();
}
@Override
public Flux<T> iterate(long skips) {
return reverseIterate(skips);
}
@Override
public ReversableResourceIterable<T> reverse() {
return HugePqPriorityQueue.this;
}
}
}

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.lucene;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -18,7 +19,8 @@ import org.apache.lucene.search.SortField;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ComparatorOptions;
public class LLSlotDocCodec implements HugePqCodec<LLSlotDoc>, FieldValueHitQueue, SafeCloseable {
public class LLSlotDocCodec extends SimpleResource
implements HugePqCodec<LLSlotDoc>, FieldValueHitQueue, SafeCloseable {
private final SortField[] fields;
@ -189,7 +191,7 @@ public class LLSlotDocCodec implements HugePqCodec<LLSlotDoc>, FieldValueHitQueu
}
@Override
public void close() {
protected void onClose() {
for (FieldComparator<?> comparator : this.comparators) {
if (comparator instanceof SafeCloseable closeable) {
closeable.close();

View File

@ -1,11 +1,12 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
public class LazyFullDocs<T extends LLDoc> implements FullDocs<T> {
public class LazyFullDocs<T extends LLDoc> extends SimpleResource implements FullDocs<T> {
private final ResourceIterable<T> pq;
private final TotalHits totalHits;
@ -31,7 +32,7 @@ public class LazyFullDocs<T extends LLDoc> implements FullDocs<T> {
}
@Override
public void close() {
protected void onClose() {
pq.close();
}
}

View File

@ -24,6 +24,7 @@ import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.lucene.ResourceIterable;
import it.cavallium.dbengine.lucene.Reversable;
import it.cavallium.dbengine.lucene.ReversableResourceIterable;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.TopDocs;
@ -39,7 +40,7 @@ import org.apache.lucene.search.TotalHits;
* all methods, in order to avoid a NullPointerException.
*/
public abstract class FullDocsCollector<PQ extends PriorityQueue<INTERNAL> & Reversable<ReversableResourceIterable<INTERNAL>>, INTERNAL extends LLDoc,
EXTERNAL extends LLDoc> implements Collector, SafeCloseable {
EXTERNAL extends LLDoc> extends SimpleResource implements Collector, SafeCloseable {
/**
* The priority queue which holds the top documents. Note that different implementations of
@ -72,7 +73,7 @@ public abstract class FullDocsCollector<PQ extends PriorityQueue<INTERNAL> & Rev
public abstract ResourceIterable<EXTERNAL> mapResults(ResourceIterable<INTERNAL> it);
@Override
public void close() {
public void onClose() {
pq.close();
}
}

View File

@ -4,13 +4,14 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
public class FullFieldDocs<T extends LLDoc> implements FullDocs<T>, SafeCloseable {
public class FullFieldDocs<T extends LLDoc> extends SimpleResource implements FullDocs<T>, SafeCloseable {
private final FullDocs<T> fullDocs;
private final SortField[] fields;
@ -40,7 +41,7 @@ public class FullFieldDocs<T extends LLDoc> implements FullDocs<T>, SafeCloseabl
}
@Override
public void close() {
protected void onClose() {
fullDocs.close();
}
}

View File

@ -485,11 +485,11 @@ public abstract class HugePqFullFieldDocCollector extends
}
@Override
public void close() {
public void onClose() {
this.pq.close();
if (this.firstComparator instanceof SafeCloseable closeable) {
closeable.close();
}
super.close();
super.onClose();
}
}

View File

@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
@ -77,7 +78,7 @@ public class CountMultiSearcher implements MultiSearcher {
resultsToDrop.forEach(LLUtils::finalizeResourceNow);
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});
@ -122,7 +123,7 @@ public class CountMultiSearcher implements MultiSearcher {
.map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), () -> {
try {
indexSearcher.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
}));

View File

@ -53,10 +53,16 @@ public class DecimalBucketMultiSearcher {
bucketParams.collectionRate(),
bucketParams.sampleSize()
);
return Flux.fromIterable(indexSearchers).flatMap(shard -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
return cmm.search(shard);
})).collectList().flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)));
return Flux
.fromIterable(indexSearchers)
.flatMap(shard -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
return cmm.search(shard);
}))
.collectList()
.flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)))
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
});
}
}

View File

@ -14,6 +14,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TopDocsCollectorMultiManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.LogManager;
@ -69,7 +70,7 @@ public class PagedLocalSearcher implements LocalSearcher {
() -> {
try {
indexSearcher.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error(e);
}
}

View File

@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.PageLimits;
import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@ -68,7 +69,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
() -> {
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
}

View File

@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.hugepq.search.HugePqFullScoreDocCollector;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
@ -124,7 +125,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
try {

View File

@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.hugepq.search.HugePqFullFieldDocCollector;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
@ -118,7 +119,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
data.close();

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -143,7 +144,7 @@ public class StandardSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});

View File

@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import it.cavallium.dbengine.lucene.hugepq.search.CustomHitsThresholdChecker;
import org.apache.logging.log4j.LogManager;
@ -62,7 +63,7 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});

View File

@ -24,8 +24,15 @@ public abstract class SimpleResource implements SafeCloseable {
}
protected SimpleResource(boolean canClose) {
this(canClose, new AtomicBoolean());
}
protected SimpleResource(AtomicBoolean closed) {
this(true, closed);
}
private SimpleResource(boolean canClose, AtomicBoolean closed) {
this.canClose = canClose;
var closed = new AtomicBoolean();
this.closed = closed;
if (ENABLE_LEAK_DETECTION && canClose) {
@ -53,10 +60,14 @@ public abstract class SimpleResource implements SafeCloseable {
}
}
private boolean isClosed() {
protected boolean isClosed() {
return canClose && closed.get();
}
protected AtomicBoolean getClosed() {
return closed;
}
protected void ensureOpen() {
if (canClose && closed.get()) {
throw new IllegalStateException("Resource is closed");

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine;
import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -8,7 +9,7 @@ import java.util.Objects;
import org.apache.lucene.search.HitQueue;
import reactor.core.publisher.Flux;
public class PriorityQueueAdaptor<T> implements PriorityQueue<T> {
public class PriorityQueueAdaptor<T> extends SimpleResource implements PriorityQueue<T> {
private final org.apache.lucene.util.PriorityQueue<T> hitQueue;
@ -72,7 +73,7 @@ public class PriorityQueueAdaptor<T> implements PriorityQueue<T> {
}
@Override
public void close() {
protected void onClose() {
hitQueue.clear();
hitQueue.updateTop();
}

View File

@ -18,6 +18,7 @@ import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
@ -81,7 +82,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
resultsToDrop.forEach(SimpleResource::close);
try {
indexSearchers.close();
} catch (IOException e) {
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
});