Use SimpleResource when possible

This commit is contained in:
Andrea Cavalli 2022-06-14 18:05:26 +02:00
parent fb0bd092a4
commit 0d830fbd21
8 changed files with 80 additions and 163 deletions

View File

@ -7,6 +7,7 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueTransformer; import it.cavallium.dbengine.database.collections.ValueTransformer;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
@ -14,34 +15,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
public final class Hits<T> extends ResourceSupport<Hits<T>, Hits<T>> { public final class Hits<T> extends SimpleResource {
private static final Drop<Hits<?>> DROP = new Drop<>() {
@Override
public void drop(Hits<?> obj) {
if (obj.onClose != null) {
obj.onClose.run();
}
}
@Override
public Drop<Hits<?>> fork() {
return this;
}
@Override
public void attach(Hits<?> obj) {
}
};
private Flux<T> results; private Flux<T> results;
private TotalHitsCount totalHitsCount; private TotalHitsCount totalHitsCount;
private Runnable onClose; private Runnable onClose;
@SuppressWarnings({"unchecked", "rawtypes"})
public Hits(Flux<T> results, TotalHitsCount totalHitsCount, Runnable onClose) { public Hits(Flux<T> results, TotalHitsCount totalHitsCount, Runnable onClose) {
super((Drop<Hits<T>>) (Drop) DROP);
this.results = results; this.results = results;
this.totalHitsCount = totalHitsCount; this.totalHitsCount = totalHitsCount;
this.onClose = onClose; this.onClose = onClose;
@ -102,10 +81,12 @@ public final class Hits<T> extends ResourceSupport<Hits<T>, Hits<T>> {
} }
public Flux<T> results() { public Flux<T> results() {
ensureOpen();
return results; return results;
} }
public TotalHitsCount totalHitsCount() { public TotalHitsCount totalHitsCount() {
ensureOpen();
return totalHitsCount; return totalHitsCount;
} }
@ -115,25 +96,9 @@ public final class Hits<T> extends ResourceSupport<Hits<T>, Hits<T>> {
} }
@Override @Override
protected RuntimeException createResourceClosedException() { protected void onClose() {
return new IllegalStateException("Closed"); if (onClose != null) {
onClose.run();
} }
@Override
protected Owned<Hits<T>> prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
var onClose = this.onClose;
return drop -> {
var instance = new Hits<>(results, totalHitsCount, onClose);
drop.attach(instance);
return instance;
};
}
protected void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
this.onClose = null;
} }
} }

View File

