diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 18bf6e1..2c18a50 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -28,10 +28,8 @@ import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; -import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -57,7 +55,6 @@ import org.rocksdb.DirectSlice; import org.rocksdb.FlushOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.rocksdb.Slice; import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; @@ -119,7 +116,8 @@ public class LLLocalDictionary implements LLDictionary { private final ColumnFamilyHandle cfh; private final String databaseName; private final String columnName; - private final Scheduler dbScheduler; + private final Scheduler dbWScheduler; + private final Scheduler dbRScheduler; private final Function snapshotResolver; private final UpdateMode updateMode; private final boolean nettyDirect; @@ -141,12 +139,12 @@ public class LLLocalDictionary implements LLDictionary { private final Counter endedRemove; private final Timer removeTime; - public LLLocalDictionary( - BufferAllocator allocator, + public LLLocalDictionary(BufferAllocator allocator, @NotNull RocksDBColumn db, String databaseName, String columnName, - Scheduler dbScheduler, + Scheduler dbWScheduler, + Scheduler dbRScheduler, Function snapshotResolver, UpdateMode updateMode, DatabaseOptions databaseOptions) { @@ -155,7 +153,8 @@ public class LLLocalDictionary implements LLDictionary { this.cfh = db.getColumnFamilyHandle(); this.databaseName = databaseName; this.columnName = columnName; - this.dbScheduler = dbScheduler; + this.dbWScheduler = dbWScheduler; + this.dbRScheduler = dbRScheduler; this.snapshotResolver = snapshotResolver; this.updateMode = updateMode; alloc = allocator; @@ -242,14 +241,14 @@ public class LLLocalDictionary implements LLDictionary { return alloc; } - private @NotNull Mono runOnDb(Callable<@Nullable T> callable) { - return Mono.fromCallable(callable).subscribeOn(dbScheduler); + private @NotNull Mono runOnDb(boolean write, Callable<@Nullable T> callable) { + return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler); } @Override public Mono> get(@Nullable LLSnapshot snapshot, Mono> keyMono) { return keyMono - .publishOn(dbScheduler) + .publishOn(dbRScheduler) .>handle((keySend, sink) -> { try (var key = keySend.receive()) { logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); @@ -279,7 +278,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fillCache) { - return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread"; startedContains.increment(); @@ -380,7 +379,7 @@ public class LLLocalDictionary implements LLDictionary { var previousDataMono = this.getPreviousData(keyMono, resultType); // Write the new entry to the database Mono> putMono = entryMono - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((entry, sink) -> { try (var key = entry.getKey().receive()) { try (var value = entry.getValue().receive()) { @@ -430,7 +429,7 @@ public class LLLocalDictionary implements LLDictionary { BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return keyMono - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((keySend, sink) -> { try (var key = keySend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; @@ -473,7 +472,7 @@ public class LLLocalDictionary implements LLDictionary { public Mono> updateAndGetDelta(Mono> keyMono, BinarySerializationFunction updater) { return keyMono - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((keySend, sink) -> { try (var key = keySend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; @@ -509,7 +508,7 @@ public class LLLocalDictionary implements LLDictionary { Mono> previousDataMono = this.getPreviousData(keyMono, resultType); // Delete the value from the database Mono> removeMono = keyMono - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((keySend, sink) -> { try (var key = keySend.receive()) { logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key)); @@ -536,7 +535,7 @@ public class LLLocalDictionary implements LLDictionary { private Mono> getPreviousData(Mono> keyMono, LLDictionaryResultType resultType) { return switch (resultType) { case PREVIOUS_VALUE_EXISTENCE -> keyMono - .publishOn(dbScheduler) + .publishOn(dbRScheduler) .handle((keySend, sink) -> { try (var key = keySend.receive()) { var contained = containsKey(null, key); @@ -546,7 +545,7 @@ public class LLLocalDictionary implements LLDictionary { } }); case PREVIOUS_VALUE -> keyMono - .publishOn(dbScheduler) + .publishOn(dbRScheduler) .handle((keySend, sink) -> { try (var key = keySend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread"; @@ -569,7 +568,7 @@ public class LLLocalDictionary implements LLDictionary { public Flux> getMulti(@Nullable LLSnapshot snapshot, Flux> keys) { return keys .buffer(MULTI_GET_WINDOW) - .publishOn(dbScheduler) + .publishOn(dbRScheduler) .>>handle((keysWindow, sink) -> { List keyBufsWindow = new ArrayList<>(keysWindow.size()); for (Send bufferSend : keysWindow) { @@ -609,7 +608,7 @@ public class LLLocalDictionary implements LLDictionary { public Mono putMulti(Flux> entries) { return entries .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((entriesWindowList, sink) -> { var entriesWindow = new ArrayList(entriesWindowList.size()); for (Send entrySend : entriesWindowList) { @@ -662,7 +661,7 @@ public class LLLocalDictionary implements LLDictionary { KVSerializationFunction, @Nullable Buffer> updateFunction) { return Flux.zip(keys, serializedKeys) .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .flatMapSequential(ew -> this.>runOnDb(() -> { + .flatMapSequential(ew -> this.>runOnDb(true, () -> { List> entriesWindow = new ArrayList<>(ew.size()); for (Tuple2> tuple : ew) { entriesWindow.add(tuple.mapT2(Send::receive)); @@ -815,7 +814,7 @@ public class LLLocalDictionary implements LLDictionary { return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, - iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> iterator.flux().subscribeOn(dbRScheduler, false), iterator -> Mono.fromRunnable(iterator::close) ); } @@ -834,7 +833,7 @@ public class LLLocalDictionary implements LLDictionary { }); return Flux.usingWhen( iteratorMono, - iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> iterator.flux().subscribeOn(dbRScheduler, false), iterator -> Mono.fromRunnable(iterator::close) ); } @@ -871,7 +870,7 @@ public class LLLocalDictionary implements LLDictionary { ); }); return Flux.usingWhen(iteratorMono, - iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> iterator.flux().subscribeOn(dbRScheduler, false), iterator -> Mono.fromRunnable(iterator::close) ); } @@ -909,7 +908,7 @@ public class LLLocalDictionary implements LLDictionary { sink.error(ex); } }) - .subscribeOn(dbScheduler), + .subscribeOn(dbRScheduler), rangeSend -> Mono.fromRunnable(rangeSend::close) ); } @@ -924,14 +923,14 @@ public class LLLocalDictionary implements LLDictionary { ); }); return Flux.usingWhen(iteratorMono, - iterator -> iterator.flux().subscribeOn(dbScheduler), + iterator -> iterator.flux().subscribeOn(dbRScheduler), iterator -> Mono.fromRunnable(iterator::close) ); } private Flux> getRangeKeysSingle(LLSnapshot snapshot, Mono> keyMono) { return keyMono - .publishOn(dbScheduler) + .publishOn(dbRScheduler) .>handle((keySend, sink) -> { try (var key = keySend.receive()) { if (containsKey(snapshot, key)) { @@ -955,7 +954,7 @@ public class LLLocalDictionary implements LLDictionary { return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, - iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> iterator.flux().subscribeOn(dbRScheduler, false), iterator -> Mono.fromRunnable(iterator::close) ); } @@ -964,7 +963,7 @@ public class LLLocalDictionary implements LLDictionary { public Mono setRange(Mono> rangeMono, Flux> entries, boolean smallRange) { if (USE_WINDOW_IN_SET_RANGE) { return rangeMono - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; @@ -1046,7 +1045,7 @@ public class LLLocalDictionary implements LLDictionary { .flatMap(keysWindowFlux -> keysWindowFlux .collectList() .flatMap(entriesListSend -> this - .runOnDb(() -> { + .runOnDb(true, () -> { List entriesList = new ArrayList<>(entriesListSend.size()); for (Send entrySend : entriesListSend) { entriesList.add(entrySend.receive()); @@ -1108,7 +1107,7 @@ public class LLLocalDictionary implements LLDictionary { } var deleteMono = this .getRange(null, rangeMono, false, smallRange) - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((oldValueSend, sink) -> { try (var oldValue = oldValueSend.receive()) { db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe()); @@ -1120,7 +1119,7 @@ public class LLLocalDictionary implements LLDictionary { .then(Mono.empty()); var putMono = entries - .publishOn(dbScheduler) + .publishOn(dbWScheduler) .handle((entrySend, sink) -> { try (var entry = entrySend.receive()) { if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) { @@ -1313,13 +1312,13 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(cause -> new IOException("Failed to clear", cause)) - .subscribeOn(dbScheduler); + .subscribeOn(dbWScheduler); } @Override public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fast) { - return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread"; if (range.isAll()) { @@ -1383,7 +1382,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono> getOne(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { @@ -1438,7 +1437,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono> getOneKey(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { @@ -1600,7 +1599,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono> removeOne(Mono> rangeMono) { - return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + return rangeMono.publishOn(dbWScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; try (var readOpts = new ReadOptions(getReadOptions(null))) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 89bad47..9c63e0c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -90,7 +90,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final BufferAllocator allocator; private final MeterRegistry meterRegistry; - private final Scheduler dbScheduler; + private final Scheduler dbWScheduler; + private final Scheduler dbRScheduler; private final Timer snapshotTime; @@ -204,34 +205,40 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); if (!databaseOptions.lowMemory()) { - final BloomFilter bloomFilter = new BloomFilter(3); + final BloomFilter bloomFilter = new BloomFilter(10); tableOptions.setFilterPolicy(bloomFilter); tableOptions.setOptimizeFiltersForMemory(true); tableOptions.setVerifyCompression(false); } - boolean cacheIndexAndFilterBlocks = databaseOptions.setCacheIndexAndFilterBlocks().orElse(true); + boolean cacheIndexAndFilterBlocks = databaseOptions.setCacheIndexAndFilterBlocks() + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .orElse(true); if (databaseOptions.spinning()) { // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks cacheIndexAndFilterBlocks = true; } tableOptions + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setPinTopLevelIndexAndFilter(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setPinL0FilterAndIndexBlocksInCache(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setCacheIndexAndFilterBlocksWithHighPriority(true) .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) - .setPartitionFilters(true) + //.setPartitionFilters(true) .setIndexType(IndexType.kTwoLevelIndexSearch) - .setFormatVersion(5) //todo: replace with kxxhash3 .setChecksumType(ChecksumType.kxxHash) .setBlockCacheCompressed(optionsWithCache.compressedCache()) .setBlockCache(optionsWithCache.standardCache()) // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - .setBlockSize((databaseOptions.spinning() ? 256 : 16) * SizeUnit.KB); + .setBlockSize((databaseOptions.spinning() ? 512 : 16) * SizeUnit.KB); columnOptions.setTableFormatConfig(tableOptions); columnOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + columnOptions.setOptimizeFiltersForHits(true); descriptors.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII), columnOptions)); } @@ -250,17 +257,24 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (databaseOptions.lowMemory()) { threadCap = Math.max(1, Runtime.getRuntime().availableProcessors()); - this.dbScheduler = Schedulers.boundedElastic(); + this.dbWScheduler = Schedulers.boundedElastic(); + this.dbRScheduler = Schedulers.boundedElastic(); } else { // 8 or more threadCap = Math.max(8, Math.max(Runtime.getRuntime().availableProcessors(), Integer.parseInt(System.getProperty("it.cavallium.dbengine.scheduler.threads", "0")))); if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.scheduler.shared", "true"))) { - this.dbScheduler = Schedulers.boundedElastic(); + this.dbWScheduler = Schedulers.boundedElastic(); + this.dbRScheduler = Schedulers.boundedElastic(); } else { - this.dbScheduler = Schedulers.newBoundedElastic(threadCap, + this.dbWScheduler = Schedulers.newBoundedElastic(threadCap, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - new ShortNamedThreadFactory("db-" + name).setDaemon(true).withGroup(new ThreadGroup("database-threads")), + new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-write")), + 60 + ); + this.dbRScheduler = Schedulers.newBoundedElastic(threadCap, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-read")), 60 ); } @@ -503,16 +517,23 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalSizeLimitMB(0) .setMaxTotalWalSize(0) // automatic ; - blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB) / 2, -1, true); - compressedCache = null; + blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), 6, false); + compressedCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), 6, false); + if (databaseOptions.spinning()) { + options + // method documentation + .setCompactionReadaheadSize(16 * SizeUnit.MB) + // guessed + .setWritableFileMaxBufferSize(16 * SizeUnit.MB); + } if (databaseOptions.useDirectIO()) { options // Option to enable readahead in compaction // If not set, it will be set to 2MB internally - .setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB + .setCompactionReadaheadSize(2 * SizeUnit.MB) // recommend at least 2MB // Option to tune write buffer for direct writes - .setWritableFileMaxBufferSize(1024 * 1024) + .setWritableFileMaxBufferSize(2 * SizeUnit.MB) ; } } else { @@ -632,11 +653,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { LLLocalKeyValueDatabase.this.name, name, ColumnUtils.toString(singletonListColumnName), - dbScheduler, - defaultValue + dbWScheduler, dbRScheduler, defaultValue )) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } @Override @@ -647,12 +667,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { getRocksDBColumn(db, getCfh(columnName)), name, ColumnUtils.toString(columnName), - dbScheduler, + dbWScheduler, + dbRScheduler, (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), updateMode, databaseOptions )) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } public RocksDBColumn getRocksDBColumn(byte[] columnName) { @@ -689,7 +710,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono getProperty(String propertyName) { return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } @Override @@ -703,7 +724,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") )) .onErrorMap(cause -> new IOException("Failed to read memory stats", cause)) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } @Override @@ -715,7 +736,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { }) .onErrorMap(cause -> new IOException("Failed to verify checksum of database \"" + getDatabaseName() + "\"", cause)) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } @Override @@ -737,7 +758,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); return new LLSnapshot(currentSnapshotSequenceNumber); })) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } @Override @@ -751,7 +772,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { db.releaseSnapshot(dbSnapshot); return null; }) - .subscribeOn(dbScheduler); + .subscribeOn(dbRScheduler); } @Override @@ -768,7 +789,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return null; }) .onErrorMap(cause -> new IOException("Failed to close", cause)) - .subscribeOn(dbScheduler); + .subscribeOn(dbWScheduler); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 5572464..dfdd376 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -9,7 +9,6 @@ import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.Callable; @@ -34,14 +33,16 @@ public class LLLocalSingleton implements LLSingleton { private final String columnName; private final Mono> nameMono; private final String databaseName; - private final Scheduler dbScheduler; + private final Scheduler dbWScheduler; + private final Scheduler dbRScheduler; public LLLocalSingleton(RocksDBColumn db, Function snapshotResolver, String databaseName, byte[] name, String columnName, - Scheduler dbScheduler, + Scheduler dbWScheduler, + Scheduler dbRScheduler, byte @Nullable [] defaultValue) throws RocksDBException { this.db = db; this.databaseName = databaseName; @@ -55,7 +56,8 @@ public class LLLocalSingleton implements LLSingleton { return nameBuf.send(); } }); - this.dbScheduler = dbScheduler; + this.dbWScheduler = dbWScheduler; + this.dbRScheduler = dbRScheduler; if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Initialized in a nonblocking thread"); } @@ -64,8 +66,8 @@ public class LLLocalSingleton implements LLSingleton { } } - private @NotNull Mono runOnDb(Callable<@Nullable T> callable) { - return Mono.fromCallable(callable).subscribeOn(dbScheduler); + private @NotNull Mono runOnDb(boolean write, Callable<@Nullable T> callable) { + return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler); } private ReadOptions resolveSnapshot(LLSnapshot snapshot) { @@ -83,7 +85,7 @@ public class LLLocalSingleton implements LLSingleton { @Override public Mono> get(@Nullable LLSnapshot snapshot) { - return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> { + return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { Buffer result = db.get(resolveSnapshot(snapshot), name); if (result != null) { @@ -99,7 +101,7 @@ public class LLLocalSingleton implements LLSingleton { @Override public Mono set(Mono> valueMono) { - return Mono.zip(nameMono, valueMono).publishOn(Schedulers.boundedElastic()).handle((tuple, sink) -> { + return Mono.zip(nameMono, valueMono).publishOn(dbWScheduler).handle((tuple, sink) -> { var nameSend = tuple.getT1(); var valueSend = tuple.getT2(); try (Buffer name = nameSend.receive()) { @@ -114,7 +116,7 @@ public class LLLocalSingleton implements LLSingleton { } private Mono unset() { - return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> { + return nameMono.publishOn(dbWScheduler).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { db.delete(EMPTY_WRITE_OPTIONS, name); } catch (RocksDBException ex) { @@ -126,7 +128,7 @@ public class LLLocalSingleton implements LLSingleton { @Override public Mono> update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { - return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { + return Mono.usingWhen(nameMono, keySend -> runOnDb(true, () -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); } @@ -150,7 +152,7 @@ public class LLLocalSingleton implements LLSingleton { @Override public Mono> updateAndGetDelta(BinarySerializationFunction updater) { - return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { + return Mono.usingWhen(nameMono, keySend -> runOnDb(true, () -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); }