Further optimizations

This commit is contained in:
Andrea Cavalli 2022-01-26 21:18:43 +01:00
parent fb19a7a9f3
commit 574a35907d

View File

@ -13,7 +13,6 @@ import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Column;
@ -400,6 +399,18 @@ public class LLLocalDictionary implements LLDictionary {
.handle((entry, sink) -> {
try (var key = entry.getKey().receive()) {
try (var value = entry.getValue().receive()) {
put(key, value);
sink.complete();
} catch (RocksDBException ex) {
sink.error(ex);
}
}
});
// Read the previous data, then write the new data, then return the previous data
return Flux.concat(previousDataMono, putMono).singleOrEmpty();
}
private void put(Buffer key, Buffer value) throws RocksDBException {
assert key.isAccessible();
assert value.isAccessible();
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
@ -413,20 +424,14 @@ public class LLLocalDictionary implements LLDictionary {
return null;
});
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write: " + ex.getMessage()));
return;
throw new RocksDBException("Failed to write: " + ex.getMessage());
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
sink.error(ex);
return;
throw new RuntimeException("Failed to write", ex);
} finally {
endedPut.increment();
}
sink.complete();
}
}
});
// Read the previous data, then write the new data, then return the previous data
return Flux.concat(previousDataMono, putMono).singleOrEmpty();
}
@Override
@ -950,16 +955,12 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
//todo: change usingWhen and use a better alternative
return Mono.usingWhen(rangeMono,
rangeSend -> {
if (USE_WINDOW_IN_SET_RANGE) {
return this
.<Void>runOnDb(() -> {
return rangeMono
.publishOn(dbScheduler)
.<Void>handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called setRange in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
assert EMPTY_READ_OPTIONS.isOwningHandle();
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
@ -1031,7 +1032,9 @@ public class LLLocalDictionary implements LLDictionary {
batch.clear();
}
}
return null;
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to set a range: " + ex.getMessage()));
}
})
.thenMany(entries.window(MULTI_GET_WINDOW))
@ -1095,35 +1098,38 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(cause -> new IOException("Failed to write range", cause));
} else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Can't use write batches in setRange without window."
+ " Please fix the parameters");
});
return Mono.error(() -> new UnsupportedOperationException(
"Can't use write batches in setRange without window. Please fix the parameters"));
}
return this
var deleteMono = this
.getRange(null, rangeMono, false)
.flatMap(oldValueSend -> this.<Void>runOnDb(() -> {
.publishOn(dbScheduler)
.handle((oldValueSend, sink) -> {
try (var oldValue = oldValueSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe());
return null;
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
}
}))
.then(entries
.flatMap(entrySend -> Mono.using(
entrySend::receive,
entry -> this
.put(LLUtils.lazyRetain(entry::getKey), LLUtils.lazyRetain(entry::getValue),
LLDictionaryResultType.VOID)
.doOnNext(Send::close),
ResourceSupport::close
))
.then(Mono.<Void>empty())
)
.onErrorMap(cause -> new IOException("Failed to write range", cause));
})
.then(Mono.<Void>empty());
var putMono = entries
.publishOn(dbScheduler)
.handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) {
this.put(entry.getKeyUnsafe(), entry.getValueUnsafe());
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
}
})
.then(Mono.<Void>empty());
return deleteMono.then(putMono);
}
},
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
//todo: this is broken, check why. (is this still true?)
@ -1289,9 +1295,7 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Void> clear() {
return Mono
.<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called clear in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread";
boolean shouldCompactLater = false;
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
@ -1360,13 +1364,11 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fast) {
return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread";
if (range.isAll()) {
return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot);
sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot));
} else {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
@ -1405,7 +1407,7 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.status();
i++;
}
return i;
sink.next(i);
} finally {
if (seekTo != null) {
seekTo.close();
@ -1420,19 +1422,17 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to get size of range: " + ex.getMessage()));
}
}).onErrorMap(cause -> new IOException("Failed to get size of range", cause)),
rangeSend -> Mono.fromRunnable(rangeSend::close));
});
}
@Override
public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getOne in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -1460,11 +1460,11 @@ public class LLLocalDictionary implements LLDictionary {
if (rocksIterator.isValid()) {
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
return LLEntry.of(key.send(), value.send()).send();
sink.next(LLEntry.of(key.send(), value.send()).send());
}
}
} else {
return null;
sink.complete();
}
} finally {
if (seekTo != null) {
@ -1478,20 +1478,17 @@ public class LLLocalDictionary implements LLDictionary {
minBound.close();
}
}
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to get one entry: " + ex.getMessage()));
}
}),
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
});
}
@Override
public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getOneKey in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -1517,9 +1514,9 @@ public class LLLocalDictionary implements LLDictionary {
try {
rocksIterator.status();
if (rocksIterator.isValid()) {
return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send();
sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
} else {
return null;
sink.complete();
}
} finally {
if (seekTo != null) {
@ -1533,10 +1530,10 @@ public class LLLocalDictionary implements LLDictionary {
minBound.close();
}
}
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to get one key: " + ex.getMessage()));
}
}),
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
});
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException {
@ -1652,12 +1649,9 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called removeOne in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread";
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -1683,12 +1677,13 @@ public class LLLocalDictionary implements LLDictionary {
try {
rocksIterator.status();
if (!rocksIterator.isValid()) {
return null;
sink.complete();
return;
}
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
db.delete(EMPTY_WRITE_OPTIONS, key);
return LLEntry.of(key, value).send();
sink.next(LLEntry.of(key, value).send());
} finally {
if (seekTo != null) {
seekTo.close();
@ -1701,10 +1696,10 @@ public class LLLocalDictionary implements LLDictionary {
minBound.close();
}
}
} catch (RocksDBException ex) {
sink.error(ex);
}
}).onErrorMap(cause -> new IOException("Failed to delete", cause)),
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
});
}
/**