Clean database code

Andrea Cavalli 2022-01-26 19:03:51 +01:00
parent cdb65b31f3
commit 95afa6f9dd
11 changed files with 355 additions and 387 deletions

View File

@@ -67,7 +67,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
return getMulti(snapshot, keys, false);
}
Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues);
Mono<Void> putMulti(Flux<Send<LLEntry>> entries);
<K> Flux<Boolean> updateMulti(Flux<K> keys, Flux<Send<Buffer>> serializedKeys,
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction);
@@ -111,11 +111,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
.flatMap(entriesReplacer)
);
} else {
return this
.putMulti(this
.getRange(null, range, existsAlmostCertainly)
.flatMap(entriesReplacer), false)
.then();
return this.putMulti(this.getRange(null, range, existsAlmostCertainly).flatMap(entriesReplacer));
}
});
}
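The hunk above collapses the old two-argument call, putMulti(entries, false) followed by then(), into the new Mono<Void> overload, so callers get a bare completion signal and old values are never materialized. A minimal sketch of that contract against a toy in-memory store, assuming only reactor-core on the classpath (ToyStore and its names are illustrative stand-ins, not part of this codebase):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ToyStore {
    private final Map<String, String> map = new ConcurrentHashMap<>();

    // Mirrors the new contract: the caller learns only about completion or
    // failure; old values are never read back or sent to the subscriber.
    Mono<Void> putMulti(Flux<Map.Entry<String, String>> entries) {
        return entries
                .doOnNext(entry -> map.put(entry.getKey(), entry.getValue()))
                .then();
    }

    public static void main(String[] args) {
        var store = new ToyStore();
        store.putMulti(Flux.just(Map.entry("k1", "v1"), Map.entry("k2", "v2"))).block();
        System.out.println(store.map); // {k1=v1, k2=v2} (order may vary)
    }
}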

View File

@@ -391,7 +391,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
sink.error(e);
}
});
return dictionary.putMulti(serializedEntries, false).then();
return dictionary.putMulti(serializedEntries);
}
@Override

View File

@@ -1,6 +1,5 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
@@ -81,8 +80,10 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
default Mono<Delta<U>> updateValueAndGetDelta(T key,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
return LLUtils.usingResource(this.at(null, key).single(),
stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly), true);
var stageMono = this.at(null, key).single();
return stageMono.flatMap(stage -> stage
.updateAndGetDelta(updater, existsAlmostCertainly)
.doFinally(s -> stage.close()));
}
default Mono<Delta<U>> updateValueAndGetDelta(T key, SerializationFunction<@Nullable U, @Nullable U> updater) {
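The rewrite above scopes the stage resource with flatMap plus doFinally instead of the LLUtils.usingResource helper. A self-contained sketch of that pattern, assuming only reactor-core (the Resource record below is a hypothetical stand-in for DatabaseStage, not a real project type):

import reactor.core.publisher.Mono;

class ResourceScopeExample {
    // Stand-in for a DatabaseStage: a value that must be closed after use.
    record Resource(String name) implements AutoCloseable {
        String read() { return "value-of-" + name; }
        @Override public void close() { System.out.println("closed " + name); }
    }

    static Mono<String> useOnce(Mono<Resource> resourceMono) {
        // Acquire inside the chain, use in flatMap, release in doFinally so
        // the resource is closed on completion, error, and cancellation alike.
        return resourceMono.flatMap(resource -> Mono
                .fromCallable(resource::read)
                .doFinally(signal -> resource.close()));
    }

    public static void main(String[] args) {
        System.out.println(useOnce(Mono.fromCallable(() -> new Resource("stage"))).block());
    }
}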

View File

@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
@@ -68,7 +69,6 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuple4;
import reactor.util.function.Tuples;
public class LLLocalDictionary implements LLDictionary {
@@ -128,7 +128,6 @@ public class LLLocalDictionary implements LLDictionary {
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final UpdateMode updateMode;
private final DatabaseOptions databaseOptions;
private final boolean nettyDirect;
private final BufferAllocator alloc;
@@ -165,7 +164,6 @@ public class LLLocalDictionary implements LLDictionary {
this.dbScheduler = dbScheduler;
this.snapshotResolver = snapshotResolver;
this.updateMode = updateMode;
this.databaseOptions = databaseOptions;
alloc = allocator;
this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
var meterRegistry = db.getMeterRegistry();
@@ -350,6 +348,9 @@ public class LLLocalDictionary implements LLDictionary {
});
assert result != null;
sink.next(!result);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range)
+ ": " + ex.getMessage()));
} finally {
endedContains.increment();
}
@@ -397,39 +398,43 @@ public class LLLocalDictionary implements LLDictionary {
// Obtain the previous value from the database
var previousDataMono = this.getPreviousData(keyMono, resultType, false);
// Write the new entry to the database
var putMono = entryMono
Mono<Send<Buffer>> putMono = entryMono
.publishOn(dbScheduler)
.<Void>handle((entry, sink) -> {
.handle((entry, sink) -> {
try (var key = entry.getKey().receive()) {
try (var value = entry.getValue().receive()) {
assert key.isAccessible();
assert value.isAccessible();
var varargs = new Supplier<?>[]{() -> toStringSafe(key), () -> toStringSafe(value)};
logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs);
db.put(EMPTY_WRITE_OPTIONS, key, value);
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
var varargs = new Supplier<?>[]{() -> toStringSafe(key), () -> toStringSafe(value)};
logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs);
}
startedPut.increment();
try {
putTime.recordCallable(() -> {
db.put(EMPTY_WRITE_OPTIONS, key, value);
return null;
});
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write: " + ex.getMessage()));
return;
} catch (Exception ex) {
sink.error(ex);
return;
} finally {
endedPut.increment();
}
sink.complete();
}
} catch (Throwable ex) {
sink.error(ex);
}
});
// Read the previous data, then write the new data, then return the previous data
return Flux
.concat(previousDataMono, putMono.then(Mono.empty()))
.singleOrEmpty()
.onErrorMap(cause -> new IOException("Failed to write", cause))
.elapsed()
.map(tuple -> {
putTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
return tuple.getT2();
})
.doFirst(startedPut::increment)
.doFinally(s -> endedPut.increment());
return Flux.concat(previousDataMono, putMono).singleOrEmpty();
}
@Override
public Mono<UpdateMode> getUpdateMode() {
return Mono.fromSupplier(() -> updateMode);
return Mono.just(updateMode);
}
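Since updateMode is a final field fixed at construction time, the eager Mono.just above is equivalent to the deferred supplier it replaces, and cheaper per subscription. A quick illustrative sketch of the difference, assuming reactor-core: Mono.just captures its value once at assembly time, while Mono.fromSupplier re-invokes the supplier for every subscriber.

import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class JustVsSupplier {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        // Evaluated once, at assembly time: fine for immutable fields.
        Mono<Integer> just = Mono.just(counter.incrementAndGet());

        // Evaluated on every subscription: needed only when the value can change.
        Mono<Integer> supplied = Mono.fromSupplier(counter::incrementAndGet);

        just.block();      // counter stays at 1
        just.block();      // still 1
        supplied.block();  // 2
        supplied.block();  // 3
        System.out.println(counter.get()); // prints 3
    }
}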
@SuppressWarnings("DuplicatedCode")
@@ -438,34 +443,43 @@
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
return keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (keySend) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
if (updateMode == UpdateMode.DISALLOW) {
sink.error(new UnsupportedOperationException("update() is disallowed"));
return;
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode));
} finally {
endedUpdates.increment();
}
assert result != null;
var previous = switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
if (previous != null) {
sink.next(previous);
} else {
sink.complete();
}
} catch (Exception ex) {
sink.error(ex);
}
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode));
} finally {
endedUpdates.increment();
}
assert result != null;
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
});
}
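The update() rewrite above trades Mono.usingWhen plus a runOnDb callable for publishOn(dbScheduler) followed by handle, which lets the blocking RocksDB call emit zero or one value synchronously and route failures through sink.error. A stripped-down sketch of that operator pairing, assuming reactor-core (the static map stands in for the database; all names here are illustrative):

import java.util.Map;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class HandleOnScheduler {
    private static final Map<String, String> DB = Map.of("k1", "v1");

    static Mono<String> get(Mono<String> keyMono) {
        return keyMono
                // Hop off the caller's (possibly non-blocking) thread first.
                .publishOn(Schedulers.boundedElastic())
                // handle() may emit one value, complete empty, or signal an error.
                .<String>handle((key, sink) -> {
                    String value = DB.get(key); // a blocking lookup would go here
                    if (value == null) {
                        sink.complete();        // missing key -> empty Mono
                    } else {
                        sink.next(value);
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(get(Mono.just("k1")).block());           // v1
        System.out.println(get(Mono.just("nope")).blockOptional()); // Optional.empty
    }
}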
@SuppressWarnings("DuplicatedCode")
@@ -473,58 +487,65 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
keySend.close();
throw new UnsupportedOperationException("Called update in a nonblocking thread");
return keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (keySend) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
if (updateMode == UpdateMode.DISALLOW) {
sink.error(new UnsupportedOperationException("update() is disallowed"));
return;
}
if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) {
sink.error(new UnsupportedOperationException("update() is disallowed because the database doesn't support"
+ " safe atomic operations"));
return;
}
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA));
} finally {
endedUpdates.increment();
}
assert result != null;
sink.next(((UpdateAtomicResultDelta) result).delta());
} catch (Exception ex) {
sink.error(ex);
}
if (updateMode == UpdateMode.DISALLOW) {
keySend.close();
throw new UnsupportedOperationException("update() is disallowed");
}
if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) {
throw new UnsupportedOperationException("update() is disallowed because the database doesn't support"
+ " safe atomic operations");
}
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA));
} finally {
endedUpdates.increment();
}
assert result != null;
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
});
}
@Override
public Mono<Send<Buffer>> remove(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
keySend -> this
.getPreviousData(keyMono, resultType, true)
.concatWith(this
.<Send<Buffer>>runOnDb(() -> {
try (var key = keySend.receive()) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", toStringSafe(key));
db.delete(EMPTY_WRITE_OPTIONS, key);
} else {
db.delete(EMPTY_WRITE_OPTIONS, key);
}
}
return null;
})
.onErrorMap(cause -> new IOException("Failed to delete", cause))
)
.singleOrEmpty(),
keySend -> Mono.fromRunnable(keySend::close)
).elapsed().map(tuple -> {
removeTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
return tuple.getT2();
}).doFirst(startedRemove::increment).doFinally(s -> endedRemove.increment());
// Obtain the previous value from the database
Mono<Send<Buffer>> previousDataMono = this.getPreviousData(keyMono, resultType, true);
// Delete the value from the database
Mono<Send<Buffer>> removeMono = keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key));
startedRemove.increment();
try {
removeTime.recordCallable(() -> {
db.delete(EMPTY_WRITE_OPTIONS, key);
return null;
});
} finally {
endedRemove.increment();
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to delete: " + ex.getMessage()));
} catch (Exception ex) {
sink.error(ex);
}
});
// Read the previous data, then delete the data, then return the previous data
return Flux.concat(previousDataMono, removeMono).singleOrEmpty();
}
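remove() now follows the same shape as put(): fetch the previous value, run the mutation, and surface only the previous value. The trick is that the mutation Mono completes empty, so Flux.concat can emit at most the first element and singleOrEmpty passes it through. A toy version of the idea, assuming reactor-core (not atomic, illustration only):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class RemoveReturningPrevious {
    private final Map<String, String> map = new ConcurrentHashMap<>();

    Mono<String> remove(String key) {
        // fromCallable treats a null return as an empty Mono.
        Mono<String> previous = Mono.fromCallable(() -> map.get(key));
        // fromRunnable never emits, so only `previous` can produce a value.
        Mono<String> delete = Mono.<String>fromRunnable(() -> map.remove(key));
        // concat subscribes sequentially: read first, then delete.
        return Flux.concat(previous, delete).singleOrEmpty();
    }

    public static void main(String[] args) {
        var store = new RemoveReturningPrevious();
        store.map.put("k", "old");
        System.out.println(store.remove("k").block());         // old
        System.out.println(store.remove("k").blockOptional()); // Optional.empty
    }
}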
private Mono<Send<Buffer>> getPreviousData(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType,
@@ -534,28 +555,22 @@ public class LLLocalDictionary implements LLDictionary {
.containsKey(null, keyMono)
.single()
.map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(alloc, bool));
case PREVIOUS_VALUE -> Mono.usingWhen(
keyMono,
keySend -> this
.runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread");
}
if (logger.isTraceEnabled()) {
var keyString = toStringSafe(key);
var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly);
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", keyString, toStringSafe(result));
return result == null ? null : result.send();
} else {
var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly);
return result == null ? null : result.send();
}
}
})
.onErrorMap(cause -> new IOException("Failed to read ", cause)),
keySend -> Mono.fromRunnable(keySend::close));
case PREVIOUS_VALUE -> keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread";
var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly);
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
if (result == null) {
sink.complete();
} else {
sink.next(result.send());
}
} catch (Exception ex) {
sink.error(ex);
}
});
case VOID -> Mono.empty();
};
}
@@ -566,15 +581,14 @@ public class LLLocalDictionary implements LLDictionary {
boolean existsAlmostCertainly) {
return keys
.buffer(MULTI_GET_WINDOW)
.flatMapSequential(keysWindow -> runOnDb(() -> {
.publishOn(dbScheduler)
.<ArrayList<Optional<Buffer>>>handle((keysWindow, sink) -> {
List<Buffer> keyBufsWindow = new ArrayList<>(keysWindow.size());
for (Send<Buffer> bufferSend : keysWindow) {
keyBufsWindow.add(bufferSend.receive());
}
try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getMulti in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called getMulti in a nonblocking thread";
var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
List<byte[]> results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow));
var mappedResults = new ArrayList<Optional<Buffer>>(results.size());
@@ -591,84 +605,68 @@ public class LLLocalDictionary implements LLDictionary {
}
mappedResults.add(valueOpt);
}
return mappedResults;
sink.next(mappedResults);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to read keys: " + ex.getMessage()));
} finally {
for (Buffer buffer : keyBufsWindow) {
buffer.close();
}
}
})
.flatMapIterable(list -> list)
.onErrorMap(cause -> new IOException("Failed to read keys", cause))
.doAfterTerminate(() -> keysWindow.forEach(Send::close)), 2); // Max concurrency is 2 to read data while preparing the next segment;
.flatMapIterable(list -> list);
}
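getMulti keeps its key windowing but now performs the batched lookup inside handle on the db scheduler instead of flatMapSequential over runOnDb. A compact sketch of the buffer/publishOn/handle/flatMapIterable pipeline against a plain map, assuming reactor-core (the WINDOW constant and DB map are made-up stand-ins for MULTI_GET_WINDOW and multiGetAsList):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class BufferedMultiGet {
    private static final Map<String, String> DB = Map.of("a", "1", "c", "3");
    private static final int WINDOW = 2; // stands in for MULTI_GET_WINDOW

    static Flux<Optional<String>> getMulti(Flux<String> keys) {
        return keys
                .buffer(WINDOW)                         // batch keys into windows
                .publishOn(Schedulers.boundedElastic()) // do lookups off the event loop
                .<List<Optional<String>>>handle((window, sink) -> {
                    // One batched lookup per window, like multiGetAsList.
                    List<Optional<String>> results = new ArrayList<>(window.size());
                    for (String key : window) {
                        results.add(Optional.ofNullable(DB.get(key)));
                    }
                    sink.next(results);
                })
                .flatMapIterable(list -> list);         // flatten back to per-key results
    }

    public static void main(String[] args) {
        System.out.println(getMulti(Flux.just("a", "b", "c")).collectList().block());
        // [Optional[1], Optional.empty, Optional[3]]
    }
}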
@Override
public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) {
public Mono<Void> putMulti(Flux<Send<LLEntry>> entries) {
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> Mono
.<List<Send<LLEntry>>>fromCallable(() -> {
var entriesWindow = new ArrayList<LLEntry>(ew.size());
for (Send<LLEntry> entrySend : ew) {
entriesWindow.add(entrySend.receive());
}
try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called putMulti in a nonblocking thread");
}
ArrayList<Send<LLEntry>> oldValues;
if (getOldValues) {
oldValues = new ArrayList<>(entriesWindow.size());
try (var readOptions = resolveSnapshot(null)) {
for (LLEntry entry : entriesWindow) {
try (var key = entry.getKey().receive()) {
Buffer oldValue = db.get(readOptions, key, false);
if (oldValue != null) {
oldValues.add(LLEntry.of(key, oldValue).send());
}
}
.publishOn(dbScheduler)
.handle((entriesWindowList, sink) -> {
var entriesWindow = new ArrayList<LLEntry>(entriesWindowList.size());
for (Send<LLEntry> entrySend : entriesWindowList) {
entriesWindow.add(entrySend.receive());
}
try {
assert !Schedulers.isInNonBlockingThread() : "Called putMulti in a nonblocking thread";
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
for (LLEntry entry : entriesWindow) {
var k = entry.getKey();
var v = entry.getValue();
if (nettyDirect) {
batch.put(cfh, k, v);
} else {
try (var key = k.receive()) {
try (var value = v.receive()) {
batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value));
}
}
} else {
oldValues = null;
}
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
for (LLEntry entry : entriesWindow) {
var k = entry.getKey();
var v = entry.getValue();
if (nettyDirect) {
batch.put(cfh, k, v);
} else {
try (var key = k.receive()) {
try (var value = v.receive()) {
batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value));
}
}
}
}
batch.writeToDbAndClose();
batch.close();
} else {
for (LLEntry entry : entriesWindow) {
db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
}
}
return oldValues;
} finally {
for (LLEntry llEntry : entriesWindow) {
llEntry.close();
}
}
}).subscribeOn(dbScheduler), 2) // Max concurrency is 2 to read data while preparing the next segment
.flatMapIterable(oldValuesList -> oldValuesList);
batch.writeToDbAndClose();
batch.close();
} else {
for (LLEntry entry : entriesWindow) {
db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
}
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write: " + ex.getMessage()));
} finally {
for (LLEntry llEntry : entriesWindow) {
llEntry.close();
}
}
})
.then();
}
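CappedWriteBatch is project-specific, but judging by its constructor arguments (cap, reserved and max sizes) it wraps RocksDB's standard WriteBatch idea: stage many puts in memory, then commit them with a single write() call. A minimal sketch with the plain rocksdbjni API, assuming the org.rocksdb dependency (the path and keys are made up):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

class WriteBatchSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/batch-example"); // illustrative path
                WriteBatch batch = new WriteBatch();
                WriteOptions writeOptions = new WriteOptions()) {
            // Stage entries in memory; nothing hits the database yet.
            batch.put("key1".getBytes(), "value1".getBytes());
            batch.put("key2".getBytes(), "value2".getBytes());
            // One atomic write for the whole window.
            db.write(writeOptions, batch);
        }
    }
}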
@Override
@@ -895,30 +893,22 @@ public class LLLocalDictionary implements LLDictionary {
ro.setReadaheadSize(32 * 1024);
}
ro.setVerifyChecksums(true);
var rocksIteratorTuple = getRocksIterator(alloc,
nettyDirect, ro, range, db
);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid() && !sink.isCancelled()) {
try {
rocksIterator.status();
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
} catch (RocksDBException ex) {
sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex));
}
rocksIterator.next();
try (var rocksIteratorTuple = getRocksIterator(nettyDirect, ro, range, db)) {
var rocksIterator = rocksIteratorTuple.iterator();
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid() && !sink.isCancelled()) {
try {
rocksIterator.status();
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
} catch (RocksDBException ex) {
sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex));
}
rocksIterator.next();
}
} finally {
rocksIteratorTuple.getT2().close();
rocksIteratorTuple.getT3().close();
rocksIteratorTuple.getT4().close();
}
sink.complete();
} catch (Throwable ex) {
@@ -1271,6 +1261,9 @@ public class LLLocalDictionary implements LLDictionary {
}
}
/**
* This method should not modify or move the writerIndex/readerIndex of the key
*/
private static ReleasableSlice setIterateBound(boolean allowNettyDirect,
ReadOptions readOpts, IterateBound boundType, Buffer key) {
requireNonNull(key);
@@ -1281,46 +1274,36 @@ public class LLLocalDictionary implements LLDictionary {
assert keyInternalByteBuffer.position() == 0;
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
assert slice.size() == key.readableBytes();
assert slice.compare(new Slice(LLUtils.toArray(key))) == 0;
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, key);
} else {
slice = new Slice(requireNonNull(LLUtils.toArray(key)));
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, null);
}
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImplWithRelease(slice);
}
private static ReleasableSlice emptyReleasableSlice() {
var arr = new byte[0];
return new SimpleSliceWithoutRelease(new Slice(arr), null, arr);
return new ReleasableSliceImplWithoutRelease(new Slice(arr));
}
public record SimpleSliceWithoutRelease(AbstractSlice<?> slice, @Nullable Buffer byteBuf,
@Nullable Object additionalData) implements ReleasableSlice {}
/**
* This class should not modify or move the writerIndex/readerIndex of the key
*/
public record ReleasableSliceImplWithoutRelease(AbstractSlice<?> slice) implements ReleasableSlice {}
public record ReleasableSliceImpl(AbstractSlice<?> slice, @Nullable Buffer byteBuf,
@Nullable Object additionalData) implements ReleasableSlice {
/**
* This class should not modify or move the writerIndex/readerIndex of the key
*/
public record ReleasableSliceImplWithRelease(AbstractSlice<?> slice) implements ReleasableSlice {
@Override
public void close() {
slice.clear();
slice.close();
if (byteBuf != null) {
byteBuf.close();
}
if (additionalData instanceof ByteBuffer bb && bb.isDirect()) {
PlatformDependent.freeDirectBuffer(bb);
}
}
}
@@ -1745,15 +1728,16 @@ public class LLLocalDictionary implements LLDictionary {
);
}
/**
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/
@NotNull
public static Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> getRocksIterator(BufferAllocator alloc,
public static RocksIteratorTuple getRocksIterator(
boolean allowNettyDirect,
ReadOptions readOptions,
LLRange range,
RocksDBColumn db) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread";
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
@@ -1775,6 +1759,6 @@ public class LLLocalDictionary implements LLDictionary {
seekTo = () -> {};
rocksIterator.seekToFirst();
}
return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo);
return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekTo);
}
}

View File

@@ -91,10 +91,10 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db);
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
var rocksIterator = tuple.iterator();
ObjectArrayList<T> values = new ObjectArrayList<>();
Buffer firstGroupKey = null;
try {
@@ -155,13 +155,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
sink.error(ex);
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().close();
tuple.getT3().close();
tuple.getT4().close();
});
}, RocksIteratorTuple::close);
}
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);

View File

@@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send;
@@ -12,10 +11,7 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import reactor.core.publisher.Flux;
@@ -27,8 +23,8 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
@Override
public void drop(LLLocalKeyPrefixReactiveRocksIterator obj) {
try {
if (obj.range != null) {
obj.range.close();
if (obj.rangeShared != null) {
obj.rangeShared.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
@@ -57,7 +53,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
private final RocksDBColumn db;
private final int prefixLength;
private LLRange range;
private LLRange rangeShared;
private final boolean allowNettyDirect;
private ReadOptions readOptions;
private final boolean canFillCache;
@@ -72,7 +68,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
try (range) {
this.db = db;
this.prefixLength = prefixLength;
this.range = range.receive();
this.rangeShared = range.receive();
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
@@ -81,85 +77,78 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
public Flux<Send<Buffer>> flux() {
return Flux.using(
() -> range.copy().send(),
rangeSend -> Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
if (!range.hasMin() || !range.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache);
return Flux.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
if (!rangeShared.hasMin() || !rangeShared.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache);
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
}
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, rangeShared, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.iterator();
rocksIterator.status();
Buffer firstGroupKey = null;
try {
while (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key);
} else {
key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key());
}
try (key) {
if (firstGroupKey == null) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey,
firstGroupKey.readerOffset(),
key,
key.readerOffset(),
prefixLength
)) {
break;
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
rocksIterator.status();
Buffer firstGroupKey = null;
try {
while (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key);
} else {
key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key());
}
try (key) {
if (firstGroupKey == null) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(),
prefixLength)) {
break;
}
rocksIterator.next();
rocksIterator.status();
}
}
rocksIterator.next();
rocksIterator.status();
}
}
if (firstGroupKey != null) {
assert firstGroupKey.isAccessible();
var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength);
assert groupKeyPrefix.isAccessible();
if (firstGroupKey != null) {
assert firstGroupKey.isAccessible();
var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength);
assert groupKeyPrefix.isAccessible();
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Range {} is reading prefix {}",
LLUtils.toStringSafe(range),
LLUtils.toStringSafe(groupKeyPrefix)
);
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Range {} is reading prefix {}",
LLUtils.toStringSafe(rangeShared),
LLUtils.toStringSafe(groupKeyPrefix)
);
}
sink.next(groupKeyPrefix.send());
} else {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range));
}
sink.complete();
}
} finally {
if (firstGroupKey != null) {
firstGroupKey.close();
}
}
} catch (RocksDBException ex) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range));
}
sink.error(ex);
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().close();
tuple.getT3().close();
tuple.getT4().close();
}),
resource -> resource.close()
);
sink.next(groupKeyPrefix.send());
} else {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(rangeShared));
}
sink.complete();
}
} finally {
if (firstGroupKey != null) {
firstGroupKey.close();
}
}
} catch (RocksDBException ex) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(rangeShared));
}
sink.error(ex);
}
return tuple;
}, RocksIteratorTuple::close);
}
@Override
@@ -169,7 +158,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
@Override
protected Owned<LLLocalKeyPrefixReactiveRocksIterator> prepareSend() {
var range = this.range.send();
var range = this.rangeShared.send();
var readOptions = new ReadOptions(this.readOptions);
return drop -> new LLLocalKeyPrefixReactiveRocksIterator(db,
prefixLength,
@@ -181,7 +170,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
}
protected void makeInaccessible() {
this.range = null;
this.rangeShared = null;
this.readOptions = null;
}
}

View File

@@ -25,8 +25,8 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
@Override
public void drop(LLLocalReactiveRocksIterator<?> obj) {
try {
if (obj.range != null) {
obj.range.close();
if (obj.rangeShared != null) {
obj.rangeShared.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
@@ -54,7 +54,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
};
private final RocksDBColumn db;
private LLRange range;
private LLRange rangeShared;
private final boolean allowNettyDirect;
private ReadOptions readOptions;
private final boolean readValues;
@@ -66,27 +66,29 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
ReadOptions readOptions,
boolean readValues) {
super((Drop<LLLocalReactiveRocksIterator<T>>) (Drop) DROP);
this.db = db;
this.range = range.receive();
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.readValues = readValues;
try (range) {
this.db = db;
this.rangeShared = range.receive();
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.readValues = readValues;
}
}
public final Flux<T> flux() {
return Flux.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
if (!range.hasMin() || !range.hasMax()) {
if (!rangeShared.hasMin() || !rangeShared.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(false);
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
}
return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db);
return getRocksIterator(allowNettyDirect, readOptions, rangeShared, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
var rocksIterator = tuple.iterator();
rocksIterator.status();
if (rocksIterator.isValid()) {
Buffer key;
@@ -110,7 +112,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Range {} is reading {}: {}",
LLUtils.toStringSafe(range),
LLUtils.toStringSafe(rangeShared),
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(value)
);
@@ -128,24 +130,18 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
}
} else {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range));
logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(rangeShared));
}
sink.complete();
}
} catch (RocksDBException ex) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range));
logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(rangeShared));
}
sink.error(ex);
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().close();
tuple.getT3().close();
tuple.getT4().close();
});
}, RocksIteratorTuple::close);
}
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);
@@ -157,7 +153,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
@Override
protected Owned<LLLocalReactiveRocksIterator<T>> prepareSend() {
var range = this.range.send();
var range = this.rangeShared.send();
var readOptions = new ReadOptions(this.readOptions);
return drop -> new LLLocalReactiveRocksIterator<>(db, range, allowNettyDirect, readOptions, readValues) {
@Override
@@ -168,7 +164,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
}
protected void makeInaccessible() {
this.range = null;
this.rangeShared = null;
this.readOptions = null;
}
}

View File

@@ -10,10 +10,4 @@ public interface ReleasableSlice extends SafeCloseable {
default void close() {
}
AbstractSlice<?> slice();
Buffer byteBuf();
Object additionalData();
}

View File

@@ -0,0 +1,18 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.RocksIterator;
public record RocksIteratorTuple(@NotNull RocksIterator iterator, @NotNull ReleasableSlice sliceMin,
@NotNull ReleasableSlice sliceMax, @NotNull SafeCloseable seekTo) implements
SafeCloseable {
@Override
public void close() {
iterator.close();
sliceMin.close();
sliceMax.close();
seekTo.close();
}
}
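The new RocksIteratorTuple replaces a Tuple4 plus manual getT1()..getT4() cleanup at every call site with a record whose close() releases every component, so callers can use try-with-resources or a RocksIteratorTuple::close method reference, as the iterator classes above now do. The same pattern in miniature, with plain AutoCloseable stand-ins (all names here are illustrative):

class AggregateCloseable {
    // Each component knows how to release itself.
    record Part(String name) implements AutoCloseable {
        @Override public void close() { System.out.println("closed " + name); }
    }

    // Mirrors RocksIteratorTuple: one close() that fans out to all components.
    record Bundle(Part iterator, Part sliceMin, Part sliceMax) implements AutoCloseable {
        @Override public void close() {
            iterator.close();
            sliceMin.close();
            sliceMax.close();
        }
    }

    public static void main(String[] args) {
        try (var bundle = new Bundle(new Part("iterator"), new Part("sliceMin"), new Part("sliceMax"))) {
            System.out.println("using " + bundle.iterator().name());
        } // all three parts are closed here, in the order written in close()
    }
}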

View File

@@ -279,19 +279,16 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) {
return entries.handle((entryToReceive, sink) -> {
public Mono<Void> putMulti(Flux<Send<LLEntry>> entries) {
return entries.doOnNext(entryToReceive -> {
try (var entry = entryToReceive.receive()) {
try (var key = entry.getKey().receive()) {
try (var val = entry.getValue().receive()) {
var oldValue = mainDb.put(k(key.copy().send()), k(val.send()));
if (oldValue != null && getOldValues) {
sink.next(LLEntry.of(key.send(), kk(oldValue)).send());
}
mainDb.put(k(key.copy().send()), k(val.send()));
}
}
}
});
}).then();
}
@Override

View File

@@ -3,7 +3,8 @@ package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.newAllocator;
import static it.cavallium.dbengine.SyncUtils.*;
import static it.cavallium.dbengine.SyncUtils.run;
import static it.cavallium.dbengine.SyncUtils.runVoid;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.net5.buffer.api.Buffer;
@@ -30,9 +31,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public abstract class TestLLDictionary {
@@ -211,17 +210,17 @@ public abstract class TestLLDictionary {
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
long afterSize;
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).then()
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).doOnNext(Send::close).then()
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(0, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).then()
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).doOnNext(Send::close).then()
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(0, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).then()
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then()
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(0, afterSize - beforeSize);
@@ -236,17 +235,17 @@ public abstract class TestLLDictionary {
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
long afterSize;
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).then()
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).doOnNext(Send::close).then()
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(expected, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).then()
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).doOnNext(Send::close).then()
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(expected, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).then()
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then()
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(expected, afterSize - beforeSize);
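The test changes above are leak hygiene: update() now emits a Send<Buffer> that someone must close, and since these tests discard the result with then(), they close it in doOnNext first. The same idiom with a generic AutoCloseable, assuming reactor-core (the Resource record is a hypothetical stand-in for Send<Buffer>):

import reactor.core.publisher.Mono;

class CloseDiscardedResults {
    record Resource(String name) implements AutoCloseable {
        @Override public void close() { System.out.println("closed " + name); }
    }

    public static void main(String[] args) {
        Mono<Resource> update = Mono.fromCallable(() -> new Resource("result"));
        // We only care that the update completed, so close the emitted
        // resource before dropping it; a bare then() would leak it.
        update.doOnNext(Resource::close).then().block();
    }
}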