diff --git a/src/main/java/it/cavallium/dbengine/client/Hits.java b/src/main/java/it/cavallium/dbengine/client/Hits.java index b88c9be..532bd9f 100644 --- a/src/main/java/it/cavallium/dbengine/client/Hits.java +++ b/src/main/java/it/cavallium/dbengine/client/Hits.java @@ -7,6 +7,7 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueTransformer; +import it.cavallium.dbengine.utils.SimpleResource; import java.util.Map.Entry; import java.util.Optional; import java.util.function.Function; @@ -14,34 +15,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuples; -public final class Hits extends ResourceSupport, Hits> { - - private static final Drop> DROP = new Drop<>() { - @Override - public void drop(Hits obj) { - if (obj.onClose != null) { - obj.onClose.run(); - } - } - - @Override - public Drop> fork() { - return this; - } - - @Override - public void attach(Hits obj) { - - } - }; - +public final class Hits extends SimpleResource { private Flux results; private TotalHitsCount totalHitsCount; private Runnable onClose; - @SuppressWarnings({"unchecked", "rawtypes"}) public Hits(Flux results, TotalHitsCount totalHitsCount, Runnable onClose) { - super((Drop>) (Drop) DROP); this.results = results; this.totalHitsCount = totalHitsCount; this.onClose = onClose; @@ -102,10 +81,12 @@ public final class Hits extends ResourceSupport, Hits> { } public Flux results() { + ensureOpen(); return results; } public TotalHitsCount totalHitsCount() { + ensureOpen(); return totalHitsCount; } @@ -115,25 +96,9 @@ public final class Hits extends ResourceSupport, Hits> { } @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned> prepareSend() { - var results = this.results; - var totalHitsCount = this.totalHitsCount; - var onClose = this.onClose; - return drop -> { - var instance = new Hits<>(results, totalHitsCount, onClose); - drop.attach(instance); - return instance; - }; - } - - protected void makeInaccessible() { - this.results = null; - this.totalHitsCount = null; - this.onClose = null; + protected void onClose() { + if (onClose != null) { + onClose.run(); + } } } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 8a374f6..3ce6c54 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -15,6 +15,7 @@ import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.cavallium.dbengine.utils.SimpleResource; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.time.Duration; import java.util.ArrayList; @@ -123,8 +124,8 @@ public class LuceneIndexImpl implements LuceneIndex { .flatMap(shards -> mergeResults(queryParams, shards)) .map(this::mapResults) .single() - .doOnDiscard(LLSearchResultShard.class, ResourceSupport::close) - .doOnDiscard(Hits.class, ResourceSupport::close); + .doOnDiscard(LLSearchResultShard.class, SimpleResource::close) + .doOnDiscard(Hits.class, SimpleResource::close); } @Override @@ -207,7 +208,7 @@ public class LuceneIndexImpl implements LuceneIndex { return Mono.fromCallable(() -> { TotalHitsCount count = null; ObjectArrayList> results = new ObjectArrayList<>(shards.size()); - ObjectArrayList> resources = new ObjectArrayList<>(shards.size()); + ObjectArrayList resources = new ObjectArrayList<>(shards.size()); for (LLSearchResultShard shard : shards) { if (count == null) { count = shard.totalHitsCount(); @@ -228,7 +229,7 @@ public class LuceneIndexImpl implements LuceneIndex { resultsFlux = Flux.merge(results); } return new LLSearchResultShard(resultsFlux, count, () -> { - for (Resource resource : resources) { + for (var resource : resources) { resource.close(); } }); diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java index f8af0a3..8d21b62 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java @@ -4,60 +4,33 @@ import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.utils.SimpleResource; import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; -public final class LLSearchResultShard extends ResourceSupport { +public final class LLSearchResultShard extends SimpleResource { private static final Logger logger = LogManager.getLogger(LLSearchResultShard.class); - private static final Drop DROP = new Drop<>() { - @Override - public void drop(LLSearchResultShard obj) { - try { - if (obj.onClose != null) { - obj.onClose.run(); - } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); - } - } - - @Override - public Drop fork() { - return this; - } - - @Override - public void attach(LLSearchResultShard obj) { - - } - }; - - private Flux results; - private TotalHitsCount totalHitsCount; - private Runnable onClose; + private final Flux results; + private final TotalHitsCount totalHitsCount; + private final Runnable onClose; public LLSearchResultShard(Flux results, TotalHitsCount totalHitsCount, Runnable onClose) { - super(DROP); this.results = results; this.totalHitsCount = totalHitsCount; this.onClose = onClose; } public Flux results() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("LLSearchResultShard must be owned to be used")); - } + ensureOpen(); return results; } public TotalHitsCount totalHitsCount() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("LLSearchResultShard must be owned to be used")); - } + ensureOpen(); return totalHitsCount; } @@ -82,21 +55,14 @@ public final class LLSearchResultShard extends ResourceSupport prepareSend() { - var results = this.results; - var totalHitsCount = this.totalHitsCount; - var onClose = this.onClose; - return drop -> new LLSearchResultShard(results, totalHitsCount, onClose); - } - - protected void makeInaccessible() { - this.results = null; - this.totalHitsCount = null; - this.onClose = null; + public void onClose() { + try { + var onClose = this.onClose; + if (onClose != null) { + onClose.run(); + } + } catch (Throwable ex) { + logger.error("Failed to close onClose", ex); + } } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index f5dbeb9..7026a67 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; +import it.cavallium.dbengine.utils.SimpleResource; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -645,6 +646,10 @@ public class LLUtils { return Mono.fromRunnable(() -> LLUtils.closeResource(resource)); } + public static Mono finalizeResource(SimpleResource resource) { + return Mono.fromRunnable(() -> LLUtils.closeResource(resource)); + } + public static Flux handleDiscard(Flux flux) { return flux.doOnDiscard(Object.class, LLUtils::onDiscard); } @@ -917,6 +922,8 @@ public class LLUtils { public static void closeResource(Object next) { if (next instanceof Send send) { send.close(); + } if (next instanceof SimpleResource simpleResource) { + simpleResource.close(); } else if (next instanceof Resource resource && resource.isAccessible()) { resource.close(); } else if (next instanceof Iterable iterable) { @@ -927,8 +934,6 @@ public class LLUtils { if (rocksObj.isOwningHandle()) { rocksObj.close(); } - } else if (next instanceof Hits hits) { - hits.close(); } else if (next instanceof LLIndexSearcher searcher) { try { searcher.close(); @@ -941,8 +946,6 @@ public class LLUtils { } catch (IOException e) { logger.error("Failed to close searchers {}", searchers, e); } - } else if (next instanceof LLSearchResultShard shard) { - shard.close(); } else if (next instanceof Optional optional) { optional.ifPresent(LLUtils::onNextDropped); } else if (next instanceof Map.Entry entry) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java index 1151e02..55817db 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.utils.SimpleResource; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -73,11 +74,7 @@ public class CountMultiSearcher implements MultiSearcher { .take(queryParams2.limitLong(), true); return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - for (LuceneSearchResult luceneSearchResult : resultsToDrop) { - if (luceneSearchResult.isAccessible()) { - luceneSearchResult.close(); - } - } + resultsToDrop.forEach(SimpleResource::close); try { indexSearchers.close(); } catch (IOException e) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java index 4a23092..b0d8bef 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java @@ -5,60 +5,33 @@ import io.netty5.buffer.api.Owned; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.utils.SimpleResource; import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; -public final class LuceneSearchResult extends ResourceSupport { +public final class LuceneSearchResult extends SimpleResource { private static final Logger logger = LogManager.getLogger(LuceneSearchResult.class); - private static final Drop DROP = new Drop<>() { - @Override - public void drop(LuceneSearchResult obj) { - try { - if (obj.onClose != null) { - obj.onClose.run(); - } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); - } - } - - @Override - public Drop fork() { - return this; - } - - @Override - public void attach(LuceneSearchResult obj) { - - } - }; - private TotalHitsCount totalHitsCount; private Flux results; private Runnable onClose; public LuceneSearchResult(TotalHitsCount totalHitsCount, Flux results, Runnable onClose) { - super(DROP); this.totalHitsCount = totalHitsCount; this.results = results; this.onClose = onClose; } public TotalHitsCount totalHitsCount() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("LuceneSearchResult must be owned to be used")); - } + ensureOpen(); return totalHitsCount; } public Flux results() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("LuceneSearchResult must be owned to be used")); - } + ensureOpen(); return results; } @@ -83,26 +56,13 @@ public final class LuceneSearchResult extends ResourceSupport prepareSend() { - var totalHitsCount = this.totalHitsCount; - var results = this.results; - var onClose = this.onClose; - return drop -> { - var instance = new LuceneSearchResult(totalHitsCount, results, onClose); - drop.attach(instance); - return instance; - }; - } - - protected void makeInaccessible() { - this.totalHitsCount = null; - this.results = null; - this.onClose = null; - } - } diff --git a/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java b/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java new file mode 100644 index 0000000..249254e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/utils/SimpleResource.java @@ -0,0 +1,28 @@ +package it.cavallium.dbengine.utils; + +import it.cavallium.dbengine.database.SafeCloseable; +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class SimpleResource implements SafeCloseable { + + private final AtomicBoolean closed = new AtomicBoolean(); + + @Override + public final void close() { + if (closed.compareAndSet(false, true)) { + onClose(); + } + } + + private boolean isClosed() { + return closed.get(); + } + + protected void ensureOpen() { + if (closed.get()) { + throw new IllegalStateException("Resource is closed"); + } + } + + protected abstract void onClose(); +} diff --git a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java index e4eb58f..11e26f3 100644 --- a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java @@ -14,6 +14,7 @@ import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; +import it.cavallium.dbengine.utils.SimpleResource; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -75,11 +76,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { .take(queryParams2.limitLong(), true); return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - for (LuceneSearchResult luceneSearchResult : resultsToDrop) { - if (luceneSearchResult.isAccessible()) { - luceneSearchResult.close(); - } - } + resultsToDrop.forEach(SimpleResource::close); try { indexSearchers.close(); } catch (IOException e) {