@ -15,6 +15,7 @@ import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -123,8 +124,8 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
.flatMap(shards -> mergeResults(queryParams, shards)) .flatMap(shards -> mergeResults(queryParams, shards))
.map(this::mapResults) .map(this::mapResults)
.single() .single()
.doOnDiscard(LLSearchResultShard.class, ResourceSupport::close) .doOnDiscard(LLSearchResultShard.class, SimpleResource::close)
.doOnDiscard(Hits.class, ResourceSupport::close); .doOnDiscard(Hits.class, SimpleResource::close);
} }
@Override @Override
@ -207,7 +208,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
TotalHitsCount count = null; TotalHitsCount count = null;
ObjectArrayList<Flux<LLKeyScore>> results = new ObjectArrayList<>(shards.size()); ObjectArrayList<Flux<LLKeyScore>> results = new ObjectArrayList<>(shards.size());
ObjectArrayList<Resource<?>> resources = new ObjectArrayList<>(shards.size()); ObjectArrayList<SimpleResource> resources = new ObjectArrayList<>(shards.size());
for (LLSearchResultShard shard : shards) { for (LLSearchResultShard shard : shards) {
if (count == null) { if (count == null) {
count = shard.totalHitsCount(); count = shard.totalHitsCount();
@ -228,7 +229,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
resultsFlux = Flux.merge(results); resultsFlux = Flux.merge(results);
} }
return new LLSearchResultShard(resultsFlux, count, () -> { return new LLSearchResultShard(resultsFlux, count, () -> {
for (Resource<?> resource : resources) { for (var resource : resources) {
resource.close(); resource.close();
} }
}); });

View File

@ -4,60 +4,33 @@ import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.Objects; import java.util.Objects;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public final class LLSearchResultShard extends ResourceSupport<LLSearchResultShard, LLSearchResultShard> { public final class LLSearchResultShard extends SimpleResource {
private static final Logger logger = LogManager.getLogger(LLSearchResultShard.class); private static final Logger logger = LogManager.getLogger(LLSearchResultShard.class);
private static final Drop<LLSearchResultShard> DROP = new Drop<>() { private final Flux<LLKeyScore> results;
@Override private final TotalHitsCount totalHitsCount;
public void drop(LLSearchResultShard obj) { private final Runnable onClose;
try {
if (obj.onClose != null) {
obj.onClose.run();
}
} catch (Throwable ex) {
logger.error("Failed to close onClose", ex);
}
}
@Override
public Drop<LLSearchResultShard> fork() {
return this;
}
@Override
public void attach(LLSearchResultShard obj) {
}
};
private Flux<LLKeyScore> results;
private TotalHitsCount totalHitsCount;
private Runnable onClose;
public LLSearchResultShard(Flux<LLKeyScore> results, TotalHitsCount totalHitsCount, Runnable onClose) { public LLSearchResultShard(Flux<LLKeyScore> results, TotalHitsCount totalHitsCount, Runnable onClose) {
super(DROP);
this.results = results; this.results = results;
this.totalHitsCount = totalHitsCount; this.totalHitsCount = totalHitsCount;
this.onClose = onClose; this.onClose = onClose;
} }
public Flux<LLKeyScore> results() { public Flux<LLKeyScore> results() {
if (!isOwned()) { ensureOpen();
throw attachTrace(new IllegalStateException("LLSearchResultShard must be owned to be used"));
}
return results; return results;
} }
public TotalHitsCount totalHitsCount() { public TotalHitsCount totalHitsCount() {
if (!isOwned()) { ensureOpen();
throw attachTrace(new IllegalStateException("LLSearchResultShard must be owned to be used"));
}
return totalHitsCount; return totalHitsCount;
} }
@ -82,21 +55,14 @@ public final class LLSearchResultShard extends ResourceSupport<LLSearchResultSha
} }
@Override @Override
protected RuntimeException createResourceClosedException() { public void onClose() {
return new IllegalStateException("Closed"); try {
}
@Override
protected Owned<LLSearchResultShard> prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
var onClose = this.onClose; var onClose = this.onClose;
return drop -> new LLSearchResultShard(results, totalHitsCount, onClose); if (onClose != null) {
onClose.run();
}
} catch (Throwable ex) {
logger.error("Failed to close onClose", ex);
} }
protected void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
this.onClose = null;
} }
} }

View File

