From 554facde13b14798467e40735889262f8dcfb281 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 1 Feb 2021 02:21:53 +0100 Subject: [PATCH] Update Example.java, DatabaseMapDictionary.java, and 4 more files... --- .../it.cavallium.dbengine.client/Example.java | 57 +++++-- .../collections/DatabaseMapDictionary.java | 8 + .../collections/SubStageGetterSingle.java | 2 +- .../database/disk/LLLocalDictionary.java | 141 ++++++++++++------ .../disk/LLLocalKeyValueDatabase.java | 21 ++- .../database/disk/LLLocalLuceneIndex.java | 36 ++--- 6 files changed, 181 insertions(+), 84 deletions(-) diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java index ddd641b..55a7ab6 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/Example.java @@ -5,20 +5,24 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; -import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; +import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.FixedLengthSerializer; import it.cavallium.dbengine.database.collections.Serializer; import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.text.DecimalFormat; import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.concurrent.CompletionException; import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,10 +34,11 @@ import reactor.util.function.Tuples; public class Example { private static final boolean printPreviousValue = false; - private static final int numRepeats = 500; - private static final int batchSize = 1000; + private static final int numRepeats = 100; + private static final int batchSize = 10000; public static void main(String[] args) throws InterruptedException { + /* testAtPut(); testPutValueAndGetPrevious(); testPutValue(); @@ -45,6 +50,14 @@ public class Example { .then(rangeTestPutMulti()) .subscribeOn(Schedulers.parallel()) .blockOptional(); + + + */ + + testPutMulti() + .then(rangeTestPutMulti()) + .subscribeOn(Schedulers.parallel()) + .blockOptional(); } private static Mono testAtPut() { @@ -128,9 +141,8 @@ public class Example { private static Mono testPutMulti() { var ssg = new SubStageGetterSingleBytes(); var ser = FixedLengthSerializer.noop(4); - int batchSize = 1000; HashMap keysToPut = new HashMap<>(); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < batchSize; i++) { keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11)); } var putMultiFlux = Flux.fromIterable(keysToPut.entrySet()); @@ -243,8 +255,25 @@ public class Example { } private static Mono tempDb() { - return new LLLocalDatabaseConnection(Path.of("/tmp/"), true) - .connect() + var wrkspcPath = Path.of("/home/ubuntu/tempdb/"); + return Mono + .fromCallable(() -> { + if (Files.exists(wrkspcPath)) { + Files.walk(wrkspcPath) + .sorted(Comparator.reverseOrder()) + .forEach(file -> { + try { + Files.delete(file); + } catch (IOException ex) { + throw new CompletionException(ex); + } + }); + } + Files.createDirectories(wrkspcPath); + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + .then(new LLLocalDatabaseConnection(wrkspcPath, true).connect()) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false)); } @@ -256,14 +285,13 @@ public class Example { Duration WAIT_TIME = Duration.ofSeconds(5); Duration WAIT_TIME_END = Duration.ofSeconds(5); return Mono - .fromRunnable(() -> instantInit.tryEmitValue(now())) + .delay(WAIT_TIME) + .then(Mono.fromRunnable(() -> instantInit.tryEmitValue(now()))) .then(setup) - .delayElement(WAIT_TIME) .doOnSuccess(s -> instantInitTest.tryEmitValue(now())) .flatMap(a ->Mono.defer(() -> test.apply(a)).repeat(numRepeats) .then() .doOnSuccess(s -> instantEndTest.tryEmitValue(now())) - .delayElement(WAIT_TIME_END) .then(close.apply(a))) .doOnSuccess(s -> instantEnd.tryEmitValue(now())) .then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono())) @@ -271,23 +299,24 @@ public class Example { System.out.println("----------------------------------------------------------------------"); System.out.println(name); System.out.println( - "\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format(numRepeats) + " times:"); + "\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format((numRepeats * batchSize)) + " times:"); System.out.println("\t - Test time: " + DecimalFormat .getInstance(Locale.ITALY) - .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) numRepeats / (double) 1000000) + .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) (numRepeats * batchSize) / (double) 1000000) + "ms"); System.out.println("\t - Test speed: " + DecimalFormat .getInstance(Locale.ITALY) - .format(numRepeats / (Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000 / (double) 1000)) + .format((numRepeats * batchSize) / (Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000 / (double) 1000)) + " tests/s"); System.out.println("\t - Total time: " + DecimalFormat .getInstance(Locale.ITALY) .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000) + "ms"); System.out.println("\t - Total time (setup+test+end): " + DecimalFormat .getInstance(Locale.ITALY) - .format(Duration.between(tuple.getT1(), tuple.getT4().minus(WAIT_TIME)).toNanos() / (double) 1000000) + "ms"); + .format(Duration.between(tuple.getT1(), tuple.getT4()).toNanos() / (double) 1000000) + "ms"); System.out.println("----------------------------------------------------------------------"); }) + .delayElement(WAIT_TIME_END) .then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 23a3240..77fbd77 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -136,6 +136,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue()))); } + @Override + public Mono putMulti(Flux> entries) { + return dictionary + .putMulti(entries + .map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), false) + .then(); + } + @Override public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { return dictionary diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 8e3759e..de9baf7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -26,7 +26,7 @@ public class SubStageGetterSingle implements SubStageGetter(dictionary, keyPrefix, serializer)); } //todo: temporary wrapper. convert the whole class to buffers diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index d1934d7..793ead1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -5,9 +5,12 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -27,7 +30,7 @@ import org.warp.commonutils.concurrency.atomicity.NotAtomic; import org.warp.commonutils.type.VariableWrapper; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.scheduler.Scheduler; @NotAtomic public class LLLocalDictionary implements LLDictionary { @@ -36,6 +39,7 @@ public class LLLocalDictionary implements LLDictionary { static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations + static final int MULTI_GET_WINDOW = 500; static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); private static final byte[] FIRST_KEY = new byte[]{}; @@ -44,17 +48,20 @@ public class LLLocalDictionary implements LLDictionary { private final RocksDB db; private final ColumnFamilyHandle cfh; private final String databaseName; + private final Scheduler dbScheduler; private final Function snapshotResolver; public LLLocalDictionary(@NotNull RocksDB db, @NotNull ColumnFamilyHandle columnFamilyHandle, String databaseName, + Scheduler dbScheduler, Function snapshotResolver) { Objects.requireNonNull(db); this.db = db; Objects.requireNonNull(columnFamilyHandle); this.cfh = columnFamilyHandle; this.databaseName = databaseName; + this.dbScheduler = dbScheduler; this.snapshotResolver = snapshotResolver; } @@ -95,7 +102,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -129,7 +136,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } private Mono containsKey(@Nullable LLSnapshot snapshot, byte[] key) { @@ -147,7 +154,7 @@ public class LLLocalDictionary implements LLDictionary { return size != RocksDB.NOT_FOUND; }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -159,7 +166,7 @@ public class LLLocalDictionary implements LLDictionary { return null; }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) .then(response); } @@ -172,7 +179,7 @@ public class LLLocalDictionary implements LLDictionary { return null; }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) .then(response); } @@ -195,7 +202,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); case VOID: return Mono.empty(); default: @@ -205,40 +212,57 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys) { - return keys.flatMap(key -> this.get(snapshot, key).map(value -> Map.entry(key, value))); + return keys + .window(MULTI_GET_WINDOW) + .flatMap(keysWindowFlux -> keysWindowFlux.collectList() + .flatMapMany(keysWindow -> Mono + .>>fromCallable(() -> { + var handlesArray = new ColumnFamilyHandle[keysWindow.size()]; + Arrays.fill(handlesArray, cfh); + var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length); + var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, keysWindow); + var mappedResults = new ArrayList>(results.size()); + for (int i = 0; i < results.size(); i++) { + var val = results.get(i); + if (val != null) { + results.set(i, null); + mappedResults.add(Map.entry(keysWindow.get(i), val)); + } + } + return mappedResults; + }) + .subscribeOn(dbScheduler) + .>flatMapMany(Flux::fromIterable) + ) + ) + .onErrorMap(IOException::new); } @Override public Flux> putMulti(Flux> entries, boolean getOldValues) { - return Mono - .fromCallable(() -> new CappedWriteBatch(db, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - )) - .subscribeOn(Schedulers.boundedElastic()) - .flatMapMany(writeBatch -> entries - .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)) - .concatWith(Mono - .>fromCallable(() -> { - synchronized (writeBatch) { - writeBatch.writeToDbAndClose(); - writeBatch.close(); - } - return null; - }) - .subscribeOn(Schedulers.boundedElastic()) - ) - .doFinally(signalType -> { - synchronized (writeBatch) { - writeBatch.close(); + return entries + .window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) + .publishOn(dbScheduler) + .flatMap(Flux::collectList) + .flatMap(entriesWindow -> this + .getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey)) + .concatWith(Mono.fromCallable(() -> { + var batch = new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + ); + for (Entry entry : entriesWindow) { + batch.put(entry.getKey(), entry.getValue()); } - }) - ) - .onErrorMap(IOException::new); + batch.writeToDbAndClose(); + batch.close(); + return null; + }))); } + @NotNull private Mono> putEntryToWriteBatch(Entry newEntry, boolean getOldValues, CappedWriteBatch writeBatch) { @@ -256,10 +280,35 @@ public class LLLocalDictionary implements LLDictionary { } return null; }) - .subscribeOn(Schedulers.boundedElastic())) + .subscribeOn(dbScheduler)) .map(oldValue -> Map.entry(newEntry.getKey(), oldValue))); } + @NotNull + private Flux> putEntryToWriteBatch(List> newEntries, boolean getOldValues, + CappedWriteBatch writeBatch) { + return Flux + .from(Flux + .defer(() -> { + if (getOldValues) { + return getMulti(null, Flux.fromIterable(newEntries).map(Entry::getKey)); + } else { + return Flux.empty(); + } + }) + .concatWith(Mono + .>fromCallable(() -> { + synchronized (writeBatch) { + for (Entry newEntry : newEntries) { + writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue()); + } + } + return null; + }).subscribeOn(dbScheduler) + ) + ); + } + @Override public Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range) { if (range.isSingle()) { @@ -287,7 +336,7 @@ public class LLLocalDictionary implements LLDictionary { } return iter; }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) .flatMapMany(rocksIterator -> Flux .>fromIterable(() -> { VariableWrapper nextKey = new VariableWrapper<>(null); @@ -327,7 +376,7 @@ public class LLLocalDictionary implements LLDictionary { }; }) .doFinally(signalType -> rocksIterator.close()) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) ); } @@ -359,7 +408,7 @@ public class LLLocalDictionary implements LLDictionary { } return iter; }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) .flatMapMany(rocksIterator -> Flux .fromIterable(() -> { VariableWrapper nextKey = new VariableWrapper<>(null); @@ -391,7 +440,7 @@ public class LLLocalDictionary implements LLDictionary { }; }) .doFinally(signalType -> rocksIterator.close()) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) ); } @@ -404,7 +453,7 @@ public class LLLocalDictionary implements LLDictionary { } else { return Mono .fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) .flatMapMany(writeBatch -> Mono .fromCallable(() -> { synchronized (writeBatch) { @@ -426,7 +475,7 @@ public class LLLocalDictionary implements LLDictionary { } return null; }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) .thenMany(entries) .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)) .concatWith(Mono @@ -437,7 +486,7 @@ public class LLLocalDictionary implements LLDictionary { } return null; }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(dbScheduler) ) .doFinally(signalType -> { synchronized (writeBatch) { @@ -478,7 +527,7 @@ public class LLLocalDictionary implements LLDictionary { return null; }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @@ -490,7 +539,7 @@ public class LLLocalDictionary implements LLDictionary { return Mono .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } else { return Mono .fromCallable(() -> { @@ -516,7 +565,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } }); } @@ -579,6 +628,6 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 5a72172..5375000 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -35,6 +35,7 @@ import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.WALRecoveryMode; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @@ -46,6 +47,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor( RocksDB.DEFAULT_COLUMN_FAMILY); + private final Scheduler dbScheduler; private final Path dbPath; private final String name; private RocksDB db; @@ -73,6 +75,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { Path dbPath = Paths.get(dbPathString); this.dbPath = dbPath; this.name = name; + this.dbScheduler = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "db-" + name, + 60, + true + ); createIfNotExists(descriptors, options, dbPath, dbPathString); // Create all column families that don't exist @@ -301,7 +309,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { defaultValue )) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -310,16 +318,17 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .fromCallable(() -> new LLLocalDictionary(db, handles.get(Column.special(Column.toString(columnName))), name, + dbScheduler, (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) )) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override public Mono getProperty(String propertyName) { return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -331,7 +340,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); return new LLSnapshot(currentSnapshotSequenceNumber); }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -345,7 +354,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { db.releaseSnapshot(dbSnapshot); return null; }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -361,7 +370,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return null; }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 2a7c095..6ddf833 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -53,6 +53,7 @@ import reactor.core.publisher.Sinks.EmissionException; import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.One; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -67,6 +68,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { */ private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene")); + private static final Scheduler luceneScheduler = Schedulers.fromExecutorService(scheduler); private final String luceneIndexName; private final SnapshotDeletionPolicy snapshotter; @@ -141,14 +143,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono takeSnapshot() { return Mono .fromCallable(lastSnapshotSeqNo::incrementAndGet) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneScheduler) .flatMap(snapshotSeqNo -> takeLuceneSnapshot() .flatMap(snapshot -> Mono .fromCallable(() -> { this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot)); return new LLSnapshot(snapshotSeqNo); }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneScheduler) ) ); } @@ -169,7 +171,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { throw ex; } } - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } @Override @@ -187,7 +189,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { // Delete unused files after releasing the snapshot indexWriter.deleteUnusedFiles(); return null; - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } @Override @@ -195,7 +197,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { indexWriter.addDocument(LLUtils.toDocument(doc)); return null; - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } @Override @@ -208,7 +210,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriter.addDocuments(LLUtils.toDocuments(docs)); return null; }) - .subscribeOn(Schedulers.boundedElastic())) + .subscribeOn(luceneScheduler)) ) .then(); } @@ -219,7 +221,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { indexWriter.deleteDocuments(LLUtils.toTerm(id)); return null; - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } @Override @@ -227,7 +229,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); return null; - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } @Override @@ -244,7 +246,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments); return null; }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneScheduler) ); } @@ -257,7 +259,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriter.flush(); indexWriter.commit(); return null; - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } private Mono acquireSearcherWrapper(LLSnapshot snapshot) { @@ -267,7 +269,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } else { return resolveSnapshot(snapshot).getIndexSearcher(); } - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } private Mono releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) { @@ -279,7 +281,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { e.printStackTrace(); } } - }).subscribeOn(Schedulers.boundedElastic()); + }).subscribeOn(luceneScheduler); } @SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"}) @@ -308,7 +310,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { // Get the reference doc and apply it to MoreLikeThis, to generate the query return mlt.like((Map) mltDocumentFields); }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneScheduler) .flatMap(query -> Mono .fromCallable(() -> { One totalHitsCountSink = Sinks.one(); @@ -334,7 +336,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }); return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); - }).subscribeOn(Schedulers.boundedElastic()) + }).subscribeOn(luceneScheduler) ).then() .materialize() .flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value)) @@ -356,7 +358,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode); return Tuples.of(query, Optional.ofNullable(luceneSort), luceneScoreMode); }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneScheduler) .flatMap(tuple -> Mono .fromCallable(() -> { Query query = tuple.getT1(); @@ -386,7 +388,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }); return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); - }).subscribeOn(Schedulers.boundedElastic()) + }).subscribeOn(luceneScheduler) ) .materialize() .flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value)) @@ -403,7 +405,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { directory.close(); return null; }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(luceneScheduler); } private void scheduledCommit() {