Schedule correctly lucene closeables

This commit is contained in:
Andrea Cavalli 2022-07-23 14:25:59 +02:00
parent a4a8926e02
commit b9ffa1dd49
6 changed files with 198 additions and 69 deletions

View File

@ -7,6 +7,7 @@ import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueTransformer;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.Map.Entry;
import java.util.Optional;
@ -15,22 +16,20 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
public final class Hits<T> extends SimpleResource implements DiscardingCloseable {
public class Hits<T> extends SimpleResource implements DiscardingCloseable {
private static final Hits<?> EMPTY_HITS = new Hits<>(Flux.empty(), TotalHitsCount.of(0, true), null, false);
private Flux<T> results;
private TotalHitsCount totalHitsCount;
private Runnable onClose;
private static final Hits<?> EMPTY_HITS = new Hits<>(Flux.empty(), TotalHitsCount.of(0, true), false);
private final Flux<T> results;
private final TotalHitsCount totalHitsCount;
public Hits(Flux<T> results, TotalHitsCount totalHitsCount, Runnable onClose) {
this(results, totalHitsCount, onClose, true);
public Hits(Flux<T> results, TotalHitsCount totalHitsCount) {
this(results, totalHitsCount, true);
}
private Hits(Flux<T> results, TotalHitsCount totalHitsCount, Runnable onClose, boolean canClose) {
super(canClose && onClose != null);
private Hits(Flux<T> results, TotalHitsCount totalHitsCount, boolean canClose) {
super(canClose);
this.results = results;
this.totalHitsCount = totalHitsCount;
this.onClose = onClose;
}
@SuppressWarnings("unchecked")
@ -38,25 +37,12 @@ public final class Hits<T> extends SimpleResource implements DiscardingCloseable
return (Hits<T>) EMPTY_HITS;
}
public static <K, V> Hits<LazyHitEntry<K, V>> withValuesLazy(Hits<LazyHitKey<K>> hits,
ValueGetter<K, V> valuesGetter) {
var hitsEntry = hits.results().map(hitKey -> hitKey.withValue(valuesGetter::get));
return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close);
}
public static <K, V> Hits<HitEntry<K, V>> withValues(Hits<HitKey<K>> hits, ValueGetter<K, V> valuesGetter) {
var hitsEntry = hits.results().flatMap(hitKey -> hitKey.withValue(valuesGetter::get));
return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close);
}
public static <T, U> Function<Hits<HitKey<T>>, Hits<LazyHitEntry<T, U>>> generateMapper(
ValueGetter<T, U> valueGetter) {
return result -> {
var hitsToTransform = result.results()
.map(hit -> new LazyHitEntry<>(Mono.just(hit.key()), valueGetter.get(hit.key()), hit.score()));
return new Hits<>(hitsToTransform, result.totalHitsCount(), result::close);
return new MappedHits<>(hitsToTransform, result.totalHitsCount(), result);
};
}
@ -80,7 +66,7 @@ public final class Hits<T> extends SimpleResource implements DiscardingCloseable
return new LazyHitEntry<>(keyMono, valMono, score);
}, keysFlux, valuesFlux, scoresFlux);
return new Hits<>(transformedFlux, result.totalHitsCount(), result::close);
return new MappedHits<>(transformedFlux, result.totalHitsCount(), result);
} catch (Throwable t) {
result.close();
throw t;
@ -105,8 +91,30 @@ public final class Hits<T> extends SimpleResource implements DiscardingCloseable
@Override
protected void onClose() {
if (onClose != null) {
onClose.run();
}
private static sealed class MappedHits<U> extends Hits<U> {
private final Hits<?> parent;
public MappedHits(Flux<U> hits,
TotalHitsCount count,
Hits<?> parent) {
super(hits, count);
this.parent = parent;
}
@Override
protected void onClose() {
parent.close();
super.onClose();
}
}
private static final class MappedLuceneHits<U> extends MappedHits<U> implements LuceneCloseable {
public MappedLuceneHits(Flux<U> hits, TotalHitsCount count, Hits<?> parent) {
super(hits, count, parent);
}
}
}

View File

@ -1,32 +1,32 @@
package it.cavallium.dbengine.client;
import io.netty5.util.Resource;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSearchResultShard.LuceneLLSearchResultShard;
import it.cavallium.dbengine.database.LLSearchResultShard.ResourcesLLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.logging.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
@ -36,6 +36,7 @@ import reactor.core.publisher.SignalType;
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
private static final Duration MAX_COUNT_TIME = Duration.ofSeconds(30);
private static final Logger LOG = LogManager.getLogger(LuceneIndex.class);
private final LLLuceneIndex luceneIndex;
private final Indicizer<T,U> indicizer;
@ -142,10 +143,14 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
private Hits<HitKey<T>> mapResults(LLSearchResultShard llSearchResult) {
var scoresWithKeysFlux = llSearchResult.results()
Flux<HitKey<T>> scoresWithKeysFlux = llSearchResult.results()
.map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score()));
return new Hits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult::close);
if (llSearchResult instanceof LuceneCloseable luceneCloseable) {
return new LuceneHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), luceneCloseable);
} else {
return new CloseableHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult);
}
}
@Override
@ -201,6 +206,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex.releaseSnapshot(snapshot);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Nullable
private static LLSearchResultShard mergeResults(ClientQueryParams queryParams, List<LLSearchResultShard> shards) {
if (shards.size() == 0) {
@ -210,8 +216,12 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
TotalHitsCount count = null;
ObjectArrayList<Flux<LLKeyScore>> results = new ObjectArrayList<>(shards.size());
ObjectArrayList<SimpleResource> resources = new ObjectArrayList<>(shards.size());
ObjectArrayList resources = new ObjectArrayList(shards.size());
boolean luceneResources = false;
for (LLSearchResultShard shard : shards) {
if (!luceneResources && shard instanceof LuceneCloseable) {
luceneResources = true;
}
if (count == null) {
count = shard.totalHitsCount();
} else {
@ -230,10 +240,51 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
} else {
resultsFlux = Flux.merge(results);
}
return new LLSearchResultShard(resultsFlux, count, () -> {
for (var resource : resources) {
resource.close();
}
});
if (luceneResources) {
return new LuceneLLSearchResultShard(resultsFlux, count, (List<LuceneCloseable>) resources);
} else {
return new ResourcesLLSearchResultShard(resultsFlux, count, (List<SafeCloseable>) resources);
}
}
private static final class LuceneHits<U> extends Hits<U> implements LuceneCloseable {
private final LuceneCloseable resource;
public LuceneHits(Flux<U> hits, TotalHitsCount count, LuceneCloseable resource) {
super(hits, count);
this.resource = resource;
}
@Override
protected void onClose() {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
super.onClose();
}
}
private static final class CloseableHits<U> extends Hits<U> {
private final SafeCloseable resource;
public CloseableHits(Flux<U> hits, TotalHitsCount count, SafeCloseable resource) {
super(hits, count);
this.resource = resource;
}
@Override
protected void onClose() {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
super.onClose();
}
}
}

View File

@ -3,25 +3,37 @@ package it.cavallium.dbengine.database;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.LuceneIndexImpl;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
public final class LLSearchResultShard extends SimpleResource implements DiscardingCloseable {
public class LLSearchResultShard extends SimpleResource implements DiscardingCloseable {
private static final Logger logger = LogManager.getLogger(LLSearchResultShard.class);
private static final Logger LOG = LogManager.getLogger(LLSearchResultShard.class);
private final Flux<LLKeyScore> results;
private final TotalHitsCount totalHitsCount;
private final Runnable onClose;
public LLSearchResultShard(Flux<LLKeyScore> results, TotalHitsCount totalHitsCount, Runnable onClose) {
public LLSearchResultShard(Flux<LLKeyScore> results, TotalHitsCount totalHitsCount) {
this.results = results;
this.totalHitsCount = totalHitsCount;
this.onClose = onClose;
}
public static LLSearchResultShard withResource(Flux<LLKeyScore> results,
TotalHitsCount totalHitsCount,
SafeCloseable closeableResource) {
if (closeableResource instanceof LuceneCloseable luceneCloseable) {
return new LuceneLLSearchResultShard(results, totalHitsCount, List.of(luceneCloseable));
} else {
return new ResourcesLLSearchResultShard(results, totalHitsCount, List.of(closeableResource));
}
}
public Flux<LLKeyScore> results() {
@ -56,13 +68,61 @@ public final class LLSearchResultShard extends SimpleResource implements Discard
@Override
public void onClose() {
try {
var onClose = this.onClose;
if (onClose != null) {
onClose.run();
}
public static class ResourcesLLSearchResultShard extends LLSearchResultShard {
private final List<SafeCloseable> resources;
public ResourcesLLSearchResultShard(Flux<LLKeyScore> resultsFlux,
TotalHitsCount count,
List<SafeCloseable> resources) {
super(resultsFlux, count);
this.resources = resources;
}
@Override
public void onClose() {
try {
for (SafeCloseable resource : resources) {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
}
} catch (Throwable ex) {
LOG.error("Failed to close resources", ex);
}
} catch (Throwable ex) {
logger.error("Failed to close onClose", ex);
super.onClose();
}
}
public static class LuceneLLSearchResultShard extends LLSearchResultShard implements LuceneCloseable {
private final List<LuceneCloseable> resources;
public LuceneLLSearchResultShard(Flux<LLKeyScore> resultsFlux,
TotalHitsCount count,
List<LuceneCloseable> resources) {
super(resultsFlux, count);
this.resources = resources;
}
@Override
public void onClose() {
try {
for (LuceneCloseable resource : resources) {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
}
} catch (Throwable ex) {
LOG.error("Failed to close resources", ex);
}
super.onClose();
}
}
}

View File

@ -23,6 +23,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.RandomSortField;
import it.cavallium.dbengine.utils.SimpleResource;
import java.io.Closeable;
@ -652,7 +653,7 @@ public class LLUtils {
public static Mono<Void> finalizeResource(SafeCloseable resource) {
Mono<Void> runnable = Mono.fromRunnable(resource::close);
if (resource instanceof LuceneCloseable) {
return runnable.subscribeOn(luceneScheduler());
return runnable.transform(LuceneUtils::scheduleLucene);
} else {
return runnable;
}
@ -683,8 +684,9 @@ public class LLUtils {
*/
public static <T extends AutoCloseable, U> Mono<U> singleOrClose(Mono<T> resourceMono,
Function<T, Mono<U>> closure) {
return Mono.usingWhen(resourceMono,
resource -> closure.apply(resource).doOnSuccess(s -> {
return Mono.usingWhen(resourceMono, resource -> {
if (resource instanceof LuceneCloseable) {
return closure.apply(resource).publishOn(luceneScheduler()).doOnSuccess(s -> {
if (s == null) {
try {
resource.close();
@ -692,17 +694,25 @@ public class LLUtils {
throw new RuntimeException(e);
}
}
}),
resource -> Mono.empty(),
(resource, ex) -> Mono.fromCallable(() -> {
resource.close();
return null;
}),
resource -> Mono.fromCallable(() -> {
resource.close();
return null;
})
);
}).publishOn(Schedulers.parallel());
} else {
return closure.apply(resource).doOnSuccess(s -> {
if (s == null) {
try {
resource.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
}, resource -> Mono.empty(), (resource, ex) -> Mono.fromCallable(() -> {
resource.close();
return null;
}), r -> (r instanceof SafeCloseable s) ? LLUtils.finalizeResource(s) : Mono.fromCallable(() -> {
r.close();
return null;
}));
}
@Deprecated

View File

@ -500,7 +500,7 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex,
return localSearcher
.collect(searcher, localQueryParams, keyFieldName, transformer)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
.map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result))
.flux();
}
@ -508,7 +508,7 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex,
public Flux<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams,
@Nullable String keyFieldName) {
return searchInternal(snapshot, queryParams, keyFieldName)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
.map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result))
.flux();
}

View File

@ -286,7 +286,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
return multiSearcher
.collectMulti(searchers, localQueryParams, keyFieldName, transformer)
// Transform the result type
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
.map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result))
.flux();
}
@ -296,7 +296,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
@Nullable String keyFieldName) {
return searchInternal(snapshot, queryParams, keyFieldName)
// Transform the result type
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
.map(result -> LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result))
.flux();
}