Update LLLuceneIndex.java, LLSnapshottable.java, and 2 more files...

This commit is contained in:
Andrea Cavalli 2021-01-30 01:41:04 +01:00
parent 74fdb752b4
commit 52d4f022bd
4 changed files with 189 additions and 261 deletions

View File

@ -1,27 +1,25 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface LLLuceneIndex extends Closeable, LLSnapshottable { public interface LLLuceneIndex extends LLSnapshottable {
String getLuceneIndexName(); String getLuceneIndexName();
void addDocument(LLTerm id, LLDocument doc) throws IOException; Mono<Void> addDocument(LLTerm id, LLDocument doc);
void addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) throws IOException; Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents);
void deleteDocument(LLTerm id) throws IOException; Mono<Void> deleteDocument(LLTerm id);
void updateDocument(LLTerm id, LLDocument document) throws IOException; Mono<Void> updateDocument(LLTerm id, LLDocument document);
void updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents) throws IOException; Mono<Void> updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents);
void deleteAll() throws IOException; Mono<Void> deleteAll();
/** /**
* *
@ -49,7 +47,13 @@ public interface LLLuceneIndex extends Closeable, LLSnapshottable {
LLScoreMode scoreMode, LLScoreMode scoreMode,
String keyFieldName); String keyFieldName);
long count(@Nullable LLSnapshot snapshot, String query) throws IOException; default Mono<Long> count(@Nullable LLSnapshot snapshot, String queryString) {
return this.search(snapshot, queryString, 0, null, null, null)
.flatMap(LLSearchResult::totalHitsCount)
.single();
}
boolean isLowMemoryMode(); boolean isLowMemoryMode();
Mono<Void> close();
} }

View File

@ -1,10 +1,10 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
import java.io.IOException; import reactor.core.publisher.Mono;
public interface LLSnapshottable { public interface LLSnapshottable {
LLSnapshot takeSnapshot() throws IOException; Mono<LLSnapshot> takeSnapshot();
void releaseSnapshot(LLSnapshot snapshot) throws IOException; Mono<Void> releaseSnapshot(LLSnapshot snapshot);
} }

View File

@ -14,7 +14,6 @@ import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher; import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher;
import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher; import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher;
import it.cavallium.dbengine.database.luceneutil.PagedStreamSearcher; import it.cavallium.dbengine.database.luceneutil.PagedStreamSearcher;
import it.cavallium.luceneserializer.luceneserializer.ParseException;
import it.cavallium.luceneserializer.luceneserializer.QueryParser; import it.cavallium.luceneserializer.luceneserializer.QueryParser;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
@ -44,7 +43,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle; import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle;
import org.warp.commonutils.functional.IOFunction;
import org.warp.commonutils.type.ShortNamedThreadFactory; import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -137,87 +135,115 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
@Override @Override
public LLSnapshot takeSnapshot() throws IOException { public Mono<LLSnapshot> takeSnapshot() {
return Mono
long snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet(); .fromCallable(lastSnapshotSeqNo::incrementAndGet)
.subscribeOn(Schedulers.boundedElastic())
IndexCommit snapshot = takeLuceneSnapshot(); .flatMap(snapshotSeqNo -> takeLuceneSnapshot()
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot)); .flatMap(snapshot -> Mono
return new LLSnapshot(snapshotSeqNo); .fromCallable(() -> {
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo);
})
.subscribeOn(Schedulers.boundedElastic())
)
);
} }
/** /**
* Use internally. This method commits before taking the snapshot if there are no commits in a new database, * Use internally. This method commits before taking the snapshot if there are no commits in a new database,
* avoiding the exception. * avoiding the exception.
*/ */
private IndexCommit takeLuceneSnapshot() throws IOException { private Mono<IndexCommit> takeLuceneSnapshot() {
try { return Mono.fromCallable(() -> {
return snapshotter.snapshot(); try {
} catch (IllegalStateException ex) {
if ("No index commit to snapshot".equals(ex.getMessage())) {
indexWriter.commit();
return snapshotter.snapshot(); return snapshotter.snapshot();
} else { } catch (IllegalStateException ex) {
throw ex; if ("No index commit to snapshot".equals(ex.getMessage())) {
indexWriter.commit();
return snapshotter.snapshot();
} else {
throw ex;
}
} }
} }).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void releaseSnapshot(LLSnapshot snapshot) throws IOException { public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); return Mono.<Void>fromCallable(() -> {
if (indexSnapshot == null) { var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); if (indexSnapshot == null) {
} throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
}
indexSnapshot.close(); indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot(); var luceneIndexSnapshot = indexSnapshot.getSnapshot();
snapshotter.release(luceneIndexSnapshot); snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot // Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles(); indexWriter.deleteUnusedFiles();
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void addDocument(LLTerm key, LLDocument doc) throws IOException { public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
indexWriter.addDocument(LLUtils.toDocument(doc)); return Mono.<Void>fromCallable(() -> {
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> docs) throws IOException { public Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> docs) {
return Mono.<Void>fromCallable(() -> {
indexWriter.addDocuments(LLUtils.toDocuments(docs)); indexWriter.addDocuments(LLUtils.toDocuments(docs));
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void deleteDocument(LLTerm id) throws IOException { public Mono<Void> deleteDocument(LLTerm id) {
indexWriter.deleteDocuments(LLUtils.toTerm(id)); return Mono.<Void>fromCallable(() -> {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void updateDocument(LLTerm id, LLDocument document) throws IOException { public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); return Mono.<Void>fromCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents) public Mono<Void> updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents) {
throws IOException { return Mono.<Void>fromCallable(() -> {
var idIt = ids.iterator(); var idIt = ids.iterator();
var docIt = documents.iterator(); var docIt = documents.iterator();
while (idIt.hasNext()) { while (idIt.hasNext()) {
var id = idIt.next(); var id = idIt.next();
var doc = docIt.next(); var doc = docIt.next();
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc)); indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc));
} }
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public void deleteAll() throws IOException { public Mono<Void> deleteAll() {
indexWriter.deleteAll(); return Mono.<Void>fromCallable(() -> {
indexWriter.commit(); indexWriter.deleteAll();
indexWriter.forceMergeDeletes(true); indexWriter.commit();
indexWriter.flush(); indexWriter.forceMergeDeletes(true);
indexWriter.commit(); indexWriter.flush();
indexWriter.commit();
return null;
}).subscribeOn(Schedulers.boundedElastic());
} }
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) { private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
@ -351,23 +377,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
@Override @Override
public long count(@Nullable LLSnapshot snapshot, String queryString) throws IOException { public Mono<Void> close() {
try { return Mono
var luceneIndexSnapshot = resolveSnapshot(snapshot); .<Void>fromCallable(() -> {
scheduledTasksLifecycle.cancelAndWait();
Query query = QueryParser.parse(queryString); indexWriter.close();
directory.close();
return (long) runSearch(luceneIndexSnapshot, (indexSearcher) -> indexSearcher.count(query)); return null;
} catch (ParseException e) { })
throw new IOException("Error during query count!", e); .subscribeOn(Schedulers.boundedElastic());
}
}
@Override
public void close() throws IOException {
scheduledTasksLifecycle.cancelAndWait();
indexWriter.close();
directory.close();
} }
private void scheduledCommit() { private void scheduledCommit() {
@ -390,20 +408,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
} }
private <U> U runSearch(@Nullable LuceneIndexSnapshot snapshot, IOFunction<IndexSearcher, U> searchExecutor)
throws IOException {
if (snapshot != null) {
return searchExecutor.apply(snapshot.getIndexSearcher());
} else {
var indexSearcher = searcherManager.acquire();
try {
return searchExecutor.apply(indexSearcher);
} finally {
searcherManager.release(indexSearcher);
}
}
}
private LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) { private LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) {
if (snapshot == null) { if (snapshot == null) {
return null; return null;

View File

@ -1,37 +1,30 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDocument; import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLSearchResult; import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort; import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLTopKeys;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.batch.ParallelUtils; import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOBiConsumer; import org.warp.commonutils.functional.IOBiConsumer;
import org.warp.commonutils.functional.IOConsumer; import org.warp.commonutils.functional.TriFunction;
import org.warp.commonutils.functional.IOTriConsumer;
import org.warp.commonutils.locks.LockUtils;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -42,7 +35,6 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final Long2ObjectMap<LLSnapshot[]> registeredSnapshots = new Long2ObjectOpenHashMap<>(); private final Long2ObjectMap<LLSnapshot[]> registeredSnapshots = new Long2ObjectOpenHashMap<>();
private final AtomicLong nextSnapshotNumber = new AtomicLong(1); private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
private final LLLocalLuceneIndex[] luceneIndices; private final LLLocalLuceneIndex[] luceneIndices;
private final StampedLock access = new StampedLock();
private final int maxQueueSize = 1000; private final int maxQueueSize = 1000;
@ -87,29 +79,22 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override @Override
public String getLuceneIndexName() { public String getLuceneIndexName() {
return LockUtils.readLock(access, () -> luceneIndices[0].getLuceneIndexName()); return luceneIndices[0].getLuceneIndexName();
} }
@Override @Override
public void addDocument(LLTerm id, LLDocument doc) throws IOException { public Mono<Void> addDocument(LLTerm id, LLDocument doc) {
LockUtils.readLockIO(access, () -> getLuceneIndex(id).addDocument(id, doc)); return getLuceneIndex(id).addDocument(id, doc);
} }
@Override @Override
public void addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) throws IOException { public Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) {
LockUtils.readLockIO(access, () -> { return runPerInstance(keys, documents, LLLuceneIndex::addDocuments);
ParallelUtils.parallelizeIO(s -> runPerInstance(keys, documents, s),
maxQueueSize,
luceneIndices.length,
1,
LLLuceneIndex::addDocuments
);
});
} }
private void runPerInstance(Iterable<LLTerm> keys, private Mono<Void> runPerInstance(Iterable<LLTerm> keys,
Iterable<LLDocument> documents, Iterable<LLDocument> documents,
IOTriConsumer<LLLuceneIndex, Iterable<LLTerm>, Iterable<LLDocument>> consumer) throws IOException { TriFunction<LLLuceneIndex, Iterable<LLTerm>, Iterable<LLDocument>, Mono<Void>> consumer) {
var keysIt = keys.iterator(); var keysIt = keys.iterator();
var docsIt = documents.iterator(); var docsIt = documents.iterator();
@ -125,64 +110,37 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
perInstanceDocs.computeIfAbsent(instanceId, iid -> new ArrayList<>()).add(doc); perInstanceDocs.computeIfAbsent(instanceId, iid -> new ArrayList<>()).add(doc);
} }
for (Int2ObjectMap.Entry<List<LLTerm>> currentInstanceEntry : perInstanceKeys.int2ObjectEntrySet()) { return Flux
int instanceId = currentInstanceEntry.getIntKey(); .fromIterable(perInstanceKeys.int2ObjectEntrySet())
List<LLTerm> currentInstanceKeys = currentInstanceEntry.getValue(); .flatMap(currentInstanceEntry -> {
consumer.accept(this.luceneIndices[instanceId], currentInstanceKeys, perInstanceDocs.get(instanceId)); int instanceId = currentInstanceEntry.getIntKey();
} List<LLTerm> currentInstanceKeys = currentInstanceEntry.getValue();
return consumer.apply(this.luceneIndices[instanceId], currentInstanceKeys, perInstanceDocs.get(instanceId));
})
.then();
} }
@Override @Override
public void deleteDocument(LLTerm id) throws IOException { public Mono<Void> deleteDocument(LLTerm id) {
LockUtils.readLockIO(access, () -> getLuceneIndex(id).deleteDocument(id)); return getLuceneIndex(id).deleteDocument(id);
} }
@Override @Override
public void updateDocument(LLTerm id, LLDocument document) throws IOException { public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
LockUtils.readLockIO(access, () -> getLuceneIndex(id).updateDocument(id, document)); return getLuceneIndex(id).updateDocument(id, document);
} }
@Override @Override
public void updateDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) throws IOException { public Mono<Void> updateDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) {
LockUtils.readLockIO(access, () -> { return runPerInstance(keys, documents, LLLuceneIndex::updateDocuments);
ParallelUtils.parallelizeIO(s -> runPerInstance(keys, documents, s),
maxQueueSize,
luceneIndices.length,
1,
LLLuceneIndex::updateDocuments
);
});
} }
@Override @Override
public void deleteAll() throws IOException { public Mono<Void> deleteAll() {
LockUtils.writeLockIO(access, () -> { return Flux
ParallelUtils.parallelizeIO((IOConsumer<LLLuceneIndex> s) -> { .fromArray(luceneIndices)
for (LLLocalLuceneIndex luceneIndex : luceneIndices) { .flatMap(LLLocalLuceneIndex::deleteAll)
s.consume(luceneIndex); .then();
}
}, maxQueueSize, luceneIndices.length, 1, LLLuceneIndex::deleteAll);
});
}
private LLTopKeys mergeTopKeys(Collection<LLTopKeys> multi) {
long totalHitsCount = 0;
LLKeyScore[] hits;
int hitsArraySize = 0;
for (LLTopKeys llTopKeys : multi) {
totalHitsCount += llTopKeys.getTotalHitsCount();
hitsArraySize += llTopKeys.getHits().length;
}
hits = new LLKeyScore[hitsArraySize];
int offset = 0;
for (LLTopKeys llTopKeys : multi) {
var part = llTopKeys.getHits();
System.arraycopy(part, 0, hits, offset, part.length);
offset += part.length;
}
return new LLTopKeys(totalHitsCount, hits);
} }
private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) { private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) {
@ -198,27 +156,14 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
Map<String, Set<String>> mltDocumentFields, Map<String, Set<String>> mltDocumentFields,
int limit, int limit,
String keyFieldName) { String keyFieldName) {
return Mono return Flux
.fromSupplier(access::readLock) .fromArray(luceneIndices)
.subscribeOn(Schedulers.boundedElastic()) .index()
.flatMap(stamp -> Flux .flatMap(tuple -> Mono
.fromArray(luceneIndices) .fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1()))
.index() .map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
.flatMap(tuple -> Mono .flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2(), mltDocumentFields, limit, keyFieldName))
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1())) .reduce(LLSearchResult.accumulator());
.subscribeOn(Schedulers.boundedElastic())
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2(), mltDocumentFields, limit, keyFieldName))
.reduce(LLSearchResult.accumulator())
.materialize()
.flatMap(signal -> Mono
.fromRunnable(() -> access.unlockRead(stamp))
.subscribeOn(Schedulers.boundedElastic())
.thenReturn(signal)
)
.dematerialize()
);
} }
@Override @Override
@ -228,87 +173,62 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Nullable LLSort sort, @Nullable LLSort sort,
LLScoreMode scoreMode, LLScoreMode scoreMode,
String keyFieldName) { String keyFieldName) {
return Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
.flatMap(tuple -> tuple.getT1().search(tuple.getT2(), query, limit, sort, scoreMode, keyFieldName))
.reduce(LLSearchResult.accumulator());
}
@Override
public Mono<Void> close() {
return Flux
.fromArray(luceneIndices)
.flatMap(LLLocalLuceneIndex::close)
.then();
}
@Override
public Mono<LLSnapshot> takeSnapshot() {
return Mono return Mono
.fromSupplier(access::readLock) .fromCallable(() -> {
.subscribeOn(Schedulers.boundedElastic()) CopyOnWriteArrayList<LLSnapshot> instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]);
.flatMap(stamp -> Flux var snapIndex = nextSnapshotNumber.getAndIncrement();
.fromArray(luceneIndices)
.index() ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, Integer> s) -> {
.flatMap(tuple -> Mono for (int i = 0; i < luceneIndices.length; i++) {
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1())) s.consume(luceneIndices[i], i);
.subscribeOn(Schedulers.boundedElastic()) }
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)) }, maxQueueSize, luceneIndices.length, 1, (instance, i) -> {
) var instanceSnapshot = instance.takeSnapshot();
.flatMap(tuple -> tuple.getT1().search(tuple.getT2(), query, limit, sort, scoreMode, keyFieldName)) //todo: reimplement better (don't block and take them parallel)
.reduce(LLSearchResult.accumulator()) instancesSnapshots.set(i, instanceSnapshot.block());
.materialize() });
.flatMap(signal -> Mono
.fromRunnable(() -> access.unlockRead(stamp)) LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new);
.subscribeOn(Schedulers.boundedElastic()) registeredSnapshots.put(snapIndex, instancesSnapshotsArray);
.thenReturn(signal)
) return new LLSnapshot(snapIndex);
.dematerialize() })
); .subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public long count(@Nullable LLSnapshot snapshot, String query) throws IOException { public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return LockUtils.readLockIO(access, () -> { return Mono
AtomicLong result = new AtomicLong(0); .<Void>fromCallable(() -> {
LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber());
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, LLSnapshot> s) -> { for (int i = 0; i < luceneIndices.length; i++) {
for (int i = 0; i < luceneIndices.length; i++) { LLLocalLuceneIndex luceneIndex = luceneIndices[i];
s.consume(luceneIndices[i], resolveSnapshot(snapshot, i)); //todo: reimplement better (don't block and take them parallel)
} luceneIndex.releaseSnapshot(instancesSnapshots[i]).block();
}, maxQueueSize, luceneIndices.length, 1, (instance, instanceSnapshot) -> { }
result.addAndGet(instance.count(instanceSnapshot, query)); return null;
}); })
return result.get(); .subscribeOn(Schedulers.boundedElastic());
});
}
@Override
public void close() throws IOException {
LockUtils.writeLockIO(access, () -> {
ParallelUtils.parallelizeIO((IOConsumer<LLLuceneIndex> s) -> {
for (LLLocalLuceneIndex luceneIndex : luceneIndices) {
s.consume(luceneIndex);
}
}, maxQueueSize, luceneIndices.length, 1, Closeable::close);
});
}
@Override
public LLSnapshot takeSnapshot() throws IOException {
return LockUtils.writeLockIO(access, () -> {
CopyOnWriteArrayList<LLSnapshot> instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]);
var snapIndex = nextSnapshotNumber.getAndIncrement();
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, Integer> s) -> {
for (int i = 0; i < luceneIndices.length; i++) {
s.consume(luceneIndices[i], i);
}
}, maxQueueSize, luceneIndices.length, 1, (instance, i) -> {
var instanceSnapshot = instance.takeSnapshot();
instancesSnapshots.set(i, instanceSnapshot);
});
LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new);
registeredSnapshots.put(snapIndex, instancesSnapshotsArray);
return new LLSnapshot(snapIndex);
});
}
@Override
public void releaseSnapshot(LLSnapshot snapshot) throws IOException {
LockUtils.writeLockIO(access, () -> {
LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber());
for (int i = 0; i < luceneIndices.length; i++) {
LLLocalLuceneIndex luceneIndex = luceneIndices[i];
luceneIndex.releaseSnapshot(instancesSnapshots[i]);
}
});
} }
@Override @Override