Schedule singleton values on correct scheduler

This commit is contained in:
Andrea Cavalli 2021-02-11 22:27:43 +01:00
parent b4e25c8573
commit 9111743bd3
2 changed files with 9 additions and 5 deletions

View File

@ -282,6 +282,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
LLLocalKeyValueDatabase.this.name, LLLocalKeyValueDatabase.this.name,
name, name,
dbScheduler,
defaultValue defaultValue
)) ))
.onErrorMap(IOException::new) .onErrorMap(IOException::new)

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException; import java.io.IOException;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -8,10 +10,8 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Scheduler;
public class LLLocalSingleton implements LLSingleton { public class LLLocalSingleton implements LLSingleton {
@ -21,17 +21,20 @@ public class LLLocalSingleton implements LLSingleton {
private final Function<LLSnapshot, Snapshot> snapshotResolver; private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final byte[] name; private final byte[] name;
private final String databaseName; private final String databaseName;
private final Scheduler dbScheduler;
public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn, public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn,
Function<LLSnapshot, Snapshot> snapshotResolver, Function<LLSnapshot, Snapshot> snapshotResolver,
String databaseName, String databaseName,
byte[] name, byte[] name,
Scheduler dbScheduler,
byte[] defaultValue) throws RocksDBException { byte[] defaultValue) throws RocksDBException {
this.db = db; this.db = db;
this.cfh = singletonListColumn; this.cfh = singletonListColumn;
this.databaseName = databaseName; this.databaseName = databaseName;
this.snapshotResolver = snapshotResolver; this.snapshotResolver = snapshotResolver;
this.name = name; this.name = name;
this.dbScheduler = dbScheduler;
if (db.get(cfh, this.name) == null) { if (db.get(cfh, this.name) == null) {
db.put(cfh, this.name, defaultValue); db.put(cfh, this.name, defaultValue);
} }
@ -50,7 +53,7 @@ public class LLLocalSingleton implements LLSingleton {
return Mono return Mono
.fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) .fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name))
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -61,7 +64,7 @@ public class LLLocalSingleton implements LLSingleton {
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override