More update statistics, avoid send in updates

This commit is contained in:
Andrea Cavalli 2022-04-01 01:30:56 +02:00
parent 7891b0b9e0
commit cd26cf61b7
19 changed files with 508 additions and 434 deletions

View File

@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.util.List;
@ -27,15 +28,14 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<UpdateMode> getUpdateMode();
default Mono<Send<Buffer>> update(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return this
.updateAndGetDelta(key, updater)
.transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode));
}
Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater);
Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key, BinarySerializationFunction updater);
Mono<Void> clear();

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.util.function.Function;
@ -18,14 +19,14 @@ public interface LLSingleton extends LLKeyValueDatabaseStructure {
Mono<Void> set(Mono<Send<Buffer>> value);
default Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
default Mono<Send<Buffer>> update(BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return this
.updateAndGetDelta(updater)
.transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode));
}
Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater);
Mono<Send<LLDelta>> updateAndGetDelta(BinarySerializationFunction updater);
String getColumnName();

View File

@ -64,13 +64,11 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure {
private Mono<Long> addAnd(long count, UpdateReturnMode updateReturnMode) {
return singleton.update(prev -> {
if (prev != null) {
try (var prevBuf = prev.receive()) {
var prevLong = prevBuf.readLong();
var alloc = singleton.getAllocator();
var buf = alloc.allocate(Long.BYTES);
buf.writeLong(prevLong + count);
return buf;
}
var prevLong = prev.readLong();
var alloc = singleton.getAllocator();
var buf = alloc.allocate(Long.BYTES);
buf.writeLong(prevLong + count);
return buf;
} else {
var alloc = singleton.getAllocator();
var buf = alloc.allocate(Long.BYTES);

View File

@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -324,23 +325,19 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}));
}
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> getSerializedUpdater(
public BinarySerializationFunction getSerializedUpdater(
SerializationFunction<@Nullable U, @Nullable U> updater) {
return oldSerialized -> {
try (oldSerialized) {
U result;
if (oldSerialized == null) {
result = updater.apply(null);
} else {
try (var oldSerializedReceived = oldSerialized.receive()) {
result = updater.apply(valueSerializer.deserialize(oldSerializedReceived));
}
}
if (result == null) {
return null;
} else {
return serializeValue(result);
}
U result;
if (oldSerialized == null) {
result = updater.apply(null);
} else {
result = updater.apply(valueSerializer.deserialize(oldSerialized));
}
if (result == null) {
return null;
} else {
return serializeValue(result);
}
};
}

View File

@ -130,22 +130,17 @@ public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, Data
UpdateReturnMode updateReturnMode) {
return dictionary
.update(keyMono, (oldValueSer) -> {
try (oldValueSer) {
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result).receive();
}
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue = serializer.deserialize(oldValueSer);
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result).receive();
}
}, updateReturnMode)
.handle(this::deserializeValue);
@ -155,22 +150,17 @@ public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, Data
public Mono<Delta<U>> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) {
return dictionary
.updateAndGetDelta(keyMono, (oldValueSer) -> {
try (oldValueSer) {
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result).receive();
}
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue = serializer.deserialize(oldValueSer);
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result).receive();
}
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
try (var valueBuf = serialized.receive()) {

View File

@ -129,10 +129,7 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
U deserializedValue = serializer.deserialize(oldValueSer);
result = updater.apply(deserializedValue);
}
if (result == null) {
@ -154,10 +151,7 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
U deserializedValue = serializer.deserialize(oldValueSer);
result = updater.apply(deserializedValue);
}
if (result == null) {

View File

@ -15,7 +15,6 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.DefaultBufferAllocators;
import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.WritableComponent;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
@ -23,13 +22,14 @@ import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@ -44,7 +44,6 @@ import org.rocksdb.KeyMayExist.KeyMayExistEnum;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Transaction;
import org.rocksdb.WriteBatch;
@ -75,10 +74,11 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
protected final DistributionSummary keyBufferSize;
protected final DistributionSummary readValueNotFoundWithoutBloomBufferSize;
protected final DistributionSummary readValueNotFoundWithBloomBufferSize;
protected final DistributionSummary readValueNotFoundWithMayExistBloomBufferSize;
protected final DistributionSummary readValueFoundWithBloomUncachedBufferSize;
protected final DistributionSummary readValueFoundWithBloomCacheBufferSize;
protected final DistributionSummary readValueFoundWithBloomSimpleBufferSize;
protected final DistributionSummary readValueNotFoundWithMayExistBloomBufferSize;
protected final DistributionSummary readValueFoundWithoutBloomBufferSize;
protected final DistributionSummary writeValueBufferSize;
protected final DistributionSummary readAttempts;
@ -89,6 +89,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final Counter endedIterNext;
private final Timer iterNextTime;
private final Counter startedUpdate;
private final Counter endedUpdate;
private final Timer updateAddedTime;
private final Timer updateReplacedTime;
private final Timer updateRemovedTime;
private final Timer updateUnchangedTime;
public AbstractRocksDBColumn(T db,
DatabaseOptions databaseOptions,
BufferAllocator alloc,
@ -123,7 +130,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
.baseUnit("bytes")
.scale(1)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false")
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "no")
.register(meterRegistry);
this.readValueNotFoundWithBloomBufferSize = DistributionSummary
.builder("buffer.size.distribution")
@ -133,6 +140,14 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.notfound")
.register(meterRegistry);
this.readValueNotFoundWithMayExistBloomBufferSize = DistributionSummary
.builder("buffer.size.distribution")
.publishPercentiles(0.2, 0.5, 0.95)
.baseUnit("bytes")
.scale(1)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.wrong")
.register(meterRegistry);
this.readValueFoundWithBloomUncachedBufferSize = DistributionSummary
.builder("buffer.size.distribution")
.publishPercentiles(0.2, 0.5, 0.95)
@ -157,13 +172,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "hit")
.register(meterRegistry);
this.readValueNotFoundWithMayExistBloomBufferSize = DistributionSummary
this.readValueFoundWithoutBloomBufferSize = DistributionSummary
.builder("buffer.size.distribution")
.publishPercentiles(0.2, 0.5, 0.95)
.baseUnit("bytes")
.scale(1)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.wrong")
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "no")
.register(meterRegistry);
this.writeValueBufferSize = DistributionSummary
.builder("buffer.size.distribution")
@ -198,6 +213,33 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedUpdate = meterRegistry.counter("db.write.update.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedUpdate = meterRegistry.counter("db.write.update.ended.counter", "db.name", databaseName, "db.column", columnName);
this.updateAddedTime = Timer
.builder("db.write.update.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "update.type", "added")
.register(meterRegistry);
this.updateReplacedTime = Timer
.builder("db.write.update.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "update.type", "replaced")
.register(meterRegistry);
this.updateRemovedTime = Timer
.builder("db.write.update.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "update.type", "removed")
.register(meterRegistry);
this.updateUnchangedTime = Timer
.builder("db.write.update.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "update.type", "unchanged")
.register(meterRegistry);
}
/**
@ -775,16 +817,44 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
@Override
public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
Buffer key,
BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException {
return updateAtomicImpl(readOptions, writeOptions, keySend, updater, returnMode);
try {
keyBufferSize.record(key.readableBytes());
startedUpdate.increment();
return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
} finally {
endedUpdate.increment();
}
}
protected final void recordAtomicUpdateTime(boolean changed, boolean prevSet, boolean newSet, long initTime) {
long duration = System.nanoTime() - initTime;
Timer timer;
if (changed) {
if (prevSet && newSet) {
timer = updateReplacedTime;
} else if (newSet) {
timer = updateAddedTime;
} else {
timer = updateRemovedTime;
}
} else {
timer = updateUnchangedTime;
}
timer.record(duration, TimeUnit.NANOSECONDS);
}
protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
Buffer key,
BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException;
@Override
@ -810,6 +880,20 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
);
}
protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater,
@Nullable Buffer prevDataToSendToUpdater)
throws SerializationException {
@Nullable Buffer newData = null;
try {
newData = updater.apply(prevDataToSendToUpdater);
} finally {
if (prevDataToSendToUpdater != newData && prevDataToSendToUpdater != null) {
prevDataToSendToUpdater.close();
}
}
return newData;
}
@Override
public ColumnFamilyHandle getColumnFamilyHandle() {
return cfh;

View File

@ -0,0 +1,7 @@
package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import org.jetbrains.annotations.Nullable;
public interface BinarySerializationFunction extends SerializationFunction<@Nullable Buffer, @Nullable Buffer> {}

View File

@ -5,6 +5,7 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
@ -426,12 +427,12 @@ public class LLLocalDictionary implements LLDictionary {
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<Send<Buffer>> update(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (keySend) {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
if (updateMode == UpdateMode.DISALLOW) {
sink.error(new UnsupportedOperationException("update() is disallowed"));
@ -446,7 +447,7 @@ public class LLLocalDictionary implements LLDictionary {
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, returnMode));
EMPTY_WRITE_OPTIONS, key, updater, returnMode));
} finally {
endedUpdates.increment();
}
@ -470,11 +471,11 @@ public class LLLocalDictionary implements LLDictionary {
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
BinarySerializationFunction updater) {
return keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (keySend) {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
if (updateMode == UpdateMode.DISALLOW) {
sink.error(new UnsupportedOperationException("update() is disallowed"));
@ -489,8 +490,8 @@ public class LLLocalDictionary implements LLDictionary {
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA));
result = updateTime.recordCallable(() ->
db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA));
} finally {
endedUpdates.increment();
}

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
@ -122,39 +124,43 @@ public class LLLocalSingleton implements LLSingleton {
}
@Override
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
public Mono<Send<Buffer>> update(BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result
= db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, returnMode);
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
try (var key = keySend.receive()) {
result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, returnMode);
}
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
public Mono<Send<LLDelta>> updateAndGetDelta(BinarySerializationFunction updater) {
return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResult result
= db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA);
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResult result;
try (var key = keySend.receive()) {
result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA);
}
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
}
@Override

View File

@ -10,7 +10,7 @@ import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.lucene.ExponentialPageLimits;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException;
@ -21,7 +21,6 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code;
import org.rocksdb.Transaction;
@ -79,153 +78,150 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
@Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
Buffer key,
BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException {
try (Buffer key = keySend.receive()) {
try {
var cfh = getCfh();
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var tx = beginTransaction(writeOptions)) {
boolean committedSuccessfully;
int retries = 0;
ExponentialPageLimits retryTime = null;
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
boolean changed;
do {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
prevDataArray = null;
long initNanoTime = System.nanoTime();
try {
var cfh = getCfh();
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var tx = beginTransaction(writeOptions)) {
boolean committedSuccessfully;
int retries = 0;
ExponentialPageLimits retryTime = null;
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
boolean changed;
do {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
prevDataArray = null;
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy().makeReadOnly();
} else {
prevData = null;
prevDataToSendToUpdater = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
newData = updater.apply(sentData);
@Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater);
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray);
tx.rollback();
if (sentPrevData != null) {
sentPrevData.close();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray);
tx.rollback();
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
retries++;
if (sentCurData != null) {
sentCurData.close();
}
retries++;
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 2, 2000);
}
long retryNs = 1000000L * retryTime.getPageLimit(retries);
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 2, 2000);
}
long retryNs = 1000000L * retryTime.getPageLimit(retries);
// +- 30%
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
// +- 30%
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
}
// Wait for n milliseconds
if (retryNs > 0) {
LockSupport.parkNanos(retryNs);
}
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
}
// Wait for n milliseconds
if (retryNs > 0) {
LockSupport.parkNanos(retryNs);
}
}
}
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
}
optimisticAttempts.record(retries);
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
};
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
optimisticAttempts.record(retries);
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
};
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
}
}

View File

@ -49,118 +49,119 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
@Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
Buffer key,
BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException {
try (Buffer key = keySend.receive()) {
try {
var cfh = getCfh();
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
long initNanoTime = System.nanoTime();
try {
var cfh = getCfh();
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var tx = beginTransaction(writeOptions)) {
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
boolean changed;
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
}
try (var tx = beginTransaction(writeOptions)) {
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
boolean changed;
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
Buffer prevData;
if (prevDataArray != null) {
readValueFoundWithoutBloomBufferSize.record(prevDataArray.length);
prevData = MemoryManager.unsafeWrap(prevDataArray);
} else {
readValueNotFoundWithoutBloomBufferSize.record(0);
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy().makeReadOnly();
} else {
prevData = null;
prevDataToSendToUpdater = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
newData = updater.apply(sentData);
@Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater);
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
}
} finally {
tx.undoGetForUpdate(cfh, keyArray);
}
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
};
} finally {
tx.undoGetForUpdate(cfh, keyArray);
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
};
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
}
}

View File

@ -71,7 +71,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
@NotNull RocksDBIterator newIterator(@NotNull ReadOptions readOptions);
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions,
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
Buffer key, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException;

View File

@ -43,106 +43,103 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
@Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
Buffer key,
BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException {
try (Buffer key = keySend.receive()) {
try {
@Nullable Buffer prevData = this.get(readOptions, key);
try (prevData) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData)
);
}
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
newData = newDataToReceive;
}
}
}
long initNanoTime = System.nanoTime();
try {
@Nullable Buffer prevData = this.get(readOptions, key);
try (prevData) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData)
);
}
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy().makeReadOnly();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater);
try (newData) {
boolean changed;
assert newData == null || newData.isAccessible();
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData),
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
this.delete(writeOptions, key);
changed = true;
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData),
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
this.delete(writeOptions, key);
changed = true;
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
Buffer dataToPut;
if (returnMode == UpdateAtomicResultMode.CURRENT) {
dataToPut = newData.copy();
} else {
dataToPut = newData;
}
try {
this.put(writeOptions, key, dataToPut);
changed = true;
} finally {
if (dataToPut != newData) {
dataToPut.close();
}
}
Buffer dataToPut;
if (returnMode == UpdateAtomicResultMode.CURRENT) {
dataToPut = newData.copy();
} else {
changed = false;
dataToPut = newData;
}
return switch (returnMode) {
case NOTHING -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield RESULT_NOTHING;
try {
this.put(writeOptions, key, dataToPut);
changed = true;
} finally {
if (dataToPut != newData) {
dataToPut.close();
}
case CURRENT -> {
if (prevData != null) {
prevData.close();
}
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
}
case PREVIOUS -> {
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
.send());
};
} finally {
if (newData != null) {
newData.close();
}
} else {
changed = false;
}
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (prevData != null) {
prevData.close();
}
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
}
case PREVIOUS -> {
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
.send());
};
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
}
}

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
public enum UpdateAtomicResultMode {
NOTHING,
PREVIOUS,
CURRENT, BINARY_CHANGED,
CURRENT,
BINARY_CHANGED,
DELTA
}

View File

@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -194,8 +195,7 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono, BinarySerializationFunction updater) {
return Mono.usingWhen(keyMono,
key -> Mono.fromCallable(() -> {
try (key) {
@ -208,7 +208,7 @@ public class LLMemoryDictionary implements LLDictionary {
oldRef.set(kk(old));
}
Buffer v;
try (var oldToSend = old != null ? kk(old) : null) {
try (var oldToSend = old != null ? kk(old).receive() : null) {
v = updater.apply(oldToSend);
} catch (SerializationException e) {
throw new IllegalStateException(e);

View File

@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
@ -54,13 +55,13 @@ public class LLMemorySingleton implements LLSingleton {
}
@Override
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
public Mono<Send<Buffer>> update(BinarySerializationFunction updater,
UpdateReturnMode updateReturnMode) {
return dict.update(singletonNameBufMono, updater, updateReturnMode);
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
public Mono<Send<LLDelta>> updateAndGetDelta(BinarySerializationFunction updater) {
return dict.updateAndGetDelta(singletonNameBufMono, updater);
}

View File

@ -23,6 +23,7 @@ import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -259,13 +260,12 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode) {
public Mono<Send<Buffer>> update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) {
return LLQuicConnection.this.<BinaryOptional, SingletonUpdateOldData>sendUpdateRequest(new SingletonUpdateInit(singletonId, updateReturnMode), prev -> {
byte[] oldData = toArrayNoCopy(prev);
Send<Buffer> oldDataBuf;
Buffer oldDataBuf;
if (oldData != null) {
oldDataBuf = allocator.copyOf(oldData).send();
oldDataBuf = allocator.copyOf(oldData);
} else {
oldDataBuf = null;
}
@ -292,7 +292,7 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
public Mono<Send<LLDelta>> updateAndGetDelta(BinarySerializationFunction updater) {
return Mono.error(new UnsupportedOperationException());
}

View File

@ -169,8 +169,8 @@ public abstract class TestLLDictionaryLeaks {
);
}
private Buffer pass(@Nullable Send<Buffer> old) {
return old == null ? null : old.receive();
private Buffer pass(@Nullable Buffer old) {
return old;
}
@ParameterizedTest