diff --git a/pom.xml b/pom.xml
index 5e02850..adac45e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,13 +90,20 @@
io.projectreactor
reactor-bom
- 2020.0.18
+ 2020.0.19
pom
import
+
+ io.projectreactor
+ reactor-tools
+ original
+ runtime
+ 3.4.18
+
com.google.guava
guava
@@ -106,6 +113,16 @@
io.netty
netty5-buffer
5.0.0.Alpha2
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-buffer
+
+
org.yaml
@@ -418,11 +435,11 @@
3.12.0
compile
-
- io.projectreactor
- reactor-test
- test
-
+
+ io.projectreactor
+ reactor-test
+ test
+
src/test/java
@@ -600,6 +617,18 @@
+
+ net.bytebuddy
+ byte-buddy-maven-plugin
+ 1.12.10
+
+
+
+ reactor.tools.agent.ReactorDebugByteBuddyPlugin
+
+
+
+
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index ccdaf7d..46bb712 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -797,29 +797,21 @@ public class LLUtils {
}
public static Mono resolveLLDelta(Mono prev, UpdateReturnMode updateReturnMode) {
- return prev.handle((delta, sink) -> {
+ return prev.mapNotNull(delta -> {
final Buffer previous = delta.previousUnsafe();
final Buffer current = delta.currentUnsafe();
- switch (updateReturnMode) {
+ return switch (updateReturnMode) {
case GET_NEW_VALUE -> {
if (previous != null && previous.isAccessible()) {
previous.close();
}
- if (current != null) {
- sink.next(current);
- } else {
- sink.complete();
- }
+ yield current;
}
case GET_OLD_VALUE -> {
if (current != null && current.isAccessible()) {
current.close();
}
- if (previous != null) {
- sink.next(previous);
- } else {
- sink.complete();
- }
+ yield previous;
}
case NOTHING -> {
if (previous != null && previous.isAccessible()) {
@@ -828,10 +820,9 @@ public class LLUtils {
if (current != null && current.isAccessible()) {
current.close();
}
- sink.complete();
+ yield null;
}
- default -> sink.error(new IllegalStateException());
- }
+ };
});
}
@@ -862,27 +853,23 @@ public class LLUtils {
public static Mono> mapLLDelta(Mono mono,
SerializationFunction<@NotNull Buffer, @Nullable U> mapper) {
- return mono.handle((delta, sink) -> {
- try (delta) {
- Buffer prev = delta.previousUnsafe();
- Buffer curr = delta.currentUnsafe();
- U newPrev;
- U newCurr;
- if (prev != null) {
- newPrev = mapper.apply(prev);
- } else {
- newPrev = null;
- }
- if (curr != null) {
- newCurr = mapper.apply(curr);
- } else {
- newCurr = null;
- }
- sink.next(new Delta<>(newPrev, newCurr));
- } catch (SerializationException ex) {
- sink.error(ex);
+ return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> {
+ Buffer prev = delta.previousUnsafe();
+ Buffer curr = delta.currentUnsafe();
+ U newPrev;
+ U newCurr;
+ if (prev != null) {
+ newPrev = mapper.apply(prev);
+ } else {
+ newPrev = null;
}
- });
+ if (curr != null) {
+ newCurr = mapper.apply(curr);
+ } else {
+ newCurr = null;
+ }
+ return new Delta<>(newPrev, newCurr);
+ }), delta -> Mono.fromRunnable(delta::close));
}
public static boolean isDeltaChanged(Delta delta) {
diff --git a/src/main/java/it/cavallium/dbengine/database/SubStageEntry.java b/src/main/java/it/cavallium/dbengine/database/SubStageEntry.java
new file mode 100644
index 0000000..bd8839d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/SubStageEntry.java
@@ -0,0 +1,60 @@
+package it.cavallium.dbengine.database;
+
+import it.cavallium.dbengine.database.collections.DatabaseStage;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+public final class SubStageEntry> implements SafeCloseable, Entry {
+
+ private final T key;
+ private final U value;
+
+ public SubStageEntry(T key, U value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void close() {
+ if (value != null && value.isAccessible()) {
+ value.close();
+ }
+ }
+
+ @Override
+ public T getKey() {
+ return key;
+ }
+
+ @Override
+ public U getValue() {
+ return value;
+ }
+
+ @Override
+ public U setValue(U value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+ if (obj == null || obj.getClass() != this.getClass())
+ return false;
+ //noinspection rawtypes
+ var that = (SubStageEntry) obj;
+ return Objects.equals(this.key, that.key) && Objects.equals(this.value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "SubStageEntry[" + "key=" + key + ", " + "value=" + value + ']';
+ }
+
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
index fe6f594..1808cfc 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
@@ -4,7 +4,6 @@ import static java.util.Objects.requireNonNullElseGet;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Resource;
-import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.BufSupplier;
@@ -14,6 +13,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
@@ -126,32 +126,31 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep sink) {
+ private @Nullable U deserializeValue(T keySuffix, Buffer value) {
try {
- sink.next(valueSerializer.deserialize(value));
+ return valueSerializer.deserialize(value);
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) {
- LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
- + ":" + dictionary.getColumnName()
- + ":" + LLUtils.toStringSafe(this.keyPrefix)
- + ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
+ LOG.error(
+ "Unexpected zero-bytes value at "
+ + dictionary.getDatabaseName() + ":" + dictionary.getColumnName()
+ + ":" + LLUtils.toStringSafe(this.keyPrefix) + ":" + keySuffix
+ + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
} catch (SerializationException e) {
- LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
- + ":" + dictionary.getColumnName()
- + ":" + LLUtils.toStringSafe(this.keyPrefix)
- + ":" + keySuffix + "(?) total=" + totalZeroBytesErrors);
+ LOG.error(
+ "Unexpected zero-bytes value at " + dictionary.getDatabaseName() + ":" + dictionary.getColumnName()
+ + ":" + LLUtils.toStringSafe(this.keyPrefix) + ":" + keySuffix + "(?) total="
+ + totalZeroBytesErrors);
}
}
- sink.complete();
+ return null;
} else {
- sink.error(ex);
+ throw ex;
}
- } catch (Throwable ex) {
- sink.error(ex);
}
}
@@ -278,13 +277,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
- return dictionary
- .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)))
- .handle((valueToReceive, sink) -> {
- try (valueToReceive) {
- deserializeValue(keySuffix, valueToReceive, sink);
- }
- });
+ return Mono.usingWhen(dictionary
+ .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))),
+ value -> Mono.fromCallable(() -> deserializeValue(keySuffix, value)),
+ value -> Mono.fromRunnable(value::close));
}
@Override
@@ -306,13 +302,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updateValue(T keySuffix, UpdateReturnMode updateReturnMode,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
- return dictionary
- .update(keyMono, getSerializedUpdater(updater), updateReturnMode)
- .handle((valueToReceive, sink) -> {
- try (valueToReceive) {
- deserializeValue(keySuffix, valueToReceive, sink);
- }
- });
+ return Mono.usingWhen(dictionary.update(keyMono, getSerializedUpdater(updater), updateReturnMode),
+ result -> Mono.fromCallable(() -> deserializeValue(keySuffix, result)),
+ result -> Mono.fromRunnable(result::close)
+ );
}
@Override
@@ -320,11 +313,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeKeySuffixToKey(keySuffix));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater))
- .transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
- try (serialized) {
- return valueSerializer.deserialize(serialized);
- }
- }));
+ .transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize));
}
public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
@@ -368,26 +357,21 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
- return dictionary
- .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
- .handle((valueToReceive, sink) -> {
- try (valueToReceive) {
- deserializeValue(keySuffix, valueToReceive, sink);
- }
- });
+ return Mono.usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE),
+ valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
+ valueBuf -> Mono.fromRunnable(valueBuf::close)
+ );
}
@Override
public Mono putValueAndGetChanged(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
- return dictionary
- .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
- .handle((Buffer valueBuf, SynchronousSink sink) -> {
- try (valueBuf) {
- deserializeValue(keySuffix, valueBuf, sink);
- }
- })
+ return Mono
+ .usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE),
+ valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
+ valueBuf -> Mono.fromRunnable(valueBuf::close)
+ )
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null);
}
@@ -404,13 +388,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
- return dictionary
- .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
- .handle((valueToReceive, sink) -> {
- try (valueToReceive) {
- deserializeValue(keySuffix, valueToReceive, sink);
- }
- });
+ return Mono.usingWhen(dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE),
+ valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
+ valueBuf -> Mono.fromRunnable(valueBuf::close)
+ );
}
@Override
@@ -433,15 +414,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((valueBufOpt, sink) -> {
+ .handle((valueBufOpt, sink) -> {
try {
- Optional valueOpt;
- if (valueBufOpt.isPresent()) {
- valueOpt = Optional.of(valueSerializer.deserialize(valueBufOpt.get()));
- } else {
- valueOpt = Optional.empty();
- }
- sink.next(valueOpt);
+ sink.next(valueBufOpt.map(valueSerializer::deserialize));
} catch (Throwable ex) {
sink.error(ex);
} finally {
@@ -499,7 +474,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
+ public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllStages(snapshot, rangeMono, false, smallRange);
}
@@ -530,9 +505,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot,
+ public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot,
@Nullable T keyMin,
@Nullable T keyMax,
boolean reverse,
@@ -540,39 +514,41 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep boundedRangeMono = rangeMono
- .handle((fullRange, sink) -> {
- try (fullRange) {
- sink.next(getPatchedRange(fullRange, keyMin, keyMax));
- } catch (SerializationException e) {
- sink.error(e);
- }
- });
+ Mono boundedRangeMono = Mono.usingWhen(rangeMono,
+ range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)),
+ range -> Mono.fromRunnable(range::close)
+ );
return getAllStages(snapshot, boundedRangeMono, reverse, smallRange);
}
}
- private Flux>> getAllStages(@Nullable CompositeSnapshot snapshot,
+ private Flux>> getAllStages(@Nullable CompositeSnapshot snapshot,
Mono sliceRangeMono, boolean reverse, boolean smallRange) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
- .handle((keyBuf, sink) -> {
- try {
- assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
- // Remove prefix. Keep only the suffix and the ext
- splitPrefix(keyBuf).close();
- suffixKeyLengthConsistency(keyBuf.readableBytes());
- T keySuffix;
- try (var keyBufCopy = keyBuf.copy()) {
- keySuffix = deserializeSuffix(keyBufCopy);
- }
- var subStage = new DatabaseMapSingle<>(dictionary, BufSupplier.ofOwned(toKey(keyBuf)), valueSerializer, null);
- sink.next(Map.entry(keySuffix, subStage));
- } catch (Throwable ex) {
- keyBuf.close();
- sink.error(ex);
- }
- });
+ .flatMapSequential(keyBuf -> Mono
+ .>>fromCallable(() -> {
+ assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ // Remove prefix. Keep only the suffix and the ext
+ splitPrefix(keyBuf).close();
+ suffixKeyLengthConsistency(keyBuf.readableBytes());
+ T keySuffix;
+ try (var keyBufCopy = keyBuf.copy()) {
+ keySuffix = deserializeSuffix(keyBufCopy);
+ }
+ var bufSupplier = BufSupplier.ofOwned(toKey(keyBuf));
+ var subStage = new DatabaseMapSingle<>(dictionary, bufSupplier, valueSerializer, null);
+ return new SubStageEntry<>(keySuffix, subStage);
+ }).doOnCancel(() -> {
+ if (keyBuf.isAccessible()) {
+ keyBuf.close();
+ }
+ }).doOnError(ex -> {
+ if (keyBuf.isAccessible()) {
+ keyBuf.close();
+ }
+ })
+ );
}
@Override
@@ -583,7 +559,6 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> getAllValues(@Nullable CompositeSnapshot snapshot,
@Nullable T keyMin,
@@ -593,14 +568,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep boundedRangeMono = rangeMono
- .handle((fullRange, sink) -> {
- try (fullRange) {
- sink.next(getPatchedRange(fullRange, keyMin, keyMax));
- } catch (SerializationException e) {
- sink.error(e);
- }
- });
+ Mono boundedRangeMono = Mono.usingWhen(rangeMono,
+ range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)),
+ range -> Mono.fromRunnable(range::close));
return getAllValues(snapshot, boundedRangeMono, reverse, smallRange);
}
}
@@ -610,7 +580,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntry, sink) -> {
+ .handle((serializedEntry, sink) -> {
try {
Entry entry;
try (serializedEntry) {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
index 8a30d16..e02f21a 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
@@ -16,6 +16,7 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RangeSupplier;
+import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.SerializationException;
@@ -370,7 +371,7 @@ public class DatabaseMapDictionaryDeep> extend
}
@Override
- public Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
+ public Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return dictionary
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange)
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
@@ -381,7 +382,7 @@ public class DatabaseMapDictionaryDeep> extend
T deserializedSuffix;
try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) {
deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix);
- sink.next(Map.entry(deserializedSuffix, us));
+ sink.next(new SubStageEntry<>(deserializedSuffix, us));
} catch (SerializationException ex) {
sink.error(ex);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
index 4fc0e3b..1c104d6 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
@@ -10,6 +10,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import io.netty5.buffer.api.internal.ResourceSupport;
+import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@@ -195,7 +196,7 @@ public class DatabaseMapDictionaryHashed extends
public Mono> at(@Nullable CompositeSnapshot snapshot, T key) {
return this
.atPrivate(snapshot, key, keySuffixHashFunction.apply(key))
- .map(cast -> (DatabaseStageEntry) cast);
+ .map(cast -> cast);
}
private Mono> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) {
@@ -210,7 +211,8 @@ public class DatabaseMapDictionaryHashed extends
}
@Override
- public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
+ public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot,
+ boolean smallRange) {
return subDictionary
.getAllValues(snapshot, smallRange)
.map(Entry::getValue)
@@ -218,8 +220,7 @@ public class DatabaseMapDictionaryHashed extends
.flatMap(bucket -> Flux
.fromIterable(bucket)
.map(Entry::getKey)
- .flatMap(key -> this.at(snapshot, key).map(stage -> Map.entry(key, stage)))
- );
+ .flatMap(key -> this.at(snapshot, key).map(stage -> new SubStageEntry<>(key, stage))));
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java
index c5a4db1..117a290 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java
@@ -167,11 +167,7 @@ public class DatabaseMapSingle extends ResourceSupport, Data
} else {
return serializeValue(result);
}
- }).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
- try (serialized) {
- return serializer.deserialize(serialized);
- }
- }));
+ }).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize));
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java
index 3ddcbfc..3cf222d 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java
@@ -124,17 +124,15 @@ public class DatabaseSingleBucket
@Override
public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater) {
- return bucketStage
- .updateAndGetDelta(oldBucket -> {
- V oldValue = extractValue(oldBucket);
- var result = updater.apply(oldValue);
- if (result == null) {
- return this.removeValueOrDelete(oldBucket);
- } else {
- return this.insertValueOrCreate(oldBucket, result);
- }
- })
- .transform(mono -> LLUtils.mapDelta(mono, this::extractValue));
+ return bucketStage.updateAndGetDelta(oldBucket -> {
+ V oldValue = extractValue(oldBucket);
+ var result = updater.apply(oldValue);
+ if (result == null) {
+ return this.removeValueOrDelete(oldBucket);
+ } else {
+ return this.insertValueOrCreate(oldBucket, result);
+ }
+ }).transform(mono -> LLUtils.mapDelta(mono, this::extractValue));
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java
index ed77d80..8b12a3b 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java
@@ -164,11 +164,7 @@ public class DatabaseSingleton extends ResourceSupport, Data
return serializeValue(result);
}
}
- }).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
- try (serialized) {
- return serializer.deserialize(serialized);
- }
- }));
+ }).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize));
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
index 57037c3..4f4a7d1 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
@@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
@@ -120,7 +121,7 @@ public interface DatabaseStageMap> extends
return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then();
}
- Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange);
+ Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange);
default Flux> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return this
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index 881ca2d..eafea4a 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -421,26 +421,33 @@ public class LLLocalDictionary implements LLDictionary {
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
- UpdateAtomicResult result;
- var readOptions = generateReadOptionsOrStatic(null);
- startedUpdates.increment();
- try (var writeOptions = new WriteOptions()) {
- result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
- } finally {
- endedUpdates.increment();
- if (readOptions != EMPTY_READ_OPTIONS) {
- readOptions.close();
+ UpdateAtomicResult result = null;
+ try {
+ var readOptions = generateReadOptionsOrStatic(null);
+ startedUpdates.increment();
+ try (var writeOptions = new WriteOptions()) {
+ result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
+ } finally {
+ endedUpdates.increment();
+ if (readOptions != EMPTY_READ_OPTIONS) {
+ readOptions.close();
+ }
}
- }
- assert result != null;
- return switch (updateReturnMode) {
- case NOTHING -> {
+ assert result != null;
+ return switch (updateReturnMode) {
+ case NOTHING -> {
+ result.close();
+ yield null;
+ }
+ case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
+ case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
+ };
+ } catch (Throwable ex) {
+ if (result != null) {
result.close();
- yield null;
}
- case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
- case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
- };
+ throw ex;
+ }
}), key -> Mono.fromRunnable(key::close));
}
@@ -458,19 +465,27 @@ public class LLLocalDictionary implements LLDictionary {
+ "safe atomic operations");
}
- UpdateAtomicResult result;
- var readOptions = generateReadOptionsOrStatic(null);
- startedUpdates.increment();
- try (var writeOptions = new WriteOptions()) {
- result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
- } finally {
- endedUpdates.increment();
- if (readOptions != EMPTY_READ_OPTIONS) {
- readOptions.close();
+ UpdateAtomicResultDelta result = null;
+ try {
+ var readOptions = generateReadOptionsOrStatic(null);
+ startedUpdates.increment();
+ try (var writeOptions = new WriteOptions()) {
+ result = updateTime.recordCallable(() ->
+ (UpdateAtomicResultDelta) db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
+ } finally {
+ endedUpdates.increment();
+ if (readOptions != EMPTY_READ_OPTIONS) {
+ readOptions.close();
+ }
}
+ assert result != null;
+ return result.delta();
+ } catch (Throwable ex) {
+ if (result != null && result.delta().isAccessible()) {
+ result.close();
+ }
+ throw ex;
}
- assert result != null;
- return ((UpdateAtomicResultDelta) result).delta();
}), key -> Mono.fromRunnable(key::close));
}
@@ -938,7 +953,7 @@ public class LLLocalDictionary implements LLDictionary {
if (USE_WINDOW_IN_SET_RANGE) {
return Mono
.usingWhen(rangeMono, range -> runOnDb(true, () -> {
- try (var writeOptions = new WriteOptions(); range) {
+ try (var writeOptions = new WriteOptions()) {
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange)) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java
index d7aff26..9b749c7 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java
@@ -96,33 +96,37 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
- logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
- + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
- } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
- logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
- + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
- }
- // Wait for n milliseconds
- if (retryNs > 0) {
- LockSupport.parkNanos(retryNs);
- }
+ if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
+ logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
+ } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
+ logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
+ }
+ // Wait for n milliseconds
+ if (retryNs > 0) {
+ LockSupport.parkNanos(retryNs);
}
}
+ } while (!committedSuccessfully);
+ if (retries > 5) {
+ logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
}
- } while (!committedSuccessfully);
- if (retries > 5) {
- logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
+ recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
+ optimisticAttempts.record(retries);
+ return switch (returnMode) {
+ case NOTHING -> {
+ if (prevData != null) {
+ prevData.close();
+ }
+ if (newData != null) {
+ newData.close();
+ }
+ yield RESULT_NOTHING;
+ }
+ case CURRENT -> {
+ if (prevData != null) {
+ prevData.close();
+ }
+ yield new UpdateAtomicResultCurrent(newData);
+ }
+ case PREVIOUS -> {
+ if (newData != null) {
+ newData.close();
+ }
+ yield new UpdateAtomicResultPrevious(prevData);
+ }
+ case BINARY_CHANGED -> {
+ if (prevData != null) {
+ prevData.close();
+ }
+ if (newData != null) {
+ newData.close();
+ }
+ yield new UpdateAtomicResultBinaryChanged(changed);
+ }
+ case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
+ };
+ } catch (Throwable ex) {
+ if (prevData != null && prevData.isAccessible()) {
+ prevData.close();
+ }
+ if (newData != null && newData.isAccessible()) {
+ newData.close();
+ }
+ throw ex;
}
- recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
- optimisticAttempts.record(retries);
- return switch (returnMode) {
- case NOTHING -> {
- if (sentPrevData != null) {
- sentPrevData.close();
- }
- if (sentCurData != null) {
- sentCurData.close();
- }
- yield RESULT_NOTHING;
- }
- case CURRENT -> {
- if (sentPrevData != null) {
- sentPrevData.close();
- }
- yield new UpdateAtomicResultCurrent(sentCurData);
- }
- case PREVIOUS -> {
- if (sentCurData != null) {
- sentCurData.close();
- }
- yield new UpdateAtomicResultPrevious(sentPrevData);
- }
- case BINARY_CHANGED -> {
- if (sentPrevData != null) {
- sentPrevData.close();
- }
- if (sentCurData != null) {
- sentCurData.close();
- }
- yield new UpdateAtomicResultBinaryChanged(changed);
- }
- case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData));
- };
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java
index f0d7873..f2db6bb 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java
@@ -64,30 +64,28 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn {
+ if (prevData != null) {
+ prevData.close();
}
- sentPrevData = prevData == null ? null : prevData.copy();
- sentCurData = newData == null ? null : newData.copy();
+ if (newData != null) {
+ newData.close();
+ }
+ yield RESULT_NOTHING;
}
+ case CURRENT -> {
+ if (prevData != null) {
+ prevData.close();
+ }
+ yield new UpdateAtomicResultCurrent(newData);
+ }
+ case PREVIOUS -> {
+ if (newData != null) {
+ newData.close();
+ }
+ yield new UpdateAtomicResultPrevious(prevData);
+ }
+ case BINARY_CHANGED -> {
+ if (prevData != null) {
+ prevData.close();
+ }
+ if (newData != null) {
+ newData.close();
+ }
+ yield new UpdateAtomicResultBinaryChanged(changed);
+ }
+ case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
+ };
+ } catch (Throwable ex) {
+ if (prevData != null && prevData.isAccessible()) {
+ prevData.close();
}
- } finally {
- tx.undoGetForUpdate(cfh, keyArray);
+ if (newData != null && newData.isAccessible()) {
+ newData.close();
+ }
+ throw ex;
}
- recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
- return switch (returnMode) {
- case NOTHING -> {
- if (sentPrevData != null) {
- sentPrevData.close();
- }
- if (sentCurData != null) {
- sentCurData.close();
- }
- yield RESULT_NOTHING;
- }
- case CURRENT -> {
- if (sentPrevData != null) {
- sentPrevData.close();
- }
- yield new UpdateAtomicResultCurrent(sentCurData);
- }
- case PREVIOUS -> {
- if (sentCurData != null) {
- sentCurData.close();
- }
- yield new UpdateAtomicResultPrevious(sentPrevData);
- }
- case BINARY_CHANGED -> {
- if (sentPrevData != null) {
- sentPrevData.close();
- }
- if (sentCurData != null) {
- sentCurData.close();
- }
- yield new UpdateAtomicResultBinaryChanged(changed);
- }
- case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData));
- };
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);