From b1612cb20f1b764a56a01f7d1de9022f20111288 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 30 Jan 2021 01:42:37 +0100 Subject: [PATCH] Update LLLocalKeyValueDatabase.java --- .../disk/LLLocalKeyValueDatabase.java | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 6e68669..49f6ace 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -36,6 +36,8 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.WALRecoveryMode; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @@ -321,20 +323,29 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } @Override - public LLSnapshot takeSnapshot() { - var snapshot = db.getSnapshot(); - long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); - this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); - return new LLSnapshot(currentSnapshotSequenceNumber); + public Mono takeSnapshot() { + return Mono + .fromCallable(() -> { + var snapshot = db.getSnapshot(); + long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); + this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); + return new LLSnapshot(currentSnapshotSequenceNumber); + }) + .subscribeOn(Schedulers.boundedElastic()); } @Override - public void releaseSnapshot(LLSnapshot snapshot) throws IOException { - Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); - if (dbSnapshot == null) { - throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); - } - db.releaseSnapshot(dbSnapshot); + public Mono releaseSnapshot(LLSnapshot snapshot) { + return Mono + .fromCallable(() -> { + Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); + if (dbSnapshot == null) { + throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); + } + db.releaseSnapshot(dbSnapshot); + return null; + }) + .subscribeOn(Schedulers.boundedElastic()); } @Override