Fix putmulti

This commit is contained in:
Andrea Cavalli 2022-04-12 00:25:18 +02:00
parent 1b150dcbaf
commit 0ad5a15792

View File

@ -643,7 +643,7 @@ public class LLLocalDictionary implements LLDictionary {
db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
}
}
sink.complete();
sink.next(true);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write: " + ex.getMessage()));
} finally {
@ -963,7 +963,7 @@ public class LLLocalDictionary implements LLDictionary {
if (USE_WINDOW_IN_SET_RANGE) {
return rangeMono
.publishOn(dbWScheduler)
.<Void>handle((rangeSend, sink) -> {
.<Boolean>handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
@ -1035,7 +1035,7 @@ public class LLLocalDictionary implements LLDictionary {
batch.clear();
}
}
sink.complete();
sink.next(true);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to set a range: " + ex.getMessage()));
}
@ -1110,7 +1110,7 @@ public class LLLocalDictionary implements LLDictionary {
.handle((oldValueSend, sink) -> {
try (var oldValue = oldValueSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe());
sink.complete();
sink.next(true);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
}
@ -1124,7 +1124,7 @@ public class LLLocalDictionary implements LLDictionary {
if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) {
this.put(entry.getKeyUnsafe(), entry.getValueUnsafe());
}
sink.complete();
sink.next(true);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
}