From 52d4f022bdccb1b351a63f6fb4f8abb5f8fe2c21 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 30 Jan 2021 01:41:04 +0100 Subject: [PATCH] Update LLLuceneIndex.java, LLSnapshottable.java, and 2 more files... --- .../dbengine/database/LLLuceneIndex.java | 24 +- .../dbengine/database/LLSnapshottable.java | 6 +- .../database/disk/LLLocalLuceneIndex.java | 166 ++++++------ .../disk/LLLocalMultiLuceneIndex.java | 254 ++++++------------ 4 files changed, 189 insertions(+), 261 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index cbc0ff1..58361b9 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -1,27 +1,25 @@ package it.cavallium.dbengine.database; -import java.io.Closeable; -import java.io.IOException; import java.util.Map; import java.util.Set; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; -public interface LLLuceneIndex extends Closeable, LLSnapshottable { +public interface LLLuceneIndex extends LLSnapshottable { String getLuceneIndexName(); - void addDocument(LLTerm id, LLDocument doc) throws IOException; + Mono addDocument(LLTerm id, LLDocument doc); - void addDocuments(Iterable keys, Iterable documents) throws IOException; + Mono addDocuments(Iterable keys, Iterable documents); - void deleteDocument(LLTerm id) throws IOException; + Mono deleteDocument(LLTerm id); - void updateDocument(LLTerm id, LLDocument document) throws IOException; + Mono updateDocument(LLTerm id, LLDocument document); - void updateDocuments(Iterable ids, Iterable documents) throws IOException; + Mono updateDocuments(Iterable ids, Iterable documents); - void deleteAll() throws IOException; + Mono deleteAll(); /** * @@ -49,7 +47,13 @@ public interface LLLuceneIndex extends Closeable, LLSnapshottable { LLScoreMode scoreMode, String keyFieldName); - long count(@Nullable LLSnapshot snapshot, String query) throws IOException; + default Mono count(@Nullable LLSnapshot snapshot, String queryString) { + return this.search(snapshot, queryString, 0, null, null, null) + .flatMap(LLSearchResult::totalHitsCount) + .single(); + } boolean isLowMemoryMode(); + + Mono close(); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java b/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java index 1e5df8a..9639116 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java @@ -1,10 +1,10 @@ package it.cavallium.dbengine.database; -import java.io.IOException; +import reactor.core.publisher.Mono; public interface LLSnapshottable { - LLSnapshot takeSnapshot() throws IOException; + Mono takeSnapshot(); - void releaseSnapshot(LLSnapshot snapshot) throws IOException; + Mono releaseSnapshot(LLSnapshot snapshot); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 256089c..a8dc268 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -14,7 +14,6 @@ import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher; import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher; import it.cavallium.dbengine.database.luceneutil.PagedStreamSearcher; -import it.cavallium.luceneserializer.luceneserializer.ParseException; import it.cavallium.luceneserializer.luceneserializer.QueryParser; import java.io.IOException; import java.nio.file.Path; @@ -44,7 +43,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle; -import org.warp.commonutils.functional.IOFunction; import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -137,87 +135,115 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public LLSnapshot takeSnapshot() throws IOException { - - long snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet(); - - IndexCommit snapshot = takeLuceneSnapshot(); - this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot)); - return new LLSnapshot(snapshotSeqNo); + public Mono takeSnapshot() { + return Mono + .fromCallable(lastSnapshotSeqNo::incrementAndGet) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(snapshotSeqNo -> takeLuceneSnapshot() + .flatMap(snapshot -> Mono + .fromCallable(() -> { + this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot)); + return new LLSnapshot(snapshotSeqNo); + }) + .subscribeOn(Schedulers.boundedElastic()) + ) + ); } /** * Use internally. This method commits before taking the snapshot if there are no commits in a new database, * avoiding the exception. */ - private IndexCommit takeLuceneSnapshot() throws IOException { - try { - return snapshotter.snapshot(); - } catch (IllegalStateException ex) { - if ("No index commit to snapshot".equals(ex.getMessage())) { - indexWriter.commit(); + private Mono takeLuceneSnapshot() { + return Mono.fromCallable(() -> { + try { return snapshotter.snapshot(); - } else { - throw ex; + } catch (IllegalStateException ex) { + if ("No index commit to snapshot".equals(ex.getMessage())) { + indexWriter.commit(); + return snapshotter.snapshot(); + } else { + throw ex; + } } - } + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void releaseSnapshot(LLSnapshot snapshot) throws IOException { - var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); - if (indexSnapshot == null) { - throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); - } + public Mono releaseSnapshot(LLSnapshot snapshot) { + return Mono.fromCallable(() -> { + var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); + if (indexSnapshot == null) { + throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); + } - indexSnapshot.close(); + indexSnapshot.close(); - var luceneIndexSnapshot = indexSnapshot.getSnapshot(); - snapshotter.release(luceneIndexSnapshot); - // Delete unused files after releasing the snapshot - indexWriter.deleteUnusedFiles(); + var luceneIndexSnapshot = indexSnapshot.getSnapshot(); + snapshotter.release(luceneIndexSnapshot); + // Delete unused files after releasing the snapshot + indexWriter.deleteUnusedFiles(); + return null; + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void addDocument(LLTerm key, LLDocument doc) throws IOException { - indexWriter.addDocument(LLUtils.toDocument(doc)); + public Mono addDocument(LLTerm key, LLDocument doc) { + return Mono.fromCallable(() -> { + indexWriter.addDocument(LLUtils.toDocument(doc)); + return null; + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void addDocuments(Iterable keys, Iterable docs) throws IOException { + public Mono addDocuments(Iterable keys, Iterable docs) { + return Mono.fromCallable(() -> { indexWriter.addDocuments(LLUtils.toDocuments(docs)); + return null; + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void deleteDocument(LLTerm id) throws IOException { - indexWriter.deleteDocuments(LLUtils.toTerm(id)); + public Mono deleteDocument(LLTerm id) { + return Mono.fromCallable(() -> { + indexWriter.deleteDocuments(LLUtils.toTerm(id)); + return null; + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void updateDocument(LLTerm id, LLDocument document) throws IOException { - indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); + public Mono updateDocument(LLTerm id, LLDocument document) { + return Mono.fromCallable(() -> { + indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); + return null; + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void updateDocuments(Iterable ids, Iterable documents) - throws IOException { - var idIt = ids.iterator(); - var docIt = documents.iterator(); - while (idIt.hasNext()) { - var id = idIt.next(); - var doc = docIt.next(); + public Mono updateDocuments(Iterable ids, Iterable documents) { + return Mono.fromCallable(() -> { + var idIt = ids.iterator(); + var docIt = documents.iterator(); + while (idIt.hasNext()) { + var id = idIt.next(); + var doc = docIt.next(); - indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc)); - } + indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc)); + } + return null; + }).subscribeOn(Schedulers.boundedElastic()); } @Override - public void deleteAll() throws IOException { - indexWriter.deleteAll(); - indexWriter.commit(); - indexWriter.forceMergeDeletes(true); - indexWriter.flush(); - indexWriter.commit(); + public Mono deleteAll() { + return Mono.fromCallable(() -> { + indexWriter.deleteAll(); + indexWriter.commit(); + indexWriter.forceMergeDeletes(true); + indexWriter.flush(); + indexWriter.commit(); + return null; + }).subscribeOn(Schedulers.boundedElastic()); } private Mono acquireSearcherWrapper(LLSnapshot snapshot) { @@ -351,23 +377,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public long count(@Nullable LLSnapshot snapshot, String queryString) throws IOException { - try { - var luceneIndexSnapshot = resolveSnapshot(snapshot); - - Query query = QueryParser.parse(queryString); - - return (long) runSearch(luceneIndexSnapshot, (indexSearcher) -> indexSearcher.count(query)); - } catch (ParseException e) { - throw new IOException("Error during query count!", e); - } - } - - @Override - public void close() throws IOException { - scheduledTasksLifecycle.cancelAndWait(); - indexWriter.close(); - directory.close(); + public Mono close() { + return Mono + .fromCallable(() -> { + scheduledTasksLifecycle.cancelAndWait(); + indexWriter.close(); + directory.close(); + return null; + }) + .subscribeOn(Schedulers.boundedElastic()); } private void scheduledCommit() { @@ -390,20 +408,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } } - private U runSearch(@Nullable LuceneIndexSnapshot snapshot, IOFunction searchExecutor) - throws IOException { - if (snapshot != null) { - return searchExecutor.apply(snapshot.getIndexSearcher()); - } else { - var indexSearcher = searcherManager.acquire(); - try { - return searchExecutor.apply(indexSearcher); - } finally { - searcherManager.release(indexSearcher); - } - } - } - private LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) { if (snapshot == null) { return null; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 427144f..d837e8c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,37 +1,30 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLDocument; -import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.LLSearchResult; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSort; import it.cavallium.dbengine.database.LLTerm; -import it.cavallium.dbengine.database.LLTopKeys; import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.StampedLock; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.batch.ParallelUtils; import org.warp.commonutils.functional.IOBiConsumer; -import org.warp.commonutils.functional.IOConsumer; -import org.warp.commonutils.functional.IOTriConsumer; -import org.warp.commonutils.locks.LockUtils; +import org.warp.commonutils.functional.TriFunction; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -42,7 +35,6 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { private final Long2ObjectMap registeredSnapshots = new Long2ObjectOpenHashMap<>(); private final AtomicLong nextSnapshotNumber = new AtomicLong(1); private final LLLocalLuceneIndex[] luceneIndices; - private final StampedLock access = new StampedLock(); private final int maxQueueSize = 1000; @@ -87,29 +79,22 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public String getLuceneIndexName() { - return LockUtils.readLock(access, () -> luceneIndices[0].getLuceneIndexName()); + return luceneIndices[0].getLuceneIndexName(); } @Override - public void addDocument(LLTerm id, LLDocument doc) throws IOException { - LockUtils.readLockIO(access, () -> getLuceneIndex(id).addDocument(id, doc)); + public Mono addDocument(LLTerm id, LLDocument doc) { + return getLuceneIndex(id).addDocument(id, doc); } @Override - public void addDocuments(Iterable keys, Iterable documents) throws IOException { - LockUtils.readLockIO(access, () -> { - ParallelUtils.parallelizeIO(s -> runPerInstance(keys, documents, s), - maxQueueSize, - luceneIndices.length, - 1, - LLLuceneIndex::addDocuments - ); - }); + public Mono addDocuments(Iterable keys, Iterable documents) { + return runPerInstance(keys, documents, LLLuceneIndex::addDocuments); } - private void runPerInstance(Iterable keys, + private Mono runPerInstance(Iterable keys, Iterable documents, - IOTriConsumer, Iterable> consumer) throws IOException { + TriFunction, Iterable, Mono> consumer) { var keysIt = keys.iterator(); var docsIt = documents.iterator(); @@ -125,64 +110,37 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { perInstanceDocs.computeIfAbsent(instanceId, iid -> new ArrayList<>()).add(doc); } - for (Int2ObjectMap.Entry> currentInstanceEntry : perInstanceKeys.int2ObjectEntrySet()) { - int instanceId = currentInstanceEntry.getIntKey(); - List currentInstanceKeys = currentInstanceEntry.getValue(); - consumer.accept(this.luceneIndices[instanceId], currentInstanceKeys, perInstanceDocs.get(instanceId)); - } + return Flux + .fromIterable(perInstanceKeys.int2ObjectEntrySet()) + .flatMap(currentInstanceEntry -> { + int instanceId = currentInstanceEntry.getIntKey(); + List currentInstanceKeys = currentInstanceEntry.getValue(); + return consumer.apply(this.luceneIndices[instanceId], currentInstanceKeys, perInstanceDocs.get(instanceId)); + }) + .then(); } @Override - public void deleteDocument(LLTerm id) throws IOException { - LockUtils.readLockIO(access, () -> getLuceneIndex(id).deleteDocument(id)); + public Mono deleteDocument(LLTerm id) { + return getLuceneIndex(id).deleteDocument(id); } @Override - public void updateDocument(LLTerm id, LLDocument document) throws IOException { - LockUtils.readLockIO(access, () -> getLuceneIndex(id).updateDocument(id, document)); + public Mono updateDocument(LLTerm id, LLDocument document) { + return getLuceneIndex(id).updateDocument(id, document); } @Override - public void updateDocuments(Iterable keys, Iterable documents) throws IOException { - LockUtils.readLockIO(access, () -> { - ParallelUtils.parallelizeIO(s -> runPerInstance(keys, documents, s), - maxQueueSize, - luceneIndices.length, - 1, - LLLuceneIndex::updateDocuments - ); - }); + public Mono updateDocuments(Iterable keys, Iterable documents) { + return runPerInstance(keys, documents, LLLuceneIndex::updateDocuments); } @Override - public void deleteAll() throws IOException { - LockUtils.writeLockIO(access, () -> { - ParallelUtils.parallelizeIO((IOConsumer s) -> { - for (LLLocalLuceneIndex luceneIndex : luceneIndices) { - s.consume(luceneIndex); - } - }, maxQueueSize, luceneIndices.length, 1, LLLuceneIndex::deleteAll); - }); - } - - private LLTopKeys mergeTopKeys(Collection multi) { - long totalHitsCount = 0; - LLKeyScore[] hits; - int hitsArraySize = 0; - for (LLTopKeys llTopKeys : multi) { - totalHitsCount += llTopKeys.getTotalHitsCount(); - hitsArraySize += llTopKeys.getHits().length; - } - hits = new LLKeyScore[hitsArraySize]; - - int offset = 0; - for (LLTopKeys llTopKeys : multi) { - var part = llTopKeys.getHits(); - System.arraycopy(part, 0, hits, offset, part.length); - offset += part.length; - } - - return new LLTopKeys(totalHitsCount, hits); + public Mono deleteAll() { + return Flux + .fromArray(luceneIndices) + .flatMap(LLLocalLuceneIndex::deleteAll) + .then(); } private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) { @@ -198,27 +156,14 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { Map> mltDocumentFields, int limit, String keyFieldName) { - return Mono - .fromSupplier(access::readLock) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(stamp -> Flux - .fromArray(luceneIndices) - .index() - .flatMap(tuple -> Mono - .fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1())) - .subscribeOn(Schedulers.boundedElastic()) - .map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)) - ) - .flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2(), mltDocumentFields, limit, keyFieldName)) - .reduce(LLSearchResult.accumulator()) - .materialize() - .flatMap(signal -> Mono - .fromRunnable(() -> access.unlockRead(stamp)) - .subscribeOn(Schedulers.boundedElastic()) - .thenReturn(signal) - ) - .dematerialize() - ); + return Flux + .fromArray(luceneIndices) + .index() + .flatMap(tuple -> Mono + .fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1())) + .map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))) + .flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2(), mltDocumentFields, limit, keyFieldName)) + .reduce(LLSearchResult.accumulator()); } @Override @@ -228,87 +173,62 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName) { + return Flux + .fromArray(luceneIndices) + .index() + .flatMap(tuple -> Mono + .fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1())) + .map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))) + .flatMap(tuple -> tuple.getT1().search(tuple.getT2(), query, limit, sort, scoreMode, keyFieldName)) + .reduce(LLSearchResult.accumulator()); + } + + @Override + public Mono close() { + return Flux + .fromArray(luceneIndices) + .flatMap(LLLocalLuceneIndex::close) + .then(); + } + + @Override + public Mono takeSnapshot() { return Mono - .fromSupplier(access::readLock) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(stamp -> Flux - .fromArray(luceneIndices) - .index() - .flatMap(tuple -> Mono - .fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1())) - .subscribeOn(Schedulers.boundedElastic()) - .map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)) - ) - .flatMap(tuple -> tuple.getT1().search(tuple.getT2(), query, limit, sort, scoreMode, keyFieldName)) - .reduce(LLSearchResult.accumulator()) - .materialize() - .flatMap(signal -> Mono - .fromRunnable(() -> access.unlockRead(stamp)) - .subscribeOn(Schedulers.boundedElastic()) - .thenReturn(signal) - ) - .dematerialize() - ); + .fromCallable(() -> { + CopyOnWriteArrayList instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]); + var snapIndex = nextSnapshotNumber.getAndIncrement(); + + ParallelUtils.parallelizeIO((IOBiConsumer s) -> { + for (int i = 0; i < luceneIndices.length; i++) { + s.consume(luceneIndices[i], i); + } + }, maxQueueSize, luceneIndices.length, 1, (instance, i) -> { + var instanceSnapshot = instance.takeSnapshot(); + //todo: reimplement better (don't block and take them parallel) + instancesSnapshots.set(i, instanceSnapshot.block()); + }); + + LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new); + registeredSnapshots.put(snapIndex, instancesSnapshotsArray); + + return new LLSnapshot(snapIndex); + }) + .subscribeOn(Schedulers.boundedElastic()); } @Override - public long count(@Nullable LLSnapshot snapshot, String query) throws IOException { - return LockUtils.readLockIO(access, () -> { - AtomicLong result = new AtomicLong(0); - - ParallelUtils.parallelizeIO((IOBiConsumer s) -> { - for (int i = 0; i < luceneIndices.length; i++) { - s.consume(luceneIndices[i], resolveSnapshot(snapshot, i)); - } - }, maxQueueSize, luceneIndices.length, 1, (instance, instanceSnapshot) -> { - result.addAndGet(instance.count(instanceSnapshot, query)); - }); - return result.get(); - }); - } - - @Override - public void close() throws IOException { - LockUtils.writeLockIO(access, () -> { - ParallelUtils.parallelizeIO((IOConsumer s) -> { - for (LLLocalLuceneIndex luceneIndex : luceneIndices) { - s.consume(luceneIndex); - } - }, maxQueueSize, luceneIndices.length, 1, Closeable::close); - }); - } - - @Override - public LLSnapshot takeSnapshot() throws IOException { - return LockUtils.writeLockIO(access, () -> { - CopyOnWriteArrayList instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]); - var snapIndex = nextSnapshotNumber.getAndIncrement(); - - ParallelUtils.parallelizeIO((IOBiConsumer s) -> { - for (int i = 0; i < luceneIndices.length; i++) { - s.consume(luceneIndices[i], i); - } - }, maxQueueSize, luceneIndices.length, 1, (instance, i) -> { - var instanceSnapshot = instance.takeSnapshot(); - instancesSnapshots.set(i, instanceSnapshot); - }); - - LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new); - registeredSnapshots.put(snapIndex, instancesSnapshotsArray); - - return new LLSnapshot(snapIndex); - }); - } - - @Override - public void releaseSnapshot(LLSnapshot snapshot) throws IOException { - LockUtils.writeLockIO(access, () -> { - LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber()); - for (int i = 0; i < luceneIndices.length; i++) { - LLLocalLuceneIndex luceneIndex = luceneIndices[i]; - luceneIndex.releaseSnapshot(instancesSnapshots[i]); - } - }); + public Mono releaseSnapshot(LLSnapshot snapshot) { + return Mono + .fromCallable(() -> { + LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber()); + for (int i = 0; i < luceneIndices.length; i++) { + LLLocalLuceneIndex luceneIndex = luceneIndices[i]; + //todo: reimplement better (don't block and take them parallel) + luceneIndex.releaseSnapshot(instancesSnapshots[i]).block(); + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic()); } @Override