From cd26cf61b755f28232dd2e66b3f06d52d52dc5db Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 1 Apr 2022 01:30:56 +0200 Subject: [PATCH] More update statistics, avoid send in updates --- .../dbengine/database/LLDictionary.java | 6 +- .../dbengine/database/LLSingleton.java | 5 +- .../database/collections/DatabaseLong.java | 12 +- .../collections/DatabaseMapDictionary.java | 27 +- .../collections/DatabaseMapSingle.java | 54 ++-- .../collections/DatabaseSingleton.java | 10 +- .../database/disk/AbstractRocksDBColumn.java | 108 +++++++- .../disk/BinarySerializationFunction.java | 7 + .../database/disk/LLLocalDictionary.java | 15 +- .../database/disk/LLLocalSingleton.java | 60 +++-- .../disk/OptimisticRocksDBColumn.java | 254 +++++++++--------- .../disk/PessimisticRocksDBColumn.java | 189 ++++++------- .../dbengine/database/disk/RocksDBColumn.java | 2 +- .../database/disk/StandardRocksDBColumn.java | 165 ++++++------ .../database/disk/UpdateAtomicResultMode.java | 3 +- .../database/memory/LLMemoryDictionary.java | 6 +- .../database/memory/LLMemorySingleton.java | 5 +- .../database/remote/LLQuicConnection.java | 10 +- .../dbengine/TestLLDictionaryLeaks.java | 4 +- 19 files changed, 508 insertions(+), 434 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 02bfdbc..d58f186 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.List; @@ -27,15 +28,14 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono getUpdateMode(); default Mono> update(Mono> key, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return this .updateAndGetDelta(key, updater) .transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode)); } - Mono> updateAndGetDelta(Mono> key, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater); + Mono> updateAndGetDelta(Mono> key, BinarySerializationFunction updater); Mono clear(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java index 2c4822a..781d53a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Send; +import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.bytes.ByteList; import java.util.function.Function; @@ -18,14 +19,14 @@ public interface LLSingleton extends LLKeyValueDatabaseStructure { Mono set(Mono> value); - default Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + default Mono> update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return this .updateAndGetDelta(updater) .transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode)); } - Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater); + Mono> updateAndGetDelta(BinarySerializationFunction updater); String getColumnName(); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java index 248b45d..17fa828 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java @@ -64,13 +64,11 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure { private Mono addAnd(long count, UpdateReturnMode updateReturnMode) { return singleton.update(prev -> { if (prev != null) { - try (var prevBuf = prev.receive()) { - var prevLong = prevBuf.readLong(); - var alloc = singleton.getAllocator(); - var buf = alloc.allocate(Long.BYTES); - buf.writeLong(prevLong + count); - return buf; - } + var prevLong = prev.readLong(); + var alloc = singleton.getAllocator(); + var buf = alloc.allocate(Long.BYTES); + buf.writeLong(prevLong + count); + return buf; } else { var alloc = singleton.getAllocator(); var buf = alloc.allocate(Long.BYTES); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index e1ea6a0..faae9b8 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -324,23 +325,19 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep, @Nullable Buffer> getSerializedUpdater( + public BinarySerializationFunction getSerializedUpdater( SerializationFunction<@Nullable U, @Nullable U> updater) { return oldSerialized -> { - try (oldSerialized) { - U result; - if (oldSerialized == null) { - result = updater.apply(null); - } else { - try (var oldSerializedReceived = oldSerialized.receive()) { - result = updater.apply(valueSerializer.deserialize(oldSerializedReceived)); - } - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } + U result; + if (oldSerialized == null) { + result = updater.apply(null); + } else { + result = updater.apply(valueSerializer.deserialize(oldSerialized)); + } + if (result == null) { + return null; + } else { + return serializeValue(result); } }; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index 61dfae4..54bf27a 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -130,22 +130,17 @@ public class DatabaseMapSingle extends ResourceSupport, Data UpdateReturnMode updateReturnMode) { return dictionary .update(keyMono, (oldValueSer) -> { - try (oldValueSer) { - U result; - if (oldValueSer == null) { - result = updater.apply(null); - } else { - U deserializedValue; - try (var valueBuf = oldValueSer.receive()) { - deserializedValue = serializer.deserialize(valueBuf); - } - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result).receive(); - } + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue = serializer.deserialize(oldValueSer); + result = updater.apply(deserializedValue); + } + if (result == null) { + return null; + } else { + return serializeValue(result).receive(); } }, updateReturnMode) .handle(this::deserializeValue); @@ -155,22 +150,17 @@ public class DatabaseMapSingle extends ResourceSupport, Data public Mono> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) { return dictionary .updateAndGetDelta(keyMono, (oldValueSer) -> { - try (oldValueSer) { - U result; - if (oldValueSer == null) { - result = updater.apply(null); - } else { - U deserializedValue; - try (var valueBuf = oldValueSer.receive()) { - deserializedValue = serializer.deserialize(valueBuf); - } - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result).receive(); - } + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue = serializer.deserialize(oldValueSer); + result = updater.apply(deserializedValue); + } + if (result == null) { + return null; + } else { + return serializeValue(result).receive(); } }).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { try (var valueBuf = serialized.receive()) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index c97a847..9e44f1c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -129,10 +129,7 @@ public class DatabaseSingleton extends ResourceSupport, Data if (oldValueSer == null) { result = updater.apply(null); } else { - U deserializedValue; - try (var valueBuf = oldValueSer.receive()) { - deserializedValue = serializer.deserialize(valueBuf); - } + U deserializedValue = serializer.deserialize(oldValueSer); result = updater.apply(deserializedValue); } if (result == null) { @@ -154,10 +151,7 @@ public class DatabaseSingleton extends ResourceSupport, Data if (oldValueSer == null) { result = updater.apply(null); } else { - U deserializedValue; - try (var valueBuf = oldValueSer.receive()) { - deserializedValue = serializer.deserialize(valueBuf); - } + U deserializedValue = serializer.deserialize(oldValueSer); result = updater.apply(deserializedValue); } if (result == null) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index b6e2087..e202c95 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -15,7 +15,6 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.DefaultBufferAllocators; import io.netty5.buffer.api.ReadableComponent; -import io.netty5.buffer.api.Send; import io.netty5.buffer.api.WritableComponent; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; @@ -23,13 +22,14 @@ import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease; -import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -44,7 +44,6 @@ import org.rocksdb.KeyMayExist.KeyMayExistEnum; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.rocksdb.Slice; import org.rocksdb.Transaction; import org.rocksdb.WriteBatch; @@ -75,10 +74,11 @@ public sealed abstract class AbstractRocksDBColumn implements protected final DistributionSummary keyBufferSize; protected final DistributionSummary readValueNotFoundWithoutBloomBufferSize; protected final DistributionSummary readValueNotFoundWithBloomBufferSize; + protected final DistributionSummary readValueNotFoundWithMayExistBloomBufferSize; protected final DistributionSummary readValueFoundWithBloomUncachedBufferSize; protected final DistributionSummary readValueFoundWithBloomCacheBufferSize; protected final DistributionSummary readValueFoundWithBloomSimpleBufferSize; - protected final DistributionSummary readValueNotFoundWithMayExistBloomBufferSize; + protected final DistributionSummary readValueFoundWithoutBloomBufferSize; protected final DistributionSummary writeValueBufferSize; protected final DistributionSummary readAttempts; @@ -89,6 +89,13 @@ public sealed abstract class AbstractRocksDBColumn implements private final Counter endedIterNext; private final Timer iterNextTime; + private final Counter startedUpdate; + private final Counter endedUpdate; + private final Timer updateAddedTime; + private final Timer updateReplacedTime; + private final Timer updateRemovedTime; + private final Timer updateUnchangedTime; + public AbstractRocksDBColumn(T db, DatabaseOptions databaseOptions, BufferAllocator alloc, @@ -123,7 +130,7 @@ public sealed abstract class AbstractRocksDBColumn implements .baseUnit("bytes") .scale(1) .publishPercentileHistogram() - .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false") + .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "no") .register(meterRegistry); this.readValueNotFoundWithBloomBufferSize = DistributionSummary .builder("buffer.size.distribution") @@ -133,6 +140,14 @@ public sealed abstract class AbstractRocksDBColumn implements .publishPercentileHistogram() .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.notfound") .register(meterRegistry); + this.readValueNotFoundWithMayExistBloomBufferSize = DistributionSummary + .builder("buffer.size.distribution") + .publishPercentiles(0.2, 0.5, 0.95) + .baseUnit("bytes") + .scale(1) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.wrong") + .register(meterRegistry); this.readValueFoundWithBloomUncachedBufferSize = DistributionSummary .builder("buffer.size.distribution") .publishPercentiles(0.2, 0.5, 0.95) @@ -157,13 +172,13 @@ public sealed abstract class AbstractRocksDBColumn implements .publishPercentileHistogram() .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "hit") .register(meterRegistry); - this.readValueNotFoundWithMayExistBloomBufferSize = DistributionSummary + this.readValueFoundWithoutBloomBufferSize = DistributionSummary .builder("buffer.size.distribution") .publishPercentiles(0.2, 0.5, 0.95) .baseUnit("bytes") .scale(1) .publishPercentileHistogram() - .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.wrong") + .tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "no") .register(meterRegistry); this.writeValueBufferSize = DistributionSummary .builder("buffer.size.distribution") @@ -198,6 +213,33 @@ public sealed abstract class AbstractRocksDBColumn implements .publishPercentileHistogram() .tags("db.name", databaseName, "db.column", columnName) .register(meterRegistry); + + this.startedUpdate = meterRegistry.counter("db.write.update.started.counter", "db.name", databaseName, "db.column", columnName); + this.endedUpdate = meterRegistry.counter("db.write.update.ended.counter", "db.name", databaseName, "db.column", columnName); + this.updateAddedTime = Timer + .builder("db.write.update.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName, "update.type", "added") + .register(meterRegistry); + this.updateReplacedTime = Timer + .builder("db.write.update.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName, "update.type", "replaced") + .register(meterRegistry); + this.updateRemovedTime = Timer + .builder("db.write.update.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName, "update.type", "removed") + .register(meterRegistry); + this.updateUnchangedTime = Timer + .builder("db.write.update.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName, "update.type", "unchanged") + .register(meterRegistry); } /** @@ -775,16 +817,44 @@ public sealed abstract class AbstractRocksDBColumn implements @Override public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, - Send keySend, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + Buffer key, + BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { - return updateAtomicImpl(readOptions, writeOptions, keySend, updater, returnMode); + try { + keyBufferSize.record(key.readableBytes()); + startedUpdate.increment(); + return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } finally { + endedUpdate.increment(); + } + } + + protected final void recordAtomicUpdateTime(boolean changed, boolean prevSet, boolean newSet, long initTime) { + long duration = System.nanoTime() - initTime; + Timer timer; + if (changed) { + if (prevSet && newSet) { + timer = updateReplacedTime; + } else if (newSet) { + timer = updateAddedTime; + } else { + timer = updateRemovedTime; + } + } else { + timer = updateUnchangedTime; + } + + timer.record(duration, TimeUnit.NANOSECONDS); } protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, - Send keySend, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + Buffer key, + BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException; @Override @@ -810,6 +880,20 @@ public sealed abstract class AbstractRocksDBColumn implements ); } + protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater, + @Nullable Buffer prevDataToSendToUpdater) + throws SerializationException { + @Nullable Buffer newData = null; + try { + newData = updater.apply(prevDataToSendToUpdater); + } finally { + if (prevDataToSendToUpdater != newData && prevDataToSendToUpdater != null) { + prevDataToSendToUpdater.close(); + } + } + return newData; + } + @Override public ColumnFamilyHandle getColumnFamilyHandle() { return cfh; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java new file mode 100644 index 0000000..938f7a8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java @@ -0,0 +1,7 @@ +package it.cavallium.dbengine.database.disk; + +import io.netty5.buffer.api.Buffer; +import it.cavallium.dbengine.database.serialization.SerializationFunction; +import org.jetbrains.annotations.Nullable; + +public interface BinarySerializationFunction extends SerializationFunction<@Nullable Buffer, @Nullable Buffer> {} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 6cb6025..18bf6e1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -5,6 +5,7 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.fromByteArray; import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; +import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; @@ -426,12 +427,12 @@ public class LLLocalDictionary implements LLDictionary { @SuppressWarnings("DuplicatedCode") @Override public Mono> update(Mono> keyMono, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return keyMono .publishOn(dbScheduler) .handle((keySend, sink) -> { - try (keySend) { + try (var key = keySend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; if (updateMode == UpdateMode.DISALLOW) { sink.error(new UnsupportedOperationException("update() is disallowed")); @@ -446,7 +447,7 @@ public class LLLocalDictionary implements LLDictionary { startedUpdates.increment(); try { result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, - EMPTY_WRITE_OPTIONS, keySend, updater, returnMode)); + EMPTY_WRITE_OPTIONS, key, updater, returnMode)); } finally { endedUpdates.increment(); } @@ -470,11 +471,11 @@ public class LLLocalDictionary implements LLDictionary { @SuppressWarnings("DuplicatedCode") @Override public Mono> updateAndGetDelta(Mono> keyMono, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + BinarySerializationFunction updater) { return keyMono .publishOn(dbScheduler) .handle((keySend, sink) -> { - try (keySend) { + try (var key = keySend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; if (updateMode == UpdateMode.DISALLOW) { sink.error(new UnsupportedOperationException("update() is disallowed")); @@ -489,8 +490,8 @@ public class LLLocalDictionary implements LLDictionary { UpdateAtomicResult result; startedUpdates.increment(); try { - result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, - EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA)); + result = updateTime.recordCallable(() -> + db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA)); } finally { endedUpdates.increment(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 06593bb..5572464 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; + import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Send; @@ -122,39 +124,43 @@ public class LLLocalSingleton implements LLSingleton { } @Override - public Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + public Mono> update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called update in a nonblocking thread"); - } - UpdateAtomicResultMode returnMode = switch (updateReturnMode) { - case NOTHING -> UpdateAtomicResultMode.NOTHING; - case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; - case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; - }; - UpdateAtomicResult result - = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, returnMode); - return switch (updateReturnMode) { - case NOTHING -> null; - case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); - case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); - }; - }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), - keySend -> Mono.fromRunnable(keySend::close)); + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } + UpdateAtomicResultMode returnMode = switch (updateReturnMode) { + case NOTHING -> UpdateAtomicResultMode.NOTHING; + case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; + case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; + }; + UpdateAtomicResult result; + try (var key = keySend.receive()) { + result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, returnMode); + } + return switch (updateReturnMode) { + case NOTHING -> null; + case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); + case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); + }; + }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), + keySend -> Mono.fromRunnable(keySend::close)); } @Override - public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + public Mono> updateAndGetDelta(BinarySerializationFunction updater) { return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called update in a nonblocking thread"); - } - UpdateAtomicResult result - = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA); - return ((UpdateAtomicResultDelta) result).delta(); - }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), - keySend -> Mono.fromRunnable(keySend::close)); + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } + UpdateAtomicResult result; + try (var key = keySend.receive()) { + result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA); + } + return ((UpdateAtomicResultDelta) result).delta(); + }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), + keySend -> Mono.fromRunnable(keySend::close)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index ec64754..ed47c3e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -10,7 +10,7 @@ import io.netty5.buffer.api.MemoryManager; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.lucene.ExponentialPageLimits; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; @@ -21,7 +21,6 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Status.Code; import org.rocksdb.Transaction; @@ -79,153 +78,150 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn keySend, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + Buffer key, + BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { - try (Buffer key = keySend.receive()) { - try { - var cfh = getCfh(); - var keyArray = LLUtils.toArray(key); - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called update in a nonblocking thread"); - } - try (var tx = beginTransaction(writeOptions)) { - boolean committedSuccessfully; - int retries = 0; - ExponentialPageLimits retryTime = null; - Send sentPrevData; - Send sentCurData; - boolean changed; - do { - var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Reading {}: {} (before update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevDataArray) - ); - } - Buffer prevData; - if (prevDataArray != null) { - prevData = MemoryManager.unsafeWrap(prevDataArray); - prevDataArray = null; + long initNanoTime = System.nanoTime(); + try { + var cfh = getCfh(); + var keyArray = LLUtils.toArray(key); + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } + try (var tx = beginTransaction(writeOptions)) { + boolean committedSuccessfully; + int retries = 0; + ExponentialPageLimits retryTime = null; + Send sentPrevData; + Send sentCurData; + boolean changed; + do { + var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray) + ); + } + Buffer prevData; + if (prevDataArray != null) { + prevData = MemoryManager.unsafeWrap(prevDataArray); + prevDataArray = null; + } else { + prevData = null; + } + try (prevData) { + Buffer prevDataToSendToUpdater; + if (prevData != null) { + prevDataToSendToUpdater = prevData.copy().makeReadOnly(); } else { - prevData = null; + prevDataToSendToUpdater = null; } - try (prevData) { - Buffer prevDataToSendToUpdater; - if (prevData != null) { - prevDataToSendToUpdater = prevData.copy(); - } else { - prevDataToSendToUpdater = null; - } - @Nullable Buffer newData; - try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { - newData = updater.apply(sentData); + @Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater); + try (newData) { + var newDataArray = newData == null ? null : LLUtils.toArray(newData); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Updating {}. previous data: {}, updated data: {}", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray), + LLUtils.toStringSafe(newDataArray) + ); } - try (newData) { - var newDataArray = newData == null ? null : LLUtils.toArray(newData); + if (prevData != null && newData == null) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); + } + tx.delete(cfh, keyArray, true); + changed = true; + committedSuccessfully = commitOptimistically(tx); + } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Updating {}. previous data: {}, updated data: {}", + "Writing {}: {} (after update)", LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevDataArray), - LLUtils.toStringSafe(newDataArray) + LLUtils.toStringSafe(newData) ); } - if (prevData != null && newData == null) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); - } - tx.delete(cfh, keyArray, true); - changed = true; - committedSuccessfully = commitOptimistically(tx); - } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Writing {}: {} (after update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(newData) - ); - } - tx.put(cfh, keyArray, newDataArray); - changed = true; - committedSuccessfully = commitOptimistically(tx); - } else { - changed = false; - committedSuccessfully = true; - tx.rollback(); + tx.put(cfh, keyArray, newDataArray); + changed = true; + committedSuccessfully = commitOptimistically(tx); + } else { + changed = false; + committedSuccessfully = true; + tx.rollback(); + } + sentPrevData = prevData == null ? null : prevData.send(); + sentCurData = newData == null ? null : newData.send(); + if (!committedSuccessfully) { + tx.undoGetForUpdate(cfh, keyArray); + tx.rollback(); + if (sentPrevData != null) { + sentPrevData.close(); } - sentPrevData = prevData == null ? null : prevData.send(); - sentCurData = newData == null ? null : newData.send(); - if (!committedSuccessfully) { - tx.undoGetForUpdate(cfh, keyArray); - tx.rollback(); - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - retries++; + if (sentCurData != null) { + sentCurData.close(); + } + retries++; - if (retries == 1) { - retryTime = new ExponentialPageLimits(0, 2, 2000); - } - long retryNs = 1000000L * retryTime.getPageLimit(retries); + if (retries == 1) { + retryTime = new ExponentialPageLimits(0, 2, 2000); + } + long retryNs = 1000000L * retryTime.getPageLimit(retries); - // +- 30% - retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L); + // +- 30% + retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L); - if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) { - logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" - + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); - } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) { - logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" - + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); - } - // Wait for n milliseconds - if (retryNs > 0) { - LockSupport.parkNanos(retryNs); - } + if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) { + logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); + } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) { + logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); + } + // Wait for n milliseconds + if (retryNs > 0) { + LockSupport.parkNanos(retryNs); } } } - } while (!committedSuccessfully); - if (retries > 5) { - logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key)); } - optimisticAttempts.record(retries); - return switch (returnMode) { - case NOTHING -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - yield RESULT_NOTHING; - } - case CURRENT -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - yield new UpdateAtomicResultCurrent(sentCurData); - } - case PREVIOUS -> { - if (sentCurData != null) { - sentCurData.close(); - } - yield new UpdateAtomicResultPrevious(sentPrevData); - } - case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); - case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send()); - }; + } while (!committedSuccessfully); + if (retries > 5) { + logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key)); } - } catch (Throwable ex) { - throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); + recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime); + optimisticAttempts.record(retries); + return switch (returnMode) { + case NOTHING -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + yield RESULT_NOTHING; + } + case CURRENT -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + yield new UpdateAtomicResultCurrent(sentCurData); + } + case PREVIOUS -> { + if (sentCurData != null) { + sentCurData.close(); + } + yield new UpdateAtomicResultPrevious(sentPrevData); + } + case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); + case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send()); + }; } + } catch (Throwable ex) { + throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java index 7c80861..2483dbf 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java @@ -49,118 +49,119 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn keySend, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + Buffer key, + BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { - try (Buffer key = keySend.receive()) { - try { - var cfh = getCfh(); - var keyArray = LLUtils.toArray(key); - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called update in a nonblocking thread"); + long initNanoTime = System.nanoTime(); + try { + var cfh = getCfh(); + var keyArray = LLUtils.toArray(key); + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } + try (var tx = beginTransaction(writeOptions)) { + Send sentPrevData; + Send sentCurData; + boolean changed; + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key)); } - try (var tx = beginTransaction(writeOptions)) { - Send sentPrevData; - Send sentCurData; - boolean changed; + var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); + try { if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key)); + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray) + ); } - var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); - try { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Reading {}: {} (before update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevDataArray) - ); - } - Buffer prevData; - if (prevDataArray != null) { - prevData = MemoryManager.unsafeWrap(prevDataArray); + Buffer prevData; + if (prevDataArray != null) { + readValueFoundWithoutBloomBufferSize.record(prevDataArray.length); + prevData = MemoryManager.unsafeWrap(prevDataArray); + } else { + readValueNotFoundWithoutBloomBufferSize.record(0); + prevData = null; + } + try (prevData) { + Buffer prevDataToSendToUpdater; + if (prevData != null) { + prevDataToSendToUpdater = prevData.copy().makeReadOnly(); } else { - prevData = null; + prevDataToSendToUpdater = null; } - try (prevData) { - Buffer prevDataToSendToUpdater; - if (prevData != null) { - prevDataToSendToUpdater = prevData.copy(); - } else { - prevDataToSendToUpdater = null; - } - @Nullable Buffer newData; - try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { - newData = updater.apply(sentData); + @Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater); + try (newData) { + var newDataArray = newData == null ? null : LLUtils.toArray(newData); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Updating {}. previous data: {}, updated data: {}", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray), + LLUtils.toStringSafe(newDataArray) + ); } - try (newData) { - var newDataArray = newData == null ? null : LLUtils.toArray(newData); + if (prevData != null && newData == null) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); + } + writeValueBufferSize.record(0); + tx.delete(cfh, keyArray, true); + changed = true; + tx.commit(); + } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Updating {}. previous data: {}, updated data: {}", + "Writing {}: {} (after update)", LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevDataArray), - LLUtils.toStringSafe(newDataArray) + LLUtils.toStringSafe(newData) ); } - if (prevData != null && newData == null) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); - } - tx.delete(cfh, keyArray, true); - changed = true; - tx.commit(); - } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Writing {}: {} (after update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(newData) - ); - } - tx.put(cfh, keyArray, newDataArray); - changed = true; - tx.commit(); - } else { - changed = false; - tx.rollback(); - } - sentPrevData = prevData == null ? null : prevData.send(); - sentCurData = newData == null ? null : newData.send(); + writeValueBufferSize.record(newDataArray.length); + tx.put(cfh, keyArray, newDataArray); + changed = true; + tx.commit(); + } else { + changed = false; + tx.rollback(); } + sentPrevData = prevData == null ? null : prevData.send(); + sentCurData = newData == null ? null : newData.send(); } - } finally { - tx.undoGetForUpdate(cfh, keyArray); } - return switch (returnMode) { - case NOTHING -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - yield RESULT_NOTHING; - } - case CURRENT -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - yield new UpdateAtomicResultCurrent(sentCurData); - } - case PREVIOUS -> { - if (sentCurData != null) { - sentCurData.close(); - } - yield new UpdateAtomicResultPrevious(sentPrevData); - } - case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); - case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send()); - }; + } finally { + tx.undoGetForUpdate(cfh, keyArray); } - } catch (Throwable ex) { - throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); + recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime); + return switch (returnMode) { + case NOTHING -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + yield RESULT_NOTHING; + } + case CURRENT -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + yield new UpdateAtomicResultCurrent(sentCurData); + } + case PREVIOUS -> { + if (sentCurData != null) { + sentCurData.close(); + } + yield new UpdateAtomicResultPrevious(sentPrevData); + } + case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); + case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send()); + }; } + } catch (Throwable ex) { + throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 3deee31..bc45c88 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -71,7 +71,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { @NotNull RocksDBIterator newIterator(@NotNull ReadOptions readOptions); @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, - Send keySend, SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException; void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java index 2402026..7992f53 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java @@ -43,106 +43,103 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn @Override public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, - Send keySend, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + Buffer key, + BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { - try (Buffer key = keySend.receive()) { - try { - @Nullable Buffer prevData = this.get(readOptions, key); - try (prevData) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Reading {}: {} (before update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevData) - ); - } - @Nullable Buffer newData; - try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) { - try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { - try (var newDataToReceive = updater.apply(sentData)) { - newData = newDataToReceive; - } - } - } + long initNanoTime = System.nanoTime(); + try { + @Nullable Buffer prevData = this.get(readOptions, key); + try (prevData) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevData) + ); + } + + Buffer prevDataToSendToUpdater; + if (prevData != null) { + prevDataToSendToUpdater = prevData.copy().makeReadOnly(); + } else { + prevDataToSendToUpdater = null; + } + + @Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater); + try (newData) { boolean changed; assert newData == null || newData.isAccessible(); - try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Updating {}. previous data: {}, updated data: {}", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevData), + LLUtils.toStringSafe(newData) + ); + } + if (prevData != null && newData == null) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); + } + this.delete(writeOptions, key); + changed = true; + } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Updating {}. previous data: {}, updated data: {}", + "Writing {}: {} (after update)", LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevData), LLUtils.toStringSafe(newData) ); } - if (prevData != null && newData == null) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); - } - this.delete(writeOptions, key); - changed = true; - } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Writing {}: {} (after update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(newData) - ); - } - Buffer dataToPut; - if (returnMode == UpdateAtomicResultMode.CURRENT) { - dataToPut = newData.copy(); - } else { - dataToPut = newData; - } - try { - this.put(writeOptions, key, dataToPut); - changed = true; - } finally { - if (dataToPut != newData) { - dataToPut.close(); - } - } + Buffer dataToPut; + if (returnMode == UpdateAtomicResultMode.CURRENT) { + dataToPut = newData.copy(); } else { - changed = false; + dataToPut = newData; } - return switch (returnMode) { - case NOTHING -> { - if (prevData != null) { - prevData.close(); - } - if (newData != null) { - newData.close(); - } - yield RESULT_NOTHING; + try { + this.put(writeOptions, key, dataToPut); + changed = true; + } finally { + if (dataToPut != newData) { + dataToPut.close(); } - case CURRENT -> { - if (prevData != null) { - prevData.close(); - } - yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null); - } - case PREVIOUS -> { - if (newData != null) { - newData.close(); - } - yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null); - } - case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); - case DELTA -> new UpdateAtomicResultDelta(LLDelta - .of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null) - .send()); - }; - } finally { - if (newData != null) { - newData.close(); } + } else { + changed = false; } + recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime); + return switch (returnMode) { + case NOTHING -> { + if (prevData != null) { + prevData.close(); + } + if (newData != null) { + newData.close(); + } + yield RESULT_NOTHING; + } + case CURRENT -> { + if (prevData != null) { + prevData.close(); + } + yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null); + } + case PREVIOUS -> { + if (newData != null) { + newData.close(); + } + yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null); + } + case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); + case DELTA -> new UpdateAtomicResultDelta(LLDelta + .of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null) + .send()); + }; } - } catch (Throwable ex) { - throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); } + } catch (Throwable ex) { + throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultMode.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultMode.java index a3167b4..4c34d65 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultMode.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultMode.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; public enum UpdateAtomicResultMode { NOTHING, PREVIOUS, - CURRENT, BINARY_CHANGED, + CURRENT, + BINARY_CHANGED, DELTA } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index f0b2ec4..399b49d 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -194,8 +195,7 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Mono> updateAndGetDelta(Mono> keyMono, - SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + public Mono> updateAndGetDelta(Mono> keyMono, BinarySerializationFunction updater) { return Mono.usingWhen(keyMono, key -> Mono.fromCallable(() -> { try (key) { @@ -208,7 +208,7 @@ public class LLMemoryDictionary implements LLDictionary { oldRef.set(kk(old)); } Buffer v; - try (var oldToSend = old != null ? kk(old) : null) { + try (var oldToSend = old != null ? kk(old).receive() : null) { v = updater.apply(oldToSend); } catch (SerializationException e) { throw new IllegalStateException(e); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java index 86a0aaa..0d161e0 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -54,13 +55,13 @@ public class LLMemorySingleton implements LLSingleton { } @Override - public Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + public Mono> update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return dict.update(singletonNameBufMono, updater, updateReturnMode); } @Override - public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + public Mono> updateAndGetDelta(BinarySerializationFunction updater) { return dict.updateAndGetDelta(singletonNameBufMono, updater); } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index d9d6f6d..9cb1d8f 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -23,6 +23,7 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -259,13 +260,12 @@ public class LLQuicConnection implements LLDatabaseConnection { } @Override - public Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, - UpdateReturnMode updateReturnMode) { + public Mono> update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { return LLQuicConnection.this.sendUpdateRequest(new SingletonUpdateInit(singletonId, updateReturnMode), prev -> { byte[] oldData = toArrayNoCopy(prev); - Send oldDataBuf; + Buffer oldDataBuf; if (oldData != null) { - oldDataBuf = allocator.copyOf(oldData).send(); + oldDataBuf = allocator.copyOf(oldData); } else { oldDataBuf = null; } @@ -292,7 +292,7 @@ public class LLQuicConnection implements LLDatabaseConnection { } @Override - public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + public Mono> updateAndGetDelta(BinarySerializationFunction updater) { return Mono.error(new UnsupportedOperationException()); } diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java index 5f0299a..c7b7b77 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java @@ -169,8 +169,8 @@ public abstract class TestLLDictionaryLeaks { ); } - private Buffer pass(@Nullable Send old) { - return old == null ? null : old.receive(); + private Buffer pass(@Nullable Buffer old) { + return old; } @ParameterizedTest