From fb5febce324d294b9911e0ab03093c9707efb1d9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 24 Jan 2021 03:15:05 +0100 Subject: [PATCH] Update LLLocalMultiLuceneIndex.java --- .../disk/LLLocalMultiLuceneIndex.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 36c3e48..b1f3556 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,5 +1,13 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLSort; +import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.LLTopKeys; +import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; @@ -16,7 +24,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.StampedLock; import java.util.function.BiConsumer; import java.util.stream.Collectors; import org.jetbrains.annotations.Nullable; @@ -25,14 +33,6 @@ import org.warp.commonutils.functional.IOBiConsumer; import org.warp.commonutils.functional.IOConsumer; import org.warp.commonutils.functional.IOTriConsumer; import org.warp.commonutils.locks.LockUtils; -import it.cavallium.dbengine.database.LLDocument; -import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.database.LLLuceneIndex; -import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLSort; -import it.cavallium.dbengine.database.LLTerm; -import it.cavallium.dbengine.database.LLTopKeys; -import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -43,7 +43,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { private final Long2ObjectMap registeredSnapshots = new Long2ObjectOpenHashMap<>(); private final AtomicLong nextSnapshotNumber = new AtomicLong(1); private final LLLocalLuceneIndex[] luceneIndices; - private final ReentrantReadWriteLock access = new ReentrantReadWriteLock(); + private final StampedLock access = new StampedLock(); private final int maxQueueSize = 1000; @@ -88,17 +88,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public String getLuceneIndexName() { - return LockUtils.lock(access.readLock(), () -> luceneIndices[0].getLuceneIndexName()); + return LockUtils.readLock(access, () -> luceneIndices[0].getLuceneIndexName()); } @Override public void addDocument(LLTerm id, LLDocument doc) throws IOException { - LockUtils.lockIO(access.readLock(), () -> getLuceneIndex(id).addDocument(id, doc)); + LockUtils.readLockIO(access, () -> getLuceneIndex(id).addDocument(id, doc)); } @Override public void addDocuments(Iterable keys, Iterable documents) throws IOException { - LockUtils.lockIO(access.readLock(), () -> { + LockUtils.readLockIO(access, () -> { ParallelUtils.parallelizeIO(s -> runPerInstance(keys, documents, s), maxQueueSize, luceneIndices.length, @@ -135,17 +135,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public void deleteDocument(LLTerm id) throws IOException { - LockUtils.lockIO(access.readLock(), () -> getLuceneIndex(id).deleteDocument(id)); + LockUtils.readLockIO(access, () -> getLuceneIndex(id).deleteDocument(id)); } @Override public void updateDocument(LLTerm id, LLDocument document) throws IOException { - LockUtils.lockIO(access.readLock(), () -> getLuceneIndex(id).updateDocument(id, document)); + LockUtils.readLockIO(access, () -> getLuceneIndex(id).updateDocument(id, document)); } @Override public void updateDocuments(Iterable keys, Iterable documents) throws IOException { - LockUtils.lockIO(access.readLock(), () -> { + LockUtils.readLockIO(access, () -> { ParallelUtils.parallelizeIO(s -> runPerInstance(keys, documents, s), maxQueueSize, luceneIndices.length, @@ -157,7 +157,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public void deleteAll() throws IOException { - LockUtils.lockIO(access.writeLock(), () -> { + LockUtils.writeLockIO(access, () -> { ParallelUtils.parallelizeIO((IOConsumer s) -> { for (LLLocalLuceneIndex luceneIndex : luceneIndices) { s.consume(luceneIndex); @@ -172,7 +172,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { int limit, @Nullable LLSort sort, String keyFieldName) throws IOException { - return LockUtils.lockIO(access.readLock(), () -> { + return LockUtils.readLockIO(access, () -> { Collection> result = new ConcurrentLinkedQueue<>(); ParallelUtils.parallelizeIO((IOBiConsumer s) -> { @@ -219,7 +219,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { Map> mltDocumentFields, int limit, String keyFieldName) throws IOException { - return LockUtils.lockIO(access.readLock(), () -> { + return LockUtils.readLockIO(access, () -> { Collection> result = new ConcurrentLinkedQueue<>(); ParallelUtils.parallelizeIO((IOBiConsumer s) -> { @@ -239,7 +239,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { int limit, @Nullable LLSort sort, String keyFieldName) { - Collection, Collection>>> multi = LockUtils.lock(access.readLock(), () -> { + Collection, Collection>>> multi = LockUtils.readLock(access, () -> { Collection, Collection>>> result = new ConcurrentLinkedQueue<>(); ParallelUtils.parallelize((BiConsumer s) -> { @@ -270,7 +270,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public long count(@Nullable LLSnapshot snapshot, String query) throws IOException { - return LockUtils.lockIO(access.readLock(), () -> { + return LockUtils.readLockIO(access, () -> { AtomicLong result = new AtomicLong(0); ParallelUtils.parallelizeIO((IOBiConsumer s) -> { @@ -286,7 +286,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public void close() throws IOException { - LockUtils.lockIO(access.writeLock(), () -> { + LockUtils.writeLockIO(access, () -> { ParallelUtils.parallelizeIO((IOConsumer s) -> { for (LLLocalLuceneIndex luceneIndex : luceneIndices) { s.consume(luceneIndex); @@ -297,7 +297,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public LLSnapshot takeSnapshot() throws IOException { - return LockUtils.lockIO(access.writeLock(), () -> { + return LockUtils.writeLockIO(access, () -> { CopyOnWriteArrayList instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]); var snapIndex = nextSnapshotNumber.getAndIncrement(); @@ -319,7 +319,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public void releaseSnapshot(LLSnapshot snapshot) throws IOException { - LockUtils.lockIO(access.writeLock(), () -> { + LockUtils.writeLockIO(access, () -> { LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber()); for (int i = 0; i < luceneIndices.length; i++) { LLLocalLuceneIndex luceneIndex = luceneIndices[i];