@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField; import it.cavallium.dbengine.lucene.RandomSortField;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -645,6 +646,10 @@ public class LLUtils {
return Mono.fromRunnable(() -> LLUtils.closeResource(resource)); return Mono.fromRunnable(() -> LLUtils.closeResource(resource));
} }
public static Mono<Void> finalizeResource(SimpleResource resource) {
return Mono.fromRunnable(() -> LLUtils.closeResource(resource));
}
public static <V> Flux<V> handleDiscard(Flux<V> flux) { public static <V> Flux<V> handleDiscard(Flux<V> flux) {
return flux.doOnDiscard(Object.class, LLUtils::onDiscard); return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
} }
@ -917,6 +922,8 @@ public class LLUtils {
public static void closeResource(Object next) { public static void closeResource(Object next) {
if (next instanceof Send<?> send) { if (next instanceof Send<?> send) {
send.close(); send.close();
} if (next instanceof SimpleResource simpleResource) {
simpleResource.close();
} else if (next instanceof Resource<?> resource && resource.isAccessible()) { } else if (next instanceof Resource<?> resource && resource.isAccessible()) {
resource.close(); resource.close();
} else if (next instanceof Iterable<?> iterable) { } else if (next instanceof Iterable<?> iterable) {
@ -927,8 +934,6 @@ public class LLUtils {
if (rocksObj.isOwningHandle()) { if (rocksObj.isOwningHandle()) {
rocksObj.close(); rocksObj.close();
} }
} else if (next instanceof Hits<?> hits) {
hits.close();
} else if (next instanceof LLIndexSearcher searcher) { } else if (next instanceof LLIndexSearcher searcher) {
try { try {
searcher.close(); searcher.close();
@ -941,8 +946,6 @@ public class LLUtils {
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to close searchers {}", searchers, e); logger.error("Failed to close searchers {}", searchers, e);
} }
} else if (next instanceof LLSearchResultShard shard) {
shard.close();
} else if (next instanceof Optional<?> optional) { } else if (next instanceof Optional<?> optional) {
optional.ifPresent(LLUtils::onNextDropped); optional.ifPresent(LLUtils::onNextDropped);
} else if (next instanceof Map.Entry<?, ?> entry) { } else if (next instanceof Map.Entry<?, ?> entry) {

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -73,11 +74,7 @@ public class CountMultiSearcher implements MultiSearcher {
.take(queryParams2.limitLong(), true); .take(queryParams2.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
for (LuceneSearchResult luceneSearchResult : resultsToDrop) { resultsToDrop.forEach(SimpleResource::close);
if (luceneSearchResult.isAccessible()) {
luceneSearchResult.close();
}
}
try { try {
indexSearchers.close(); indexSearchers.close();
} catch (IOException e) { } catch (IOException e) {

View File

@ -5,60 +5,33 @@ import io.netty5.buffer.api.Owned;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.Objects; import java.util.Objects;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public final class LuceneSearchResult extends ResourceSupport<LuceneSearchResult, LuceneSearchResult> { public final class LuceneSearchResult extends SimpleResource {
private static final Logger logger = LogManager.getLogger(LuceneSearchResult.class); private static final Logger logger = LogManager.getLogger(LuceneSearchResult.class);
private static final Drop<LuceneSearchResult> DROP = new Drop<>() {
@Override
public void drop(LuceneSearchResult obj) {
try {
if (obj.onClose != null) {
obj.onClose.run();
}
} catch (Throwable ex) {
logger.error("Failed to close onClose", ex);
}
}
@Override
public Drop<LuceneSearchResult> fork() {
return this;
}
@Override
public void attach(LuceneSearchResult obj) {
}
};
private TotalHitsCount totalHitsCount; private TotalHitsCount totalHitsCount;
private Flux<LLKeyScore> results; private Flux<LLKeyScore> results;
private Runnable onClose; private Runnable onClose;
public LuceneSearchResult(TotalHitsCount totalHitsCount, Flux<LLKeyScore> results, Runnable onClose) { public LuceneSearchResult(TotalHitsCount totalHitsCount, Flux<LLKeyScore> results, Runnable onClose) {
super(DROP);
this.totalHitsCount = totalHitsCount; this.totalHitsCount = totalHitsCount;
this.results = results; this.results = results;
this.onClose = onClose; this.onClose = onClose;
} }
public TotalHitsCount totalHitsCount() { public TotalHitsCount totalHitsCount() {
if (!isOwned()) { ensureOpen();
throw attachTrace(new IllegalStateException("LuceneSearchResult must be owned to be used"));
}
return totalHitsCount; return totalHitsCount;
} }
public Flux<LLKeyScore> results() { public Flux<LLKeyScore> results() {
if (!isOwned()) { ensureOpen();
throw attachTrace(new IllegalStateException("LuceneSearchResult must be owned to be used"));
}
return results; return results;
} }
@ -83,26 +56,13 @@ public final class LuceneSearchResult extends ResourceSupport<LuceneSearchResult
} }
@Override @Override
protected RuntimeException createResourceClosedException() { protected void onClose() {
return new IllegalStateException("Closed"); try {
if (onClose != null) {
onClose.run();
} }
} catch (Throwable ex) {
@Override logger.error("Failed to close onClose", ex);
protected Owned<LuceneSearchResult> prepareSend() {
var totalHitsCount = this.totalHitsCount;
var results = this.results;
var onClose = this.onClose;
return drop -> {
var instance = new LuceneSearchResult(totalHitsCount, results, onClose);
drop.attach(instance);
return instance;
};
} }
protected void makeInaccessible() {
this.totalHitsCount = null;
this.results = null;
this.onClose = null;
} }
} }

View File

@ -0,0 +1,28 @@
package it.cavallium.dbengine.utils;
import it.cavallium.dbengine.database.SafeCloseable;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class SimpleResource implements SafeCloseable {
private final AtomicBoolean closed = new AtomicBoolean();
@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
onClose();
}
}
private boolean isClosed() {
return closed.get();
}
protected void ensureOpen() {
if (closed.get()) {
throw new IllegalStateException("Resource is closed");
}
}
protected abstract void onClose();
}

View File

@ -14,6 +14,7 @@ import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -75,11 +76,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
.take(queryParams2.limitLong(), true); .take(queryParams2.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
for (LuceneSearchResult luceneSearchResult : resultsToDrop) { resultsToDrop.forEach(SimpleResource::close);
if (luceneSearchResult.isAccessible()) {
luceneSearchResult.close();
}
}
try { try {
indexSearchers.close(); indexSearchers.close();
} catch (IOException e) { } catch (IOException e) {