Close LMDB databases after each full search

This commit is contained in:
Andrea Cavalli 2021-12-18 21:01:14 +01:00
parent 480ab77db8
commit eaef75a304
27 changed files with 359 additions and 105 deletions

View File

@ -76,4 +76,9 @@ public class LLTempLMDBEnv implements Closeable {
.map(Path::toFile)
.forEach(File::delete);
}
public int countUsedDbs() {
var freeIds = this.freeIds.cardinality();
return MAX_DATABASES - freeIds;
}
}

View File

@ -1,11 +1,12 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.jetbrains.annotations.NotNull;
public interface CloseableIterable<T> extends Iterable<T>, Closeable {
public interface CloseableIterable<T> extends Iterable<T>, SafeCloseable {
@Override
void close();

View File

@ -49,7 +49,7 @@ public class EmptyPriorityQueue<T> implements PriorityQueue<T> {
}
@Override
public void close() throws IOException {
public void close() {
}
}

View File

@ -4,6 +4,7 @@ import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC
import static org.apache.lucene.search.TotalHits.Relation.*;
import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
import java.io.IOException;
import java.util.Comparator;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.Sort;
@ -31,6 +32,13 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
ResourceIterable<T> mergedIterable = mergeResourceIterable(sort, fullDocs);
TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
FullDocs<T> docs = new FullDocs<>() {
@Override
public void close() {
for (FullDocs<T> fullDoc : fullDocs) {
fullDoc.close();
}
}
@Override
public Flux<T> iterate() {
return mergedIterable.iterate();
@ -70,77 +78,85 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
static <T extends LLDoc> ResourceIterable<T> mergeResourceIterable(
@Nullable Sort sort,
FullDocs<T>[] fullDocs) {
return () -> {
@SuppressWarnings("unchecked")
Flux<T>[] iterables = new Flux[fullDocs.length];
for (int i = 0; i < fullDocs.length; i++) {
var singleFullDocs = fullDocs[i].iterate();
iterables[i] = singleFullDocs;
return new ResourceIterable<T>() {
@Override
public void close() {
for (FullDocs<T> fullDoc : fullDocs) {
fullDoc.close();
}
}
Comparator<LLDoc> comp;
if (sort == null) {
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
@Override
public Flux<T> iterate() {
@SuppressWarnings("unchecked") Flux<T>[] iterables = new Flux[fullDocs.length];
comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
} else {
// Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
SortField[] sortFields = sort.getSort();
var comparators = new FieldComparator[sortFields.length];
var reverseMul = new int[sortFields.length];
for(int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
SortField sortField = sortFields[compIDX];
comparators[compIDX] = sortField.getComparator(1, compIDX);
reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
for (int i = 0; i < fullDocs.length; i++) {
var singleFullDocs = fullDocs[i].iterate();
iterables[i] = singleFullDocs;
}
comp = (first, second) -> {
assert first != second;
Comparator<LLDoc> comp;
if (sort == null) {
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
LLFieldDoc firstFD = (LLFieldDoc) first;
LLFieldDoc secondFD = (LLFieldDoc) second;
comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
} else {
// Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
for(int compIDX = 0; compIDX < comparators.length; ++compIDX) {
//noinspection rawtypes
FieldComparator fieldComp = comparators[compIDX];
//noinspection unchecked
int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX), secondFD.fields().get(compIDX));
if (cmp != 0) {
return cmp;
SortField[] sortFields = sort.getSort();
var comparators = new FieldComparator[sortFields.length];
var reverseMul = new int[sortFields.length];
for (int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
SortField sortField = sortFields[compIDX];
comparators[compIDX] = sortField.getComparator(1, compIDX);
reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
}
comp = (first, second) -> {
assert first != second;
LLFieldDoc firstFD = (LLFieldDoc) first;
LLFieldDoc secondFD = (LLFieldDoc) second;
for (int compIDX = 0; compIDX < comparators.length; ++compIDX) {
//noinspection rawtypes
FieldComparator fieldComp = comparators[compIDX];
//noinspection unchecked
int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX),
secondFD.fields().get(compIDX)
);
if (cmp != 0) {
return cmp;
}
}
}
return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
};
}
@SuppressWarnings("unchecked")
Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].<T>map(shard -> {
if (shard instanceof LLScoreDoc scoreDoc) {
//noinspection unchecked
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
} else if (shard instanceof LLFieldDoc fieldDoc) {
//noinspection unchecked
return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
} else if (shard instanceof LLSlotDoc slotDoc) {
//noinspection unchecked
return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
} else {
throw new UnsupportedOperationException("Unsupported type " + shard.getClass());
}
});
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
};
}
}
return Flux.mergeComparing(comp, fluxes);
@SuppressWarnings("unchecked") Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].<T>map(shard -> switch (shard) {
case LLScoreDoc scoreDoc ->
//noinspection unchecked
(T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
case LLFieldDoc fieldDoc ->
//noinspection unchecked
(T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
case LLSlotDoc slotDoc ->
//noinspection unchecked
(T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
case null, default -> throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
});
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
}
}
return Flux.mergeComparing(comp, fluxes);
}
};
}

View File

@ -1,7 +1,9 @@
package it.cavallium.dbengine.lucene;
import io.net5.buffer.ByteBuf;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -14,7 +16,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHitQueue {
public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHitQueue, SafeCloseable {
private final SortField[] fields;
@ -166,4 +168,13 @@ public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHit
public SortField[] getFields() {
return fields;
}
@Override
public void close() {
for (FieldComparator<?> comparator : this.comparators) {
if (comparator instanceof SafeCloseable closeable) {
closeable.close();
}
}
}
}

View File

@ -5,6 +5,7 @@ import static org.lmdbjava.DbiFlags.MDB_CREATE;
import io.net5.buffer.ByteBuf;
import io.net5.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@ -23,7 +24,7 @@ import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.Txn;
public class LMDBArray<V> implements IArray<V>, Closeable {
public class LMDBArray<V> implements IArray<V>, SafeCloseable {
private final AtomicBoolean closed = new AtomicBoolean();
@ -238,7 +239,7 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
}
@Override
public void close() throws IOException {
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
for (ByteBuf value : toWrite.values()) {

View File

@ -5,7 +5,9 @@ import static org.lmdbjava.DbiFlags.*;
import io.net5.buffer.ByteBuf;
import io.net5.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
@ -77,6 +79,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
}
private void switchToMode(boolean write, boolean wantCursor) {
assert !closed.get() : "Closed";
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
@ -107,6 +110,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
}
private void switchToModeUncached(boolean write, boolean wantCursor) {
assert !closed.get() : "Closed";
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
@ -497,7 +501,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
}
@Override
public void close() throws IOException {
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
for (ByteBuf toWriteKey : toWriteKeys) {
@ -523,6 +527,9 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
}
lmdb.close();
this.tempEnv.freeDb(lmdbDbId);
if (this.codec instanceof SafeCloseable closeable) {
closeable.close();
}
}
}
@ -536,6 +543,11 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
@Override
public ReversableResourceIterable<T> reverse() {
return new ReversableResourceIterable<>() {
@Override
public void close() {
LMDBPriorityQueue.this.close();
}
@Override
public Flux<T> iterate() {
return reverseIterate();

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.lucene;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
@ -27,4 +29,9 @@ public class LazyFullDocs<T extends LLDoc> implements FullDocs<T> {
public TotalHits totalHits() {
return totalHits;
}
@Override
public void close() {
pq.close();
}
}

View File

@ -1,8 +1,8 @@
package it.cavallium.dbengine.lucene;
import java.io.Closeable;
import it.cavallium.dbengine.database.SafeCloseable;
public interface PriorityQueue<T> extends ResourceIterable<T>, Closeable {
public interface PriorityQueue<T> extends ResourceIterable<T>, SafeCloseable {
/**
* Adds an Object to a PriorityQueue in log(size) time. If one tries to add more objects than maxSize from initialize

View File

@ -1,10 +1,12 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.Closeable;
import java.util.Iterator;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
public interface ResourceIterable<T> {
public interface ResourceIterable<T> extends SafeCloseable {
/**
* Iterate this PriorityQueue

View File

@ -16,6 +16,7 @@
*/
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LazyFullDocs;
@ -38,7 +39,7 @@ import org.apache.lucene.search.TotalHits;
* all methods, in order to avoid a NullPointerException.
*/
public abstract class FullDocsCollector<PQ extends PriorityQueue<INTERNAL> & Reversable<ReversableResourceIterable<INTERNAL>>, INTERNAL extends LLDoc,
EXTERNAL extends LLDoc> implements Collector, AutoCloseable {
EXTERNAL extends LLDoc> implements Collector, SafeCloseable {
/**
* The priority queue which holds the top documents. Note that different implementations of
@ -71,7 +72,7 @@ public abstract class FullDocsCollector<PQ extends PriorityQueue<INTERNAL> & Rev
public abstract ResourceIterable<EXTERNAL> mapResults(ResourceIterable<INTERNAL> it);
@Override
public void close() throws Exception {
public void close() {
pq.close();
}
}

View File

@ -1,13 +1,16 @@
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
public class FullFieldDocs<T extends LLDoc> implements FullDocs<T> {
public class FullFieldDocs<T extends LLDoc> implements FullDocs<T>, SafeCloseable {
private final FullDocs<T> fullDocs;
private final SortField[] fields;
@ -35,4 +38,9 @@ public class FullFieldDocs<T extends LLDoc> implements FullDocs<T> {
public SortField[] fields() {
return fields;
}
@Override
public void close() {
fullDocs.close();
}
}

View File

@ -16,6 +16,7 @@
*/
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FieldValueHitQueue;
import it.cavallium.dbengine.lucene.FullDocs;
@ -26,6 +27,7 @@ import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.lucene.ResourceIterable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@ -55,8 +57,7 @@ import reactor.core.publisher.Flux;
* <a href="https://github.com/apache/lucene/commits/main/lucene/core/src/java/org/apache/lucene/search/TopFieldCollector.java">
* Lucene TopFieldCollector changes on GitHub</a>
*/
public abstract class LMDBFullFieldDocCollector
extends FullDocsCollector<LMDBPriorityQueue<LLSlotDoc>, LLSlotDoc, LLFieldDoc> {
public abstract class LMDBFullFieldDocCollector extends FullDocsCollector<LMDBPriorityQueue<LLSlotDoc>, LLSlotDoc, LLFieldDoc> {
// TODO: one optimization we could do is to pre-fill
// the queue with sentinel value that guaranteed to
@ -239,6 +240,11 @@ public abstract class LMDBFullFieldDocCollector
@Override
public ResourceIterable<LLFieldDoc> mapResults(ResourceIterable<LLSlotDoc> it) {
return new ResourceIterable<>() {
@Override
public void close() {
it.close();
}
@Override
public Flux<LLFieldDoc> iterate() {
return it.iterate().map(fieldValueHitQueue::fillFields);
@ -469,4 +475,13 @@ public abstract class LMDBFullFieldDocCollector
public boolean isEarlyTerminated() {
return totalHitsRelation == Relation.GREATER_THAN_OR_EQUAL_TO;
}
@Override
public void close() {
this.pq.close();
if (this.firstComparator instanceof SafeCloseable closeable) {
closeable.close();
}
super.close();
}
}

View File

@ -316,4 +316,5 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector<LMDBPr
}
}
}
}

View File

@ -17,12 +17,14 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.LongCodec;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
@ -34,7 +36,7 @@ import org.apache.lucene.search.Scorable;
* Comparator that sorts by asc _doc
* Based on {@link org.apache.lucene.search.comparators.DocComparator}
* */
public class DocComparator extends FieldComparator<Integer> {
public class DocComparator extends FieldComparator<Integer> implements SafeCloseable {
private final IArray<Integer> docIDs;
private final boolean enableSkipping; // if skipping functionality should be enabled
private int bottom;
@ -75,7 +77,14 @@ public class DocComparator extends FieldComparator<Integer> {
return docIDs.getOrDefault(slot, 0);
}
/**
@Override
public void close() {
if (docIDs instanceof SafeCloseable closeable) {
closeable.close();
}
}
/**
* DocLeafComparator with skipping functionality. When sort by _doc asc, after collecting top N
* matches and enough hits, the comparator can skip all the following documents. When sort by _doc
* asc and "top" document is set after which search should start, the comparator provides an

View File

@ -17,11 +17,13 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.DoubleCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.SortFieldCodec;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.index.LeafReaderContext;
@ -32,7 +34,7 @@ import org.apache.lucene.search.LeafFieldComparator;
* skipping functionality - an iterator that can skip over non-competitive documents.
* Based on {@link org.apache.lucene.search.comparators.DoubleComparator}
*/
public class DoubleComparator extends NumericComparator<Double> {
public class DoubleComparator extends NumericComparator<Double> implements SafeCloseable {
private final IArray<Double> values;
protected double topValue;
protected double bottom;
@ -64,7 +66,14 @@ public class DoubleComparator extends NumericComparator<Double> {
return new DoubleLeafComparator(context);
}
/** Leaf comparator for {@link DoubleComparator} that provides skipping functionality */
@Override
public void close() {
if (values instanceof SafeCloseable closeable) {
closeable.close();
}
}
/** Leaf comparator for {@link DoubleComparator} that provides skipping functionality */
public class DoubleLeafComparator extends NumericLeafComparator {
public DoubleLeafComparator(LeafReaderContext context) throws IOException {

View File

@ -17,11 +17,13 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.DoubleCodec;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.LMDBArray;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.index.LeafReaderContext;
@ -32,7 +34,7 @@ import org.apache.lucene.search.LeafFieldComparator;
* skipping functionality an iterator that can skip over non-competitive documents.
* Based on {@link org.apache.lucene.search.comparators.FloatComparator}
*/
public class FloatComparator extends NumericComparator<Float> {
public class FloatComparator extends NumericComparator<Float> implements SafeCloseable {
private final IArray<Float> values;
protected float topValue;
protected float bottom;
@ -64,7 +66,14 @@ public class FloatComparator extends NumericComparator<Float> {
return new FloatLeafComparator(context);
}
/** Leaf comparator for {@link FloatComparator} that provides skipping functionality */
@Override
public void close() {
if (values instanceof SafeCloseable closeable) {
closeable.close();
}
}
/** Leaf comparator for {@link FloatComparator} that provides skipping functionality */
public class FloatLeafComparator extends NumericLeafComparator {
public FloatLeafComparator(LeafReaderContext context) throws IOException {

View File

@ -17,11 +17,13 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.index.LeafReaderContext;
@ -32,7 +34,7 @@ import org.apache.lucene.search.LeafFieldComparator;
* skipping functionality an iterator that can skip over non-competitive documents.
* Based on {@link org.apache.lucene.search.comparators.IntComparator}
*/
public class IntComparator extends NumericComparator<Integer> {
public class IntComparator extends NumericComparator<Integer> implements SafeCloseable {
private final IArray<Integer> values;
protected int topValue;
protected int bottom;
@ -64,7 +66,14 @@ public class IntComparator extends NumericComparator<Integer> {
return new IntLeafComparator(context);
}
/** Leaf comparator for {@link IntComparator} that provides skipping functionality */
@Override
public void close() {
if (values instanceof SafeCloseable closeable) {
closeable.close();
}
}
/** Leaf comparator for {@link IntComparator} that provides skipping functionality */
public class IntLeafComparator extends NumericLeafComparator {
public IntLeafComparator(LeafReaderContext context) throws IOException {

View File

@ -17,11 +17,13 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.LongCodec;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
@ -32,7 +34,7 @@ import org.apache.lucene.search.LeafFieldComparator;
* functionality an iterator that can skip over non-competitive documents.
* Based on {@link org.apache.lucene.search.comparators.LongComparator}
*/
public class LongComparator extends NumericComparator<Long> {
public class LongComparator extends NumericComparator<Long> implements SafeCloseable {
private final IArray<Long> values;
protected long topValue;
protected long bottom;
@ -64,7 +66,14 @@ public class LongComparator extends NumericComparator<Long> {
return new LongLeafComparator(context);
}
/** Leaf comparator for {@link LongComparator} that provides skipping functionality */
@Override
public void close() {
if (values instanceof SafeCloseable closeable) {
closeable.close();
}
}
/** Leaf comparator for {@link LongComparator} that provides skipping functionality */
public class LongLeafComparator extends NumericLeafComparator {
public LongLeafComparator(LeafReaderContext context) throws IOException {

View File

@ -16,11 +16,13 @@
*/
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.LongCodec;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
@ -40,7 +42,7 @@ import org.apache.lucene.util.BytesRefBuilder;
* org.apache.lucene.search.IndexSearcher#search(Query, int)} uses when no {@link org.apache.lucene.search.Sort} is specified).
* Based on {@link org.apache.lucene.search.FieldComparator.RelevanceComparator}
*/
public final class RelevanceComparator extends FieldComparator<Float> implements LeafFieldComparator {
public final class RelevanceComparator extends FieldComparator<Float> implements LeafFieldComparator, SafeCloseable {
private final IArray<Float> scores;
private float bottom;
@ -115,4 +117,11 @@ public final class RelevanceComparator extends FieldComparator<Float> implements
assert !Float.isNaN(docValue);
return Float.compare(docValue, topValue);
}
@Override
public void close() {
if (this.scores instanceof SafeCloseable closeable) {
closeable.close();
}
}
}

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.ByteArrayCodec;
import it.cavallium.dbengine.lucene.BytesRefCodec;
@ -7,6 +8,7 @@ import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.index.DocValues;
@ -28,7 +30,7 @@ import org.apache.lucene.util.BytesRefBuilder;
* it may be slower.
* Based on {@link org.apache.lucene.search.FieldComparator.TermOrdValComparator}
*/
public class TermOrdValComparator extends FieldComparator<BytesRef> implements LeafFieldComparator {
public class TermOrdValComparator extends FieldComparator<BytesRef> implements LeafFieldComparator, SafeCloseable {
/* Ords for each slot.
@lucene.internal */
final IArray<Integer> ords;
@ -296,4 +298,17 @@ public class TermOrdValComparator extends FieldComparator<BytesRef> implements L
@Override
public void setScorer(Scorable scorer) {}
@Override
public void close() {
if (this.ords instanceof SafeCloseable closeable) {
closeable.close();
}
if (this.readerGen instanceof SafeCloseable closeable) {
closeable.close();
}
if (this.values instanceof SafeCloseable closeable) {
closeable.close();
}
}
}

View File

@ -3,13 +3,17 @@ package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.FullDocsCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
@ -62,23 +66,55 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
return LMDBFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(),
totalHitsThreshold);
})
.flatMap(sharedManager -> Flux
.<FullDocs<LLFieldDoc>>flatMap(sharedManager -> Flux
.fromIterable(indexSearchers)
.flatMap(shard -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
var collector = sharedManager.newCollector();
assert queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive();
try {
assert queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive();
shard.search(queryParams.query(), collector);
return collector;
shard.search(queryParams.query(), collector);
return collector;
} catch (Throwable ex) {
collector.close();
throw ex;
}
}))
.collectList()
.flatMap(collectors -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
try {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
} catch (Throwable ex) {
for (LMDBFullFieldDocCollector collector : collectors) {
collector.close();
}
throw ex;
}
}))
);
)
.doOnDiscard(List.class, list -> {
try {
for (Object o : list) {
if (o instanceof LMDBFullFieldDocCollector fullDocsCollector) {
fullDocsCollector.close();
}
}
} catch (Exception ex) {
logger.error("Failed to discard collector", ex);
}
})
.doOnDiscard(LMDBFullFieldDocCollector.class, fullDocsCollector -> {
try {
fullDocsCollector.close();
} catch (Exception ex) {
logger.error("Failed to discard collector", ex);
}
})
.doOnDiscard(FullDocs.class, SafeCloseable::close);
}
/**
@ -96,7 +132,10 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
indexSearchers.shards(), keyFieldName, true)
.take(queryParams.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close);
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
indexSearchers.close();
data.close();
});
});
}

View File

@ -3,13 +3,18 @@ package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.collector.FullDocsCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
@ -74,17 +79,49 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher {
LLUtils.ensureBlocking();
var collector = sharedManager.newCollector();
assert queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive();
try {
assert queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive();
shard.search(queryParams.query(), collector);
return collector;
shard.search(queryParams.query(), collector);
return collector;
} catch (Throwable ex) {
collector.close();
throw ex;
}
}))
.collectList()
.flatMap(collectors -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
try {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
} catch (Throwable ex) {
for (LMDBFullScoreDocCollector collector : collectors) {
collector.close();
}
throw ex;
}
}))
);
)
.doOnDiscard(List.class, list -> {
try {
for (Object o : list) {
if (o instanceof FullDocsCollector<?,?,?> fullDocsCollector) {
fullDocsCollector.close();
}
}
} catch (Exception ex) {
logger.error("Failed to discard collector", ex);
}
})
.doOnDiscard(FullDocsCollector.class, fullDocsCollector -> {
try {
fullDocsCollector.close();
} catch (Exception ex) {
logger.error("Failed to discard collector", ex);
}
})
.doOnDiscard(FullDocs.class, SafeCloseable::close);
}
/**
@ -102,7 +139,14 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher {
indexSearchers.shards(), keyFieldName, true)
.take(queryParams.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close);
return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> {
indexSearchers.close();
try {
data.close();
} catch (Exception e) {
logger.error("Failed to discard data", e);
}
});
});
}

View File

@ -70,7 +70,7 @@ public class PriorityQueueAdaptor<T> implements PriorityQueue<T> {
}
@Override
public void close() throws IOException {
public void close() {
hitQueue.clear();
hitQueue.updateTop();
}

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.net5.buffer.ByteBuf;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LMDBSortedCodec;
@ -142,6 +144,7 @@ public class TestLMDB {
@AfterEach
public void afterEach() throws IOException {
queue.close();
assertEquals(0, env.countUsedDbs());
env.close();
}
}

View File

@ -1,7 +1,10 @@
package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.Lists;
import io.net5.buffer.ByteBuf;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.LMDBSortedCodec;
@ -27,7 +30,7 @@ public class TestLMDBHitQueue {
public static final int NUM_HITS = 1024;
private LLTempLMDBEnv env;
private Closeable lmdbQueue;
private SafeCloseable lmdbQueue;
private TestingPriorityQueue testingPriorityQueue;
@ -248,6 +251,7 @@ public class TestLMDBHitQueue {
@AfterEach
public void afterEach() throws IOException {
lmdbQueue.close();
assertEquals(0, env.countUsedDbs());
env.close();
}
@ -346,8 +350,9 @@ public class TestLMDBHitQueue {
}
@Override
public void close() throws IOException {
public void close() {
referenceQueue.close();
testQueue.close();
}
private void ensureEquality() {

View File

@ -53,8 +53,10 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.function.FailableConsumer;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@ -111,6 +113,7 @@ public class TestLuceneSearches {
luceneSingle = tempDb.luceneSingle();
luceneMulti = tempDb.luceneMulti();
ENV = new LLTempLMDBEnv();
assertEquals(0, ENV.countUsedDbs());
setUpIndex(true);
setUpIndex(false);
@ -201,9 +204,20 @@ public class TestLuceneSearches {
}
}
@BeforeEach
public void beforeEach() {
assertEquals(0, ENV.countUsedDbs());
}
@AfterEach
public void afterEach() {
assertEquals(0, ENV.countUsedDbs());
}
@AfterAll
public static void afterAll() throws IOException {
TEMP_DB_GENERATOR.closeTempDb(tempDb).block();
assertEquals(0, ENV.countUsedDbs());
ENV.close();
ensureNoLeaks(allocator.allocator(), true, false);
destroyAllocator(allocator);