Better implementation of snapshots in lucene shards

This commit is contained in:
Andrea Cavalli 2021-06-25 20:07:19 +02:00
parent 47f6081cde
commit 4640d0dfad

View File

@ -32,6 +32,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.CollectionStatistics;
@ -48,15 +50,16 @@ import reactor.util.function.Tuples;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final Long2ObjectMap<LLSnapshot[]> registeredSnapshots = new Long2ObjectOpenHashMap<>(); private final ConcurrentHashMap<Long, LLSnapshot[]> registeredSnapshots = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumber = new AtomicLong(1); private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
private final LLLocalLuceneIndex[] luceneIndices; private final LLLocalLuceneIndex[] luceneIndices;
private final AtomicLong nextActionId = new AtomicLong(0); private final AtomicLong nextActionId = new AtomicLong(0);
private final ConcurrentHashMap<Long, Cache<String, Optional<CollectionStatistics>>[]> statistics = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, Cache<String, Optional<CollectionStatistics>>[]> statistics = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, AtomicInteger> completedStreams = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, AtomicInteger> completedStreams = new ConcurrentHashMap<>();
private final int maxQueueSize = 1000;
public LLLocalMultiLuceneIndex(Path lucene, public LLLocalMultiLuceneIndex(Path lucene,
String name, String name,
@ -72,6 +75,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
} }
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount]; LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount];
ExecutorService[] luceneIndexExecutorServices = new ExecutorService[instancesCount];
for (int i = 0; i < instancesCount; i++) { for (int i = 0; i < instancesCount; i++) {
int finalI = i; int finalI = i;
String instanceName; String instanceName;
@ -93,8 +97,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
actionId actionId
) )
); );
luceneIndexExecutorServices[i] = Executors.newSingleThreadExecutor();
} }
this.luceneIndices = luceneIndices; this.luceneIndices = luceneIndices;
this.indexExecutorServices = luceneIndexExecutorServices;
} }
private long newAction() { private long newAction() {
@ -437,42 +443,30 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override @Override
public Mono<LLSnapshot> takeSnapshot() { public Mono<LLSnapshot> takeSnapshot() {
return Mono return Mono
.fromCallable(() -> { // Generate next snapshot index
CopyOnWriteArrayList<LLSnapshot> instancesSnapshots .fromCallable(nextSnapshotNumber::getAndIncrement)
= new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]); .flatMap(snapshotIndex -> Flux
var snapIndex = nextSnapshotNumber.getAndIncrement(); .fromArray(luceneIndices)
.flatMapSequential(LLLocalLuceneIndex::takeSnapshot)
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, Integer> s) -> { .collectList()
for (int i = 0; i < luceneIndices.length; i++) { .map(list -> list.toArray(LLSnapshot[]::new))
s.consume(luceneIndices[i], i); .doOnNext(instancesSnapshotsArray -> registeredSnapshots.put(snapshotIndex, instancesSnapshotsArray))
} .thenReturn(new LLSnapshot(snapshotIndex))
}, maxQueueSize, luceneIndices.length, 1, (instance, i) -> { );
var instanceSnapshot = instance.takeSnapshot();
//todo: reimplement better (don't block and take them parallel)
instancesSnapshots.set(i, instanceSnapshot.block());
});
LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new);
registeredSnapshots.put(snapIndex, instancesSnapshotsArray);
return new LLSnapshot(snapIndex);
})
.subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) { public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono return Mono
.<Void>fromCallable(() -> { .fromCallable(() -> registeredSnapshots.remove(snapshot.getSequenceNumber()))
LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber()); .flatMapMany(Flux::fromArray)
for (int i = 0; i < luceneIndices.length; i++) { .index()
LLLocalLuceneIndex luceneIndex = luceneIndices[i]; .flatMapSequential(tuple -> {
//todo: reimplement better (don't block and take them parallel) int index = (int) (long) tuple.getT1();
luceneIndex.releaseSnapshot(instancesSnapshots[i]).block(); LLSnapshot instanceSnapshot = tuple.getT2();
} return luceneIndices[index].releaseSnapshot(instanceSnapshot);
return null;
}) })
.subscribeOn(Schedulers.boundedElastic()); .then();
} }
@Override @Override