From 18d5ddf6e17368398f2314d92d681546041dea19 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 20 May 2022 23:59:56 +0200 Subject: [PATCH] Remove some leaks --- pom.xml | 41 ++- .../cavallium/dbengine/database/LLUtils.java | 57 ++--- .../dbengine/database/SubStageEntry.java | 60 +++++ .../collections/DatabaseMapDictionary.java | 174 ++++++------- .../DatabaseMapDictionaryDeep.java | 5 +- .../DatabaseMapDictionaryHashed.java | 9 +- .../collections/DatabaseMapSingle.java | 6 +- .../collections/DatabaseSingleBucket.java | 20 +- .../collections/DatabaseSingleton.java | 6 +- .../collections/DatabaseStageMap.java | 3 +- .../database/disk/LLLocalDictionary.java | 73 +++--- .../disk/OptimisticRocksDBColumn.java | 240 +++++++++--------- .../disk/PessimisticRocksDBColumn.java | 177 ++++++------- 13 files changed, 461 insertions(+), 410 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/SubStageEntry.java diff --git a/pom.xml b/pom.xml index 5e02850..adac45e 100644 --- a/pom.xml +++ b/pom.xml @@ -90,13 +90,20 @@ io.projectreactor reactor-bom - 2020.0.18 + 2020.0.19 pom import + + io.projectreactor + reactor-tools + original + runtime + 3.4.18 + com.google.guava guava @@ -106,6 +113,16 @@ io.netty netty5-buffer 5.0.0.Alpha2 + + + io.netty + netty-common + + + io.netty + netty-buffer + + org.yaml @@ -418,11 +435,11 @@ 3.12.0 compile - - io.projectreactor - reactor-test - test - + + io.projectreactor + reactor-test + test + src/test/java @@ -600,6 +617,18 @@ + + net.bytebuddy + byte-buddy-maven-plugin + 1.12.10 + + + + reactor.tools.agent.ReactorDebugByteBuddyPlugin + + + + diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index ccdaf7d..46bb712 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -797,29 +797,21 @@ public class LLUtils { } public static Mono resolveLLDelta(Mono prev, UpdateReturnMode updateReturnMode) { - return prev.handle((delta, sink) -> { + return prev.mapNotNull(delta -> { final Buffer previous = delta.previousUnsafe(); final Buffer current = delta.currentUnsafe(); - switch (updateReturnMode) { + return switch (updateReturnMode) { case GET_NEW_VALUE -> { if (previous != null && previous.isAccessible()) { previous.close(); } - if (current != null) { - sink.next(current); - } else { - sink.complete(); - } + yield current; } case GET_OLD_VALUE -> { if (current != null && current.isAccessible()) { current.close(); } - if (previous != null) { - sink.next(previous); - } else { - sink.complete(); - } + yield previous; } case NOTHING -> { if (previous != null && previous.isAccessible()) { @@ -828,10 +820,9 @@ public class LLUtils { if (current != null && current.isAccessible()) { current.close(); } - sink.complete(); + yield null; } - default -> sink.error(new IllegalStateException()); - } + }; }); } @@ -862,27 +853,23 @@ public class LLUtils { public static Mono> mapLLDelta(Mono mono, SerializationFunction<@NotNull Buffer, @Nullable U> mapper) { - return mono.handle((delta, sink) -> { - try (delta) { - Buffer prev = delta.previousUnsafe(); - Buffer curr = delta.currentUnsafe(); - U newPrev; - U newCurr; - if (prev != null) { - newPrev = mapper.apply(prev); - } else { - newPrev = null; - } - if (curr != null) { - newCurr = mapper.apply(curr); - } else { - newCurr = null; - } - sink.next(new Delta<>(newPrev, newCurr)); - } catch (SerializationException ex) { - sink.error(ex); + return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> { + Buffer prev = delta.previousUnsafe(); + Buffer curr = delta.currentUnsafe(); + U newPrev; + U newCurr; + if (prev != null) { + newPrev = mapper.apply(prev); + } else { + newPrev = null; } - }); + if (curr != null) { + newCurr = mapper.apply(curr); + } else { + newCurr = null; + } + return new Delta<>(newPrev, newCurr); + }), delta -> Mono.fromRunnable(delta::close)); } public static boolean isDeltaChanged(Delta delta) { diff --git a/src/main/java/it/cavallium/dbengine/database/SubStageEntry.java b/src/main/java/it/cavallium/dbengine/database/SubStageEntry.java new file mode 100644 index 0000000..bd8839d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/SubStageEntry.java @@ -0,0 +1,60 @@ +package it.cavallium.dbengine.database; + +import it.cavallium.dbengine.database.collections.DatabaseStage; +import java.util.Map.Entry; +import java.util.Objects; + +public final class SubStageEntry> implements SafeCloseable, Entry { + + private final T key; + private final U value; + + public SubStageEntry(T key, U value) { + this.key = key; + this.value = value; + } + + @Override + public void close() { + if (value != null && value.isAccessible()) { + value.close(); + } + } + + @Override + public T getKey() { + return key; + } + + @Override + public U getValue() { + return value; + } + + @Override + public U setValue(U value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + //noinspection rawtypes + var that = (SubStageEntry) obj; + return Objects.equals(this.key, that.key) && Objects.equals(this.value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public String toString() { + return "SubStageEntry[" + "key=" + key + ", " + "value=" + value + ']'; + } + +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index fe6f594..1808cfc 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -4,7 +4,6 @@ import static java.util.Objects.requireNonNullElseGet; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Resource; -import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.BufSupplier; @@ -14,6 +13,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.disk.BinarySerializationFunction; @@ -126,32 +126,31 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep sink) { + private @Nullable U deserializeValue(T keySuffix, Buffer value) { try { - sink.next(valueSerializer.deserialize(value)); + return valueSerializer.deserialize(value); } catch (IndexOutOfBoundsException ex) { var exMessage = ex.getMessage(); if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet(); if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) { - LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() - + ":" + dictionary.getColumnName() - + ":" + LLUtils.toStringSafe(this.keyPrefix) - + ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors); + LOG.error( + "Unexpected zero-bytes value at " + + dictionary.getDatabaseName() + ":" + dictionary.getColumnName() + + ":" + LLUtils.toStringSafe(this.keyPrefix) + ":" + keySuffix + + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors); } catch (SerializationException e) { - LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() - + ":" + dictionary.getColumnName() - + ":" + LLUtils.toStringSafe(this.keyPrefix) - + ":" + keySuffix + "(?) total=" + totalZeroBytesErrors); + LOG.error( + "Unexpected zero-bytes value at " + dictionary.getDatabaseName() + ":" + dictionary.getColumnName() + + ":" + LLUtils.toStringSafe(this.keyPrefix) + ":" + keySuffix + "(?) total=" + + totalZeroBytesErrors); } } - sink.complete(); + return null; } else { - sink.error(ex); + throw ex; } - } catch (Throwable ex) { - sink.error(ex); } } @@ -278,13 +277,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { - return dictionary - .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))) - .handle((valueToReceive, sink) -> { - try (valueToReceive) { - deserializeValue(keySuffix, valueToReceive, sink); - } - }); + return Mono.usingWhen(dictionary + .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))), + value -> Mono.fromCallable(() -> deserializeValue(keySuffix, value)), + value -> Mono.fromRunnable(value::close)); } @Override @@ -306,13 +302,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updateValue(T keySuffix, UpdateReturnMode updateReturnMode, SerializationFunction<@Nullable U, @Nullable U> updater) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); - return dictionary - .update(keyMono, getSerializedUpdater(updater), updateReturnMode) - .handle((valueToReceive, sink) -> { - try (valueToReceive) { - deserializeValue(keySuffix, valueToReceive, sink); - } - }); + return Mono.usingWhen(dictionary.update(keyMono, getSerializedUpdater(updater), updateReturnMode), + result -> Mono.fromCallable(() -> deserializeValue(keySuffix, result)), + result -> Mono.fromRunnable(result::close) + ); } @Override @@ -320,11 +313,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeKeySuffixToKey(keySuffix)); return dictionary .updateAndGetDelta(keyMono, getSerializedUpdater(updater)) - .transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { - try (serialized) { - return valueSerializer.deserialize(serialized); - } - })); + .transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize)); } public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) { @@ -368,26 +357,21 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); var valueMono = Mono.fromCallable(() -> serializeValue(value)); - return dictionary - .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle((valueToReceive, sink) -> { - try (valueToReceive) { - deserializeValue(keySuffix, valueToReceive, sink); - } - }); + return Mono.usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE), + valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)), + valueBuf -> Mono.fromRunnable(valueBuf::close) + ); } @Override public Mono putValueAndGetChanged(T keySuffix, U value) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); var valueMono = Mono.fromCallable(() -> serializeValue(value)); - return dictionary - .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle((Buffer valueBuf, SynchronousSink sink) -> { - try (valueBuf) { - deserializeValue(keySuffix, valueBuf, sink); - } - }) + return Mono + .usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE), + valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)), + valueBuf -> Mono.fromRunnable(valueBuf::close) + ) .map(oldValue -> !Objects.equals(oldValue, value)) .defaultIfEmpty(value != null); } @@ -404,13 +388,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); - return dictionary - .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle((valueToReceive, sink) -> { - try (valueToReceive) { - deserializeValue(keySuffix, valueToReceive, sink); - } - }); + return Mono.usingWhen(dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE), + valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)), + valueBuf -> Mono.fromRunnable(valueBuf::close) + ); } @Override @@ -433,15 +414,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((valueBufOpt, sink) -> { + .handle((valueBufOpt, sink) -> { try { - Optional valueOpt; - if (valueBufOpt.isPresent()) { - valueOpt = Optional.of(valueSerializer.deserialize(valueBufOpt.get())); - } else { - valueOpt = Optional.empty(); - } - sink.next(valueOpt); + sink.next(valueBufOpt.map(valueSerializer::deserialize)); } catch (Throwable ex) { sink.error(ex); } finally { @@ -499,7 +474,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { return getAllStages(snapshot, rangeMono, false, smallRange); } @@ -530,9 +505,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot, + public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, @Nullable T keyMin, @Nullable T keyMax, boolean reverse, @@ -540,39 +514,41 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep boundedRangeMono = rangeMono - .handle((fullRange, sink) -> { - try (fullRange) { - sink.next(getPatchedRange(fullRange, keyMin, keyMax)); - } catch (SerializationException e) { - sink.error(e); - } - }); + Mono boundedRangeMono = Mono.usingWhen(rangeMono, + range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)), + range -> Mono.fromRunnable(range::close) + ); return getAllStages(snapshot, boundedRangeMono, reverse, smallRange); } } - private Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, + private Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, Mono sliceRangeMono, boolean reverse, boolean smallRange) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange) - .handle((keyBuf, sink) -> { - try { - assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; - // Remove prefix. Keep only the suffix and the ext - splitPrefix(keyBuf).close(); - suffixKeyLengthConsistency(keyBuf.readableBytes()); - T keySuffix; - try (var keyBufCopy = keyBuf.copy()) { - keySuffix = deserializeSuffix(keyBufCopy); - } - var subStage = new DatabaseMapSingle<>(dictionary, BufSupplier.ofOwned(toKey(keyBuf)), valueSerializer, null); - sink.next(Map.entry(keySuffix, subStage)); - } catch (Throwable ex) { - keyBuf.close(); - sink.error(ex); - } - }); + .flatMapSequential(keyBuf -> Mono + .>>fromCallable(() -> { + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + splitPrefix(keyBuf).close(); + suffixKeyLengthConsistency(keyBuf.readableBytes()); + T keySuffix; + try (var keyBufCopy = keyBuf.copy()) { + keySuffix = deserializeSuffix(keyBufCopy); + } + var bufSupplier = BufSupplier.ofOwned(toKey(keyBuf)); + var subStage = new DatabaseMapSingle<>(dictionary, bufSupplier, valueSerializer, null); + return new SubStageEntry<>(keySuffix, subStage); + }).doOnCancel(() -> { + if (keyBuf.isAccessible()) { + keyBuf.close(); + } + }).doOnError(ex -> { + if (keyBuf.isAccessible()) { + keyBuf.close(); + } + }) + ); } @Override @@ -583,7 +559,6 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> getAllValues(@Nullable CompositeSnapshot snapshot, @Nullable T keyMin, @@ -593,14 +568,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep boundedRangeMono = rangeMono - .handle((fullRange, sink) -> { - try (fullRange) { - sink.next(getPatchedRange(fullRange, keyMin, keyMax)); - } catch (SerializationException e) { - sink.error(e); - } - }); + Mono boundedRangeMono = Mono.usingWhen(rangeMono, + range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)), + range -> Mono.fromRunnable(range::close)); return getAllValues(snapshot, boundedRangeMono, reverse, smallRange); } } @@ -610,7 +580,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntry, sink) -> { + .handle((serializedEntry, sink) -> { try { Entry entry; try (serializedEntry) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 8a30d16..e02f21a 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -16,6 +16,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RangeSupplier; +import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializationException; @@ -370,7 +371,7 @@ public class DatabaseMapDictionaryDeep> extend } @Override - public Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + public Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { return dictionary .getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange) .flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using( @@ -381,7 +382,7 @@ public class DatabaseMapDictionaryDeep> extend T deserializedSuffix; try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) { deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix); - sink.next(Map.entry(deserializedSuffix, us)); + sink.next(new SubStageEntry<>(deserializedSuffix, us)); } catch (SerializationException ex) { sink.error(ex); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 4fc0e3b..1c104d6 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -195,7 +196,7 @@ public class DatabaseMapDictionaryHashed extends public Mono> at(@Nullable CompositeSnapshot snapshot, T key) { return this .atPrivate(snapshot, key, keySuffixHashFunction.apply(key)) - .map(cast -> (DatabaseStageEntry) cast); + .map(cast -> cast); } private Mono> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) { @@ -210,7 +211,8 @@ public class DatabaseMapDictionaryHashed extends } @Override - public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, + boolean smallRange) { return subDictionary .getAllValues(snapshot, smallRange) .map(Entry::getValue) @@ -218,8 +220,7 @@ public class DatabaseMapDictionaryHashed extends .flatMap(bucket -> Flux .fromIterable(bucket) .map(Entry::getKey) - .flatMap(key -> this.at(snapshot, key).map(stage -> Map.entry(key, stage))) - ); + .flatMap(key -> this.at(snapshot, key).map(stage -> new SubStageEntry<>(key, stage)))); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index c5a4db1..117a290 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -167,11 +167,7 @@ public class DatabaseMapSingle extends ResourceSupport, Data } else { return serializeValue(result); } - }).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { - try (serialized) { - return serializer.deserialize(serialized); - } - })); + }).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index 3ddcbfc..3cf222d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -124,17 +124,15 @@ public class DatabaseSingleBucket @Override public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater) { - return bucketStage - .updateAndGetDelta(oldBucket -> { - V oldValue = extractValue(oldBucket); - var result = updater.apply(oldValue); - if (result == null) { - return this.removeValueOrDelete(oldBucket); - } else { - return this.insertValueOrCreate(oldBucket, result); - } - }) - .transform(mono -> LLUtils.mapDelta(mono, this::extractValue)); + return bucketStage.updateAndGetDelta(oldBucket -> { + V oldValue = extractValue(oldBucket); + var result = updater.apply(oldValue); + if (result == null) { + return this.removeValueOrDelete(oldBucket); + } else { + return this.insertValueOrCreate(oldBucket, result); + } + }).transform(mono -> LLUtils.mapDelta(mono, this::extractValue)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index ed77d80..8b12a3b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -164,11 +164,7 @@ public class DatabaseSingleton extends ResourceSupport, Data return serializeValue(result); } } - }).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { - try (serialized) { - return serializer.deserialize(serialized); - } - })); + }).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 57037c3..4f4a7d1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; @@ -120,7 +121,7 @@ public interface DatabaseStageMap> extends return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then(); } - Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange); + Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange); default Flux> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { return this diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 881ca2d..eafea4a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -421,26 +421,33 @@ public class LLLocalDictionary implements LLDictionary { case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; }; - UpdateAtomicResult result; - var readOptions = generateReadOptionsOrStatic(null); - startedUpdates.increment(); - try (var writeOptions = new WriteOptions()) { - result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode)); - } finally { - endedUpdates.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); + UpdateAtomicResult result = null; + try { + var readOptions = generateReadOptionsOrStatic(null); + startedUpdates.increment(); + try (var writeOptions = new WriteOptions()) { + result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode)); + } finally { + endedUpdates.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } } - } - assert result != null; - return switch (updateReturnMode) { - case NOTHING -> { + assert result != null; + return switch (updateReturnMode) { + case NOTHING -> { + result.close(); + yield null; + } + case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); + case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); + }; + } catch (Throwable ex) { + if (result != null) { result.close(); - yield null; } - case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); - case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); - }; + throw ex; + } }), key -> Mono.fromRunnable(key::close)); } @@ -458,19 +465,27 @@ public class LLLocalDictionary implements LLDictionary { + "safe atomic operations"); } - UpdateAtomicResult result; - var readOptions = generateReadOptionsOrStatic(null); - startedUpdates.increment(); - try (var writeOptions = new WriteOptions()) { - result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, DELTA)); - } finally { - endedUpdates.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); + UpdateAtomicResultDelta result = null; + try { + var readOptions = generateReadOptionsOrStatic(null); + startedUpdates.increment(); + try (var writeOptions = new WriteOptions()) { + result = updateTime.recordCallable(() -> + (UpdateAtomicResultDelta) db.updateAtomic(readOptions, writeOptions, key, updater, DELTA)); + } finally { + endedUpdates.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } } + assert result != null; + return result.delta(); + } catch (Throwable ex) { + if (result != null && result.delta().isAccessible()) { + result.close(); + } + throw ex; } - assert result != null; - return ((UpdateAtomicResultDelta) result).delta(); }), key -> Mono.fromRunnable(key::close)); } @@ -938,7 +953,7 @@ public class LLLocalDictionary implements LLDictionary { if (USE_WINDOW_IN_SET_RANGE) { return Mono .usingWhen(rangeMono, range -> runOnDb(true, () -> { - try (var writeOptions = new WriteOptions(); range) { + try (var writeOptions = new WriteOptions()) { assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange)) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index d7aff26..9b749c7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -96,33 +96,37 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) { - logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" - + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); - } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) { - logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" - + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); - } - // Wait for n milliseconds - if (retryNs > 0) { - LockSupport.parkNanos(retryNs); - } + if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) { + logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); + } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) { + logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries); + } + // Wait for n milliseconds + if (retryNs > 0) { + LockSupport.parkNanos(retryNs); } } + } while (!committedSuccessfully); + if (retries > 5) { + logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key)); } - } while (!committedSuccessfully); - if (retries > 5) { - logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key)); + recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime); + optimisticAttempts.record(retries); + return switch (returnMode) { + case NOTHING -> { + if (prevData != null) { + prevData.close(); + } + if (newData != null) { + newData.close(); + } + yield RESULT_NOTHING; + } + case CURRENT -> { + if (prevData != null) { + prevData.close(); + } + yield new UpdateAtomicResultCurrent(newData); + } + case PREVIOUS -> { + if (newData != null) { + newData.close(); + } + yield new UpdateAtomicResultPrevious(prevData); + } + case BINARY_CHANGED -> { + if (prevData != null) { + prevData.close(); + } + if (newData != null) { + newData.close(); + } + yield new UpdateAtomicResultBinaryChanged(changed); + } + case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData)); + }; + } catch (Throwable ex) { + if (prevData != null && prevData.isAccessible()) { + prevData.close(); + } + if (newData != null && newData.isAccessible()) { + newData.close(); + } + throw ex; } - recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime); - optimisticAttempts.record(retries); - return switch (returnMode) { - case NOTHING -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - yield RESULT_NOTHING; - } - case CURRENT -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - yield new UpdateAtomicResultCurrent(sentCurData); - } - case PREVIOUS -> { - if (sentCurData != null) { - sentCurData.close(); - } - yield new UpdateAtomicResultPrevious(sentPrevData); - } - case BINARY_CHANGED -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - yield new UpdateAtomicResultBinaryChanged(changed); - } - case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData)); - }; } } catch (Throwable ex) { throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java index f0d7873..f2db6bb 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java @@ -64,30 +64,28 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn { + if (prevData != null) { + prevData.close(); } - sentPrevData = prevData == null ? null : prevData.copy(); - sentCurData = newData == null ? null : newData.copy(); + if (newData != null) { + newData.close(); + } + yield RESULT_NOTHING; } + case CURRENT -> { + if (prevData != null) { + prevData.close(); + } + yield new UpdateAtomicResultCurrent(newData); + } + case PREVIOUS -> { + if (newData != null) { + newData.close(); + } + yield new UpdateAtomicResultPrevious(prevData); + } + case BINARY_CHANGED -> { + if (prevData != null) { + prevData.close(); + } + if (newData != null) { + newData.close(); + } + yield new UpdateAtomicResultBinaryChanged(changed); + } + case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData)); + }; + } catch (Throwable ex) { + if (prevData != null && prevData.isAccessible()) { + prevData.close(); } - } finally { - tx.undoGetForUpdate(cfh, keyArray); + if (newData != null && newData.isAccessible()) { + newData.close(); + } + throw ex; } - recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime); - return switch (returnMode) { - case NOTHING -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - yield RESULT_NOTHING; - } - case CURRENT -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - yield new UpdateAtomicResultCurrent(sentCurData); - } - case PREVIOUS -> { - if (sentCurData != null) { - sentCurData.close(); - } - yield new UpdateAtomicResultPrevious(sentPrevData); - } - case BINARY_CHANGED -> { - if (sentPrevData != null) { - sentPrevData.close(); - } - if (sentCurData != null) { - sentCurData.close(); - } - yield new UpdateAtomicResultBinaryChanged(changed); - } - case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData)); - }; } } catch (Throwable ex) { throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);