Update LLLocalKeyValueDatabase.java

This commit is contained in:
Andrea Cavalli 2021-01-30 01:42:37 +01:00
parent 52d4f022bd
commit b1612cb20f

View File

@ -36,6 +36,8 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WALRecoveryMode;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@ -321,20 +323,29 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
@Override
public LLSnapshot takeSnapshot() {
var snapshot = db.getSnapshot();
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(() -> {
var snapshot = db.getSnapshot();
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
})
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public void releaseSnapshot(LLSnapshot snapshot) throws IOException {
Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
if (dbSnapshot == null) {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
}
db.releaseSnapshot(dbSnapshot);
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono
.<Void>fromCallable(() -> {
Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
if (dbSnapshot == null) {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
}
db.releaseSnapshot(dbSnapshot);
return null;
})
.subscribeOn(Schedulers.boundedElastic());
}
@Override