diff --git a/src/main/java/it/cavallium/dbengine/client/Hits.java b/src/main/java/it/cavallium/dbengine/client/Hits.java index 1460951..33b9272 100644 --- a/src/main/java/it/cavallium/dbengine/client/Hits.java +++ b/src/main/java/it/cavallium/dbengine/client/Hits.java @@ -7,6 +7,7 @@ import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.DiscardingCloseable; import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueTransformer; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.utils.SimpleResource; import java.util.Map.Entry; import java.util.Optional; @@ -15,22 +16,20 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuples; -public final class Hits extends SimpleResource implements DiscardingCloseable { +public class Hits extends SimpleResource implements DiscardingCloseable { - private static final Hits EMPTY_HITS = new Hits<>(Flux.empty(), TotalHitsCount.of(0, true), null, false); - private Flux results; - private TotalHitsCount totalHitsCount; - private Runnable onClose; + private static final Hits EMPTY_HITS = new Hits<>(Flux.empty(), TotalHitsCount.of(0, true), false); + private final Flux results; + private final TotalHitsCount totalHitsCount; - public Hits(Flux results, TotalHitsCount totalHitsCount, Runnable onClose) { - this(results, totalHitsCount, onClose, true); + public Hits(Flux results, TotalHitsCount totalHitsCount) { + this(results, totalHitsCount, true); } - private Hits(Flux results, TotalHitsCount totalHitsCount, Runnable onClose, boolean canClose) { - super(canClose && onClose != null); + private Hits(Flux results, TotalHitsCount totalHitsCount, boolean canClose) { + super(canClose); this.results = results; this.totalHitsCount = totalHitsCount; - this.onClose = onClose; } @SuppressWarnings("unchecked") @@ -38,25 +37,12 @@ public final class Hits extends SimpleResource implements DiscardingCloseable return (Hits) EMPTY_HITS; } - public static Hits> withValuesLazy(Hits> hits, - ValueGetter valuesGetter) { - var hitsEntry = hits.results().map(hitKey -> hitKey.withValue(valuesGetter::get)); - - return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close); - } - - public static Hits> withValues(Hits> hits, ValueGetter valuesGetter) { - var hitsEntry = hits.results().flatMap(hitKey -> hitKey.withValue(valuesGetter::get)); - - return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close); - } - public static Function>, Hits>> generateMapper( ValueGetter valueGetter) { return result -> { var hitsToTransform = result.results() .map(hit -> new LazyHitEntry<>(Mono.just(hit.key()), valueGetter.get(hit.key()), hit.score())); - return new Hits<>(hitsToTransform, result.totalHitsCount(), result::close); + return new MappedHits<>(hitsToTransform, result.totalHitsCount(), result); }; } @@ -80,7 +66,7 @@ public final class Hits extends SimpleResource implements DiscardingCloseable return new LazyHitEntry<>(keyMono, valMono, score); }, keysFlux, valuesFlux, scoresFlux); - return new Hits<>(transformedFlux, result.totalHitsCount(), result::close); + return new MappedHits<>(transformedFlux, result.totalHitsCount(), result); } catch (Throwable t) { result.close(); throw t; @@ -105,8 +91,30 @@ public final class Hits extends SimpleResource implements DiscardingCloseable @Override protected void onClose() { - if (onClose != null) { - onClose.run(); + } + + private static sealed class MappedHits extends Hits { + + private final Hits parent; + + public MappedHits(Flux hits, + TotalHitsCount count, + Hits parent) { + super(hits, count); + this.parent = parent; + } + + @Override + protected void onClose() { + parent.close(); + super.onClose(); + } + } + + private static final class MappedLuceneHits extends MappedHits implements LuceneCloseable { + + public MappedLuceneHits(Flux hits, TotalHitsCount count, Hits parent) { + super(hits, count, parent); } } } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index be9091a..b90c203 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,32 +1,32 @@ package it.cavallium.dbengine.client; -import io.netty5.util.Resource; -import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.DiscardingCloseable; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLLuceneIndex; -import it.cavallium.dbengine.database.LLSearchResult; import it.cavallium.dbengine.database.LLSearchResultShard; +import it.cavallium.dbengine.database.LLSearchResultShard.LuceneLLSearchResultShard; +import it.cavallium.dbengine.database.LLSearchResultShard.ResourcesLLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SafeCloseable; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.utils.SimpleResource; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.logging.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -36,6 +36,7 @@ import reactor.core.publisher.SignalType; public class LuceneIndexImpl implements LuceneIndex { private static final Duration MAX_COUNT_TIME = Duration.ofSeconds(30); + private static final Logger LOG = LogManager.getLogger(LuceneIndex.class); private final LLLuceneIndex luceneIndex; private final Indicizer indicizer; @@ -142,10 +143,14 @@ public class LuceneIndexImpl implements LuceneIndex { } private Hits> mapResults(LLSearchResultShard llSearchResult) { - var scoresWithKeysFlux = llSearchResult.results() + Flux> scoresWithKeysFlux = llSearchResult.results() .map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score())); - return new Hits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult::close); + if (llSearchResult instanceof LuceneCloseable luceneCloseable) { + return new LuceneHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), luceneCloseable); + } else { + return new CloseableHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult); + } } @Override @@ -201,6 +206,7 @@ public class LuceneIndexImpl implements LuceneIndex { return luceneIndex.releaseSnapshot(snapshot); } + @SuppressWarnings({"unchecked", "rawtypes"}) @Nullable private static LLSearchResultShard mergeResults(ClientQueryParams queryParams, List shards) { if (shards.size() == 0) { @@ -210,8 +216,12 @@ public class LuceneIndexImpl implements LuceneIndex { } TotalHitsCount count = null; ObjectArrayList> results = new ObjectArrayList<>(shards.size()); - ObjectArrayList resources = new ObjectArrayList<>(shards.size()); + ObjectArrayList resources = new ObjectArrayList(shards.size()); + boolean luceneResources = false; for (LLSearchResultShard shard : shards) { + if (!luceneResources && shard instanceof LuceneCloseable) { + luceneResources = true; + } if (count == null) { count = shard.totalHitsCount(); } else { @@ -230,10 +240,51 @@ public class LuceneIndexImpl implements LuceneIndex { } else { resultsFlux = Flux.merge(results); } - return new LLSearchResultShard(resultsFlux, count, () -> { - for (var resource : resources) { - resource.close(); - } - }); + if (luceneResources) { + return new LuceneLLSearchResultShard(resultsFlux, count, (List) resources); + } else { + return new ResourcesLLSearchResultShard(resultsFlux, count, (List) resources); + } } + + private static final class LuceneHits extends Hits implements LuceneCloseable { + + private final LuceneCloseable resource; + + public LuceneHits(Flux hits, TotalHitsCount count, LuceneCloseable resource) { + super(hits, count); + this.resource = resource; + } + + @Override + protected void onClose() { + try { + resource.close(); + } catch (Throwable ex) { + LOG.error("Failed to close resource", ex); + } + super.onClose(); + } + } + + private static final class CloseableHits extends Hits { + + private final SafeCloseable resource; + + public CloseableHits(Flux hits, TotalHitsCount count, SafeCloseable resource) { + super(hits, count); + this.resource = resource; + } + + @Override + protected void onClose() { + try { + resource.close(); + } catch (Throwable ex) { + LOG.error("Failed to close resource", ex); + } + super.onClose(); + } + } + } diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java index 50bc5ca..0130437 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java @@ -3,25 +3,37 @@ package it.cavallium.dbengine.database; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.client.LuceneIndexImpl; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.utils.SimpleResource; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.util.List; import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; -public final class LLSearchResultShard extends SimpleResource implements DiscardingCloseable { +public class LLSearchResultShard extends SimpleResource implements DiscardingCloseable { - private static final Logger logger = LogManager.getLogger(LLSearchResultShard.class); + private static final Logger LOG = LogManager.getLogger(LLSearchResultShard.class); private final Flux results; private final TotalHitsCount totalHitsCount; - private final Runnable onClose; - public LLSearchResultShard(Flux results, TotalHitsCount totalHitsCount, Runnable onClose) { + public LLSearchResultShard(Flux results, TotalHitsCount totalHitsCount) { this.results = results; this.totalHitsCount = totalHitsCount; - this.onClose = onClose; + } + + public static LLSearchResultShard withResource(Flux results, + TotalHitsCount totalHitsCount, + SafeCloseable closeableResource) { + if (closeableResource instanceof LuceneCloseable luceneCloseable) { + return new LuceneLLSearchResultShard(results, totalHitsCount, List.of(luceneCloseable)); + } else { + return new ResourcesLLSearchResultShard(results, totalHitsCount, List.of(closeableResource)); + } } public Flux results() { @@ -56,13 +68,61 @@ public final class LLSearchResultShard extends SimpleResource implements Discard @Override public void onClose() { - try { - var onClose = this.onClose; - if (onClose != null) { - onClose.run(); + } + + public static class ResourcesLLSearchResultShard extends LLSearchResultShard { + + private final List resources; + + public ResourcesLLSearchResultShard(Flux resultsFlux, + TotalHitsCount count, + List resources) { + super(resultsFlux, count); + this.resources = resources; + } + + @Override + public void onClose() { + try { + for (SafeCloseable resource : resources) { + try { + resource.close(); + } catch (Throwable ex) { + LOG.error("Failed to close resource", ex); + } + } + } catch (Throwable ex) { + LOG.error("Failed to close resources", ex); } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); + super.onClose(); + } + } + + public static class LuceneLLSearchResultShard extends LLSearchResultShard implements LuceneCloseable { + + private final List resources; + + public LuceneLLSearchResultShard(Flux resultsFlux, + TotalHitsCount count, + List resources) { + super(resultsFlux, count); + this.resources = resources; + } + + @Override + public void onClose() { + try { + for (LuceneCloseable resource : resources) { + try { + resource.close(); + } catch (Throwable ex) { + LOG.error("Failed to close resource", ex); + } + } + } catch (Throwable ex) { + LOG.error("Failed to close resources", ex); + } + super.onClose(); } } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 43fd953..435288e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -23,6 +23,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.LuceneCloseable; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.RandomSortField; import it.cavallium.dbengine.utils.SimpleResource; import java.io.Closeable; @@ -652,7 +653,7 @@ public class LLUtils { public static Mono finalizeResource(SafeCloseable resource) { Mono runnable = Mono.fromRunnable(resource::close); if (resource instanceof LuceneCloseable) { - return runnable.subscribeOn(luceneScheduler()); + return runnable.transform(LuceneUtils::scheduleLucene); } else { return runnable; } @@ -683,8 +684,9 @@ public class LLUtils { */ public static Mono singleOrClose(Mono resourceMono, Function> closure) { - return Mono.usingWhen(resourceMono, - resource -> closure.apply(resource).doOnSuccess(s -> { + return Mono.usingWhen(resourceMono, resource -> { + if (resource instanceof LuceneCloseable) { + return closure.apply(resource).publishOn(luceneScheduler()).doOnSuccess(s -> { if (s == null) { try { resource.close(); @@ -692,17 +694,25 @@ public class LLUtils { throw new RuntimeException(e); } } - }), - resource -> Mono.empty(), - (resource, ex) -> Mono.fromCallable(() -> { - resource.close(); - return null; - }), - resource -> Mono.fromCallable(() -> { - resource.close(); - return null; - }) - ); + }).publishOn(Schedulers.parallel()); + } else { + return closure.apply(resource).doOnSuccess(s -> { + if (s == null) { + try { + resource.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + }, resource -> Mono.empty(), (resource, ex) -> Mono.fromCallable(() -> { + resource.close(); + return null; + }), r -> (r instanceof SafeCloseable s) ? LLUtils.finalizeResource(s) : Mono.fromCallable(() -> { + r.close(); + return null; + })); } @Deprecated diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 6b23566..706f1f2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -500,7 +500,7 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, return localSearcher .collect(searcher, localQueryParams, keyFieldName, transformer) - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result)) .flux(); } @@ -508,7 +508,7 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, public Flux search(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName) { return searchInternal(snapshot, queryParams, keyFieldName) - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result)) .flux(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 3b8e1ba..030f712 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -286,7 +286,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI return multiSearcher .collectMulti(searchers, localQueryParams, keyFieldName, transformer) // Transform the result type - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result)) .flux(); } @@ -296,7 +296,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI @Nullable String keyFieldName) { return searchInternal(snapshot, queryParams, keyFieldName) // Transform the result type - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result)) .flux(); }