Separate write and read schedulers

This commit is contained in:
Andrea Cavalli 2022-04-05 13:58:12 +02:00
parent 02cd99a963
commit 6ac9505653
3 changed files with 95 additions and 73 deletions

View File

@ -28,10 +28,8 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -57,7 +55,6 @@ import org.rocksdb.DirectSlice;
import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
@ -119,7 +116,8 @@ public class LLLocalDictionary implements LLDictionary {
private final ColumnFamilyHandle cfh;
private final String databaseName;
private final String columnName;
private final Scheduler dbScheduler;
private final Scheduler dbWScheduler;
private final Scheduler dbRScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final UpdateMode updateMode;
private final boolean nettyDirect;
@ -141,12 +139,12 @@ public class LLLocalDictionary implements LLDictionary {
private final Counter endedRemove;
private final Timer removeTime;
public LLLocalDictionary(
BufferAllocator allocator,
public LLLocalDictionary(BufferAllocator allocator,
@NotNull RocksDBColumn db,
String databaseName,
String columnName,
Scheduler dbScheduler,
Scheduler dbWScheduler,
Scheduler dbRScheduler,
Function<LLSnapshot, Snapshot> snapshotResolver,
UpdateMode updateMode,
DatabaseOptions databaseOptions) {
@ -155,7 +153,8 @@ public class LLLocalDictionary implements LLDictionary {
this.cfh = db.getColumnFamilyHandle();
this.databaseName = databaseName;
this.columnName = columnName;
this.dbScheduler = dbScheduler;
this.dbWScheduler = dbWScheduler;
this.dbRScheduler = dbRScheduler;
this.snapshotResolver = snapshotResolver;
this.updateMode = updateMode;
alloc = allocator;
@ -242,14 +241,14 @@ public class LLLocalDictionary implements LLDictionary {
return alloc;
}
private <T> @NotNull Mono<T> runOnDb(Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(dbScheduler);
private <T> @NotNull Mono<T> runOnDb(boolean write, Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler);
}
@Override
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
return keyMono
.publishOn(dbScheduler)
.publishOn(dbRScheduler)
.<Send<Buffer>>handle((keySend, sink) -> {
try (var key = keySend.receive()) {
logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key));
@ -279,7 +278,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fillCache) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread";
startedContains.increment();
@ -380,7 +379,7 @@ public class LLLocalDictionary implements LLDictionary {
var previousDataMono = this.getPreviousData(keyMono, resultType);
// Write the new entry to the database
Mono<Send<Buffer>> putMono = entryMono
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((entry, sink) -> {
try (var key = entry.getKey().receive()) {
try (var value = entry.getValue().receive()) {
@ -430,7 +429,7 @@ public class LLLocalDictionary implements LLDictionary {
BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return keyMono
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
@ -473,7 +472,7 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
BinarySerializationFunction updater) {
return keyMono
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
@ -509,7 +508,7 @@ public class LLLocalDictionary implements LLDictionary {
Mono<Send<Buffer>> previousDataMono = this.getPreviousData(keyMono, resultType);
// Delete the value from the database
Mono<Send<Buffer>> removeMono = keyMono
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key));
@ -536,7 +535,7 @@ public class LLLocalDictionary implements LLDictionary {
private Mono<Send<Buffer>> getPreviousData(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
return switch (resultType) {
case PREVIOUS_VALUE_EXISTENCE -> keyMono
.publishOn(dbScheduler)
.publishOn(dbRScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
var contained = containsKey(null, key);
@ -546,7 +545,7 @@ public class LLLocalDictionary implements LLDictionary {
}
});
case PREVIOUS_VALUE -> keyMono
.publishOn(dbScheduler)
.publishOn(dbRScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread";
@ -569,7 +568,7 @@ public class LLLocalDictionary implements LLDictionary {
public Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot, Flux<Send<Buffer>> keys) {
return keys
.buffer(MULTI_GET_WINDOW)
.publishOn(dbScheduler)
.publishOn(dbRScheduler)
.<ArrayList<Optional<Buffer>>>handle((keysWindow, sink) -> {
List<Buffer> keyBufsWindow = new ArrayList<>(keysWindow.size());
for (Send<Buffer> bufferSend : keysWindow) {
@ -609,7 +608,7 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Void> putMulti(Flux<Send<LLEntry>> entries) {
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((entriesWindowList, sink) -> {
var entriesWindow = new ArrayList<LLEntry>(entriesWindowList.size());
for (Send<LLEntry> entrySend : entriesWindowList) {
@ -662,7 +661,7 @@ public class LLLocalDictionary implements LLDictionary {
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction) {
return Flux.zip(keys, serializedKeys)
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> this.<List<Boolean>>runOnDb(() -> {
.flatMapSequential(ew -> this.<List<Boolean>>runOnDb(true, () -> {
List<Tuple2<K, Buffer>> entriesWindow = new ArrayList<>(ew.size());
for (Tuple2<K, Send<Buffer>> tuple : ew) {
entriesWindow.add(tuple.mapT2(Send::receive));
@ -815,7 +814,7 @@ public class LLLocalDictionary implements LLDictionary {
return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse, smallRange);
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> iterator.flux().subscribeOn(dbRScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
}
@ -834,7 +833,7 @@ public class LLLocalDictionary implements LLDictionary {
});
return Flux.usingWhen(
iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> iterator.flux().subscribeOn(dbRScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
}
@ -871,7 +870,7 @@ public class LLLocalDictionary implements LLDictionary {
);
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> iterator.flux().subscribeOn(dbRScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
}
@ -909,7 +908,7 @@ public class LLLocalDictionary implements LLDictionary {
sink.error(ex);
}
})
.subscribeOn(dbScheduler),
.subscribeOn(dbRScheduler),
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
@ -924,14 +923,14 @@ public class LLLocalDictionary implements LLDictionary {
);
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler),
iterator -> iterator.flux().subscribeOn(dbRScheduler),
iterator -> Mono.fromRunnable(iterator::close)
);
}
private Flux<Send<Buffer>> getRangeKeysSingle(LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
return keyMono
.publishOn(dbScheduler)
.publishOn(dbRScheduler)
.<Send<Buffer>>handle((keySend, sink) -> {
try (var key = keySend.receive()) {
if (containsKey(snapshot, key)) {
@ -955,7 +954,7 @@ public class LLLocalDictionary implements LLDictionary {
return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse, smallRange);
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> iterator.flux().subscribeOn(dbRScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
}
@ -964,7 +963,7 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries, boolean smallRange) {
if (USE_WINDOW_IN_SET_RANGE) {
return rangeMono
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.<Void>handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
@ -1046,7 +1045,7 @@ public class LLLocalDictionary implements LLDictionary {
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.flatMap(entriesListSend -> this
.<Void>runOnDb(() -> {
.<Void>runOnDb(true, () -> {
List<LLEntry> entriesList = new ArrayList<>(entriesListSend.size());
for (Send<LLEntry> entrySend : entriesListSend) {
entriesList.add(entrySend.receive());
@ -1108,7 +1107,7 @@ public class LLLocalDictionary implements LLDictionary {
}
var deleteMono = this
.getRange(null, rangeMono, false, smallRange)
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((oldValueSend, sink) -> {
try (var oldValue = oldValueSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe());
@ -1120,7 +1119,7 @@ public class LLLocalDictionary implements LLDictionary {
.then(Mono.<Void>empty());
var putMono = entries
.publishOn(dbScheduler)
.publishOn(dbWScheduler)
.handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) {
@ -1313,13 +1312,13 @@ public class LLLocalDictionary implements LLDictionary {
}
})
.onErrorMap(cause -> new IOException("Failed to clear", cause))
.subscribeOn(dbScheduler);
.subscribeOn(dbWScheduler);
}
@Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fast) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread";
if (range.isAll()) {
@ -1383,7 +1382,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
@ -1438,7 +1437,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
@ -1600,7 +1599,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
return rangeMono.publishOn(dbWScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread";
try (var readOpts = new ReadOptions(getReadOptions(null))) {

View File

@ -90,7 +90,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final BufferAllocator allocator;
private final MeterRegistry meterRegistry;
private final Scheduler dbScheduler;
private final Scheduler dbWScheduler;
private final Scheduler dbRScheduler;
private final Timer snapshotTime;
@ -204,34 +205,40 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (!databaseOptions.lowMemory()) {
final BloomFilter bloomFilter = new BloomFilter(3);
final BloomFilter bloomFilter = new BloomFilter(10);
tableOptions.setFilterPolicy(bloomFilter);
tableOptions.setOptimizeFiltersForMemory(true);
tableOptions.setVerifyCompression(false);
}
boolean cacheIndexAndFilterBlocks = databaseOptions.setCacheIndexAndFilterBlocks().orElse(true);
boolean cacheIndexAndFilterBlocks = databaseOptions.setCacheIndexAndFilterBlocks()
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.orElse(true);
if (databaseOptions.spinning()) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
cacheIndexAndFilterBlocks = true;
}
tableOptions
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinTopLevelIndexAndFilter(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinL0FilterAndIndexBlocksInCache(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
.setPartitionFilters(true)
//.setPartitionFilters(true)
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setFormatVersion(5)
//todo: replace with kxxhash3
.setChecksumType(ChecksumType.kxxHash)
.setBlockCacheCompressed(optionsWithCache.compressedCache())
.setBlockCache(optionsWithCache.standardCache())
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
.setBlockSize((databaseOptions.spinning() ? 256 : 16) * SizeUnit.KB);
.setBlockSize((databaseOptions.spinning() ? 512 : 16) * SizeUnit.KB);
columnOptions.setTableFormatConfig(tableOptions);
columnOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
columnOptions.setOptimizeFiltersForHits(true);
descriptors.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII), columnOptions));
}
@ -250,17 +257,24 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (databaseOptions.lowMemory()) {
threadCap = Math.max(1, Runtime.getRuntime().availableProcessors());
this.dbScheduler = Schedulers.boundedElastic();
this.dbWScheduler = Schedulers.boundedElastic();
this.dbRScheduler = Schedulers.boundedElastic();
} else {
// 8 or more
threadCap = Math.max(8, Math.max(Runtime.getRuntime().availableProcessors(),
Integer.parseInt(System.getProperty("it.cavallium.dbengine.scheduler.threads", "0"))));
if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.scheduler.shared", "true"))) {
this.dbScheduler = Schedulers.boundedElastic();
this.dbWScheduler = Schedulers.boundedElastic();
this.dbRScheduler = Schedulers.boundedElastic();
} else {
this.dbScheduler = Schedulers.newBoundedElastic(threadCap,
this.dbWScheduler = Schedulers.newBoundedElastic(threadCap,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
new ShortNamedThreadFactory("db-" + name).setDaemon(true).withGroup(new ThreadGroup("database-threads")),
new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-write")),
60
);
this.dbRScheduler = Schedulers.newBoundedElastic(threadCap,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-read")),
60
);
}
@ -503,16 +517,23 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setWalSizeLimitMB(0)
.setMaxTotalWalSize(0) // automatic
;
blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB) / 2, -1, true);
compressedCache = null;
blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), 6, false);
compressedCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), 6, false);
if (databaseOptions.spinning()) {
options
// method documentation
.setCompactionReadaheadSize(16 * SizeUnit.MB)
// guessed
.setWritableFileMaxBufferSize(16 * SizeUnit.MB);
}
if (databaseOptions.useDirectIO()) {
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
.setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB
.setCompactionReadaheadSize(2 * SizeUnit.MB) // recommend at least 2MB
// Option to tune write buffer for direct writes
.setWritableFileMaxBufferSize(1024 * 1024)
.setWritableFileMaxBufferSize(2 * SizeUnit.MB)
;
}
} else {
@ -632,11 +653,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
LLLocalKeyValueDatabase.this.name,
name,
ColumnUtils.toString(singletonListColumnName),
dbScheduler,
defaultValue
dbWScheduler, dbRScheduler, defaultValue
))
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
@Override
@ -647,12 +667,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
getRocksDBColumn(db, getCfh(columnName)),
name,
ColumnUtils.toString(columnName),
dbScheduler,
dbWScheduler,
dbRScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
updateMode,
databaseOptions
))
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
public RocksDBColumn getRocksDBColumn(byte[] columnName) {
@ -689,7 +710,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
.onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause))
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
@Override
@ -703,7 +724,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage")
))
.onErrorMap(cause -> new IOException("Failed to read memory stats", cause))
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
@Override
@ -715,7 +736,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
})
.onErrorMap(cause -> new IOException("Failed to verify checksum of database \""
+ getDatabaseName() + "\"", cause))
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
@Override
@ -737,7 +758,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
}))
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
@Override
@ -751,7 +772,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
db.releaseSnapshot(dbSnapshot);
return null;
})
.subscribeOn(dbScheduler);
.subscribeOn(dbRScheduler);
}
@Override
@ -768,7 +789,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return null;
})
.onErrorMap(cause -> new IOException("Failed to close", cause))
.subscribeOn(dbScheduler);
.subscribeOn(dbWScheduler);
}
/**

View File

@ -9,7 +9,6 @@ import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;
@ -34,14 +33,16 @@ public class LLLocalSingleton implements LLSingleton {
private final String columnName;
private final Mono<Send<Buffer>> nameMono;
private final String databaseName;
private final Scheduler dbScheduler;
private final Scheduler dbWScheduler;
private final Scheduler dbRScheduler;
public LLLocalSingleton(RocksDBColumn db,
Function<LLSnapshot, Snapshot> snapshotResolver,
String databaseName,
byte[] name,
String columnName,
Scheduler dbScheduler,
Scheduler dbWScheduler,
Scheduler dbRScheduler,
byte @Nullable [] defaultValue) throws RocksDBException {
this.db = db;
this.databaseName = databaseName;
@ -55,7 +56,8 @@ public class LLLocalSingleton implements LLSingleton {
return nameBuf.send();
}
});
this.dbScheduler = dbScheduler;
this.dbWScheduler = dbWScheduler;
this.dbRScheduler = dbRScheduler;
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread");
}
@ -64,8 +66,8 @@ public class LLLocalSingleton implements LLSingleton {
}
}
private <T> @NotNull Mono<T> runOnDb(Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(dbScheduler);
private <T> @NotNull Mono<T> runOnDb(boolean write, Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler);
}
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
@ -83,7 +85,7 @@ public class LLLocalSingleton implements LLSingleton {
@Override
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot) {
return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> {
return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> {
try (Buffer name = nameSend.receive()) {
Buffer result = db.get(resolveSnapshot(snapshot), name);
if (result != null) {
@ -99,7 +101,7 @@ public class LLLocalSingleton implements LLSingleton {
@Override
public Mono<Void> set(Mono<Send<Buffer>> valueMono) {
return Mono.zip(nameMono, valueMono).publishOn(Schedulers.boundedElastic()).handle((tuple, sink) -> {
return Mono.zip(nameMono, valueMono).publishOn(dbWScheduler).handle((tuple, sink) -> {
var nameSend = tuple.getT1();
var valueSend = tuple.getT2();
try (Buffer name = nameSend.receive()) {
@ -114,7 +116,7 @@ public class LLLocalSingleton implements LLSingleton {
}
private Mono<Void> unset() {
return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> {
return nameMono.publishOn(dbWScheduler).handle((nameSend, sink) -> {
try (Buffer name = nameSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, name);
} catch (RocksDBException ex) {
@ -126,7 +128,7 @@ public class LLLocalSingleton implements LLSingleton {
@Override
public Mono<Send<Buffer>> update(BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> {
return Mono.usingWhen(nameMono, keySend -> runOnDb(true, () -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
@ -150,7 +152,7 @@ public class LLLocalSingleton implements LLSingleton {
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(BinarySerializationFunction updater) {
return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> {
return Mono.usingWhen(nameMono, keySend -> runOnDb(true, () -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}