From 9111743bd36ab4f2d30dc636c2ec7c4170933747 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 11 Feb 2021 22:27:43 +0100 Subject: [PATCH] Schedule singleton values on correct scheduler --- .../database/disk/LLLocalKeyValueDatabase.java | 1 + .../dbengine/database/disk/LLLocalSingleton.java | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index fb52bf9..61cd9fd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -282,6 +282,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), LLLocalKeyValueDatabase.this.name, name, + dbScheduler, defaultValue )) .onErrorMap(IOException::new) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 5a34a0f..3c40f37 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.LLSingleton; +import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; import java.util.function.Function; import org.jetbrains.annotations.Nullable; @@ -8,10 +10,8 @@ import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; -import it.cavallium.dbengine.database.LLSingleton; -import it.cavallium.dbengine.database.LLSnapshot; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.scheduler.Scheduler; public class LLLocalSingleton implements LLSingleton { @@ -21,17 +21,20 @@ public class LLLocalSingleton implements LLSingleton { private final Function snapshotResolver; private final byte[] name; private final String databaseName; + private final Scheduler dbScheduler; public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn, Function snapshotResolver, String databaseName, byte[] name, + Scheduler dbScheduler, byte[] defaultValue) throws RocksDBException { this.db = db; this.cfh = singletonListColumn; this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; + this.dbScheduler = dbScheduler; if (db.get(cfh, this.name) == null) { db.put(cfh, this.name, defaultValue); } @@ -50,7 +53,7 @@ public class LLLocalSingleton implements LLSingleton { return Mono .fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override @@ -61,7 +64,7 @@ public class LLLocalSingleton implements LLSingleton { return null; }) .onErrorMap(IOException::new) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(dbScheduler); } @Override