diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 1808cfc..a800428 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -50,12 +50,12 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueSerializer; protected DatabaseMapDictionary(LLDictionary dictionary, - @Nullable Buffer prefixKey, + @Nullable BufSupplier prefixKeySupplier, SerializerFixedBinaryLength keySuffixSerializer, Serializer valueSerializer, Runnable onClose) { // Do not retain or release or use the prefixKey here - super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0, onClose); + super(dictionary, prefixKeySupplier, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0, onClose); this.valueSerializer = valueSerializer; } @@ -67,11 +67,11 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep DatabaseMapDictionary tail(LLDictionary dictionary, - @Nullable Buffer prefixKey, + @Nullable BufSupplier prefixKeySupplier, SerializerFixedBinaryLength keySuffixSerializer, Serializer valueSerializer, Runnable onClose) { - return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose); + return new DatabaseMapDictionary<>(dictionary, prefixKeySupplier, keySuffixSerializer, valueSerializer, onClose); } public static Flux> getLeavesFrom(DatabaseMapDictionary databaseMapDictionary, @@ -134,17 +134,19 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> extend protected final Mono rangeMono; protected RangeSupplier rangeSupplier; - protected Buffer keyPrefix; - protected Buffer keySuffixAndExtZeroBuffer; + protected BufSupplier keyPrefixSupplier; protected Runnable onClose; private static void incrementPrefix(Buffer prefix, int prefixLength) { @@ -199,73 +190,70 @@ public class DatabaseMapDictionaryDeep> extend } public static > DatabaseMapDictionaryDeep deepIntermediate( - LLDictionary dictionary, Buffer prefixKey, SerializerFixedBinaryLength keySuffixSerializer, + LLDictionary dictionary, BufSupplier prefixKey, SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength, Runnable onClose) { return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength, onClose); } @SuppressWarnings({"unchecked", "rawtypes"}) - protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @Nullable Buffer prefixKey, + protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @Nullable BufSupplier prefixKeySupplier, SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength, Runnable onClose) { super((Drop>) (Drop) DROP); - try { + try (var prefixKey = prefixKeySupplier != null ? prefixKeySupplier.get() : null) { this.dictionary = dictionary; this.alloc = dictionary.getAllocator(); this.subStageGetter = subStageGetter; this.keySuffixSerializer = keySuffixSerializer; - assert prefixKey == null || prefixKey.isAccessible(); - this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes(); + this.keyPrefixLength = prefixKey != null ? prefixKey.readableBytes() : 0; this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength(); this.keyExtLength = keyExtLength; - this.keySuffixAndExtZeroBuffer = alloc + try (var keySuffixAndExtZeroBuffer = alloc .allocate(keySuffixLength + keyExtLength) .fill((byte) 0) .writerOffset(keySuffixLength + keyExtLength) - .makeReadOnly(); - assert keySuffixAndExtZeroBuffer.readableBytes() == keySuffixLength + keyExtLength : - "Key suffix and ext zero buffer readable length is not equal" - + " to the key suffix length + key ext length. keySuffixAndExtZeroBuffer=" - + keySuffixAndExtZeroBuffer.readableBytes() + " keySuffixLength=" + keySuffixLength + " keyExtLength=" - + keyExtLength; - assert keySuffixAndExtZeroBuffer.readableBytes() > 0; - var firstKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength) - : prefixKey.copy(); - try { - firstRangeKey(firstKey, keyPrefixLength, keySuffixAndExtZeroBuffer); - var nextRangeKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength) - : prefixKey.copy(); + .makeReadOnly()) { + assert keySuffixAndExtZeroBuffer.readableBytes() == keySuffixLength + keyExtLength : + "Key suffix and ext zero buffer readable length is not equal" + + " to the key suffix length + key ext length. keySuffixAndExtZeroBuffer=" + + keySuffixAndExtZeroBuffer.readableBytes() + " keySuffixLength=" + keySuffixLength + " keyExtLength=" + + keyExtLength; + assert keySuffixAndExtZeroBuffer.readableBytes() > 0; + var firstKey = prefixKey != null ? prefixKeySupplier.get() + : alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength); try { - nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixAndExtZeroBuffer); - assert prefixKey == null || prefixKey.isAccessible(); - assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); - if (keyPrefixLength == 0) { - this.rangeSupplier = RangeSupplier.ofOwned(LLRange.all()); - firstKey.close(); + firstRangeKey(firstKey, keyPrefixLength, keySuffixAndExtZeroBuffer); + var nextRangeKey = prefixKey != null ? prefixKeySupplier.get() + : alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength); + try { + nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixAndExtZeroBuffer); + assert prefixKey == null || prefixKey.isAccessible(); + assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); + if (keyPrefixLength == 0) { + this.rangeSupplier = RangeSupplier.ofOwned(LLRange.all()); + firstKey.close(); + nextRangeKey.close(); + } else { + this.rangeSupplier = RangeSupplier.ofOwned(LLRange.ofUnsafe(firstKey, nextRangeKey)); + } + this.rangeMono = Mono.fromSupplier(rangeSupplier); + assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength); + } catch (Throwable t) { nextRangeKey.close(); - } else { - this.rangeSupplier = RangeSupplier.ofOwned(LLRange.ofUnsafe(firstKey, nextRangeKey)); + throw t; } - this.rangeMono = Mono.fromSupplier(rangeSupplier); - assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength); } catch (Throwable t) { - nextRangeKey.close(); + firstKey.close(); throw t; } - } catch (Throwable t) { - firstKey.close(); - throw t; - } - this.keyPrefix = prefixKey; - this.onClose = onClose; - } catch (Throwable t) { - if (this.keySuffixAndExtZeroBuffer != null && keySuffixAndExtZeroBuffer.isAccessible()) { - keySuffixAndExtZeroBuffer.close(); + this.keyPrefixSupplier = prefixKeySupplier; + this.onClose = onClose; } - if (prefixKey != null && prefixKey.isAccessible()) { - prefixKey.close(); + } catch (Throwable t) { + if (prefixKeySupplier != null) { + prefixKeySupplier.close(); } throw t; } @@ -281,8 +269,7 @@ public class DatabaseMapDictionaryDeep> extend int keyExtLength, Mono rangeMono, RangeSupplier rangeSupplier, - Buffer keyPrefix, - Buffer keySuffixAndExtZeroBuffer, + BufSupplier keyPrefixSupplier, Runnable onClose) { super((Drop>) (Drop) DROP); this.dictionary = dictionary; @@ -295,8 +282,7 @@ public class DatabaseMapDictionaryDeep> extend this.rangeMono = rangeMono; this.rangeSupplier = rangeSupplier; - this.keyPrefix = keyPrefix; - this.keySuffixAndExtZeroBuffer = keySuffixAndExtZeroBuffer; + this.keyPrefixSupplier = keyPrefixSupplier; this.onClose = onClose; } @@ -349,15 +335,18 @@ public class DatabaseMapDictionaryDeep> extend @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { var suffixKeyWithoutExt = Mono.fromCallable(() -> { - try (var keyWithoutExtBuf = keyPrefix == null - ? alloc.allocate(keySuffixLength + keyExtLength) : keyPrefix.copy()) { + var keyWithoutExtBuf = keyPrefixSupplier == null + ? alloc.allocate(keySuffixLength + keyExtLength) : keyPrefixSupplier.get(); + try { keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength); serializeSuffix(keySuffix, keyWithoutExtBuf); - return keyWithoutExtBuf.send(); + } catch (Throwable ex) { + keyWithoutExtBuf.close(); + throw ex; } + return keyWithoutExtBuf; }); - return this.subStageGetter - .subStage(dictionary, snapshot, suffixKeyWithoutExt); + return this.subStageGetter.subStage(dictionary, snapshot, suffixKeyWithoutExt); } @Override @@ -374,21 +363,17 @@ public class DatabaseMapDictionaryDeep> extend public Flux> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { return dictionary .getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange) - .flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using( - () -> groupKeyWithoutExtSend_, - groupKeyWithoutExtSend -> this.subStageGetter - .subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send())) - .handle((us, sink) -> { - T deserializedSuffix; - try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) { - deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix); - sink.next(new SubStageEntry<>(deserializedSuffix, us)); - } catch (SerializationException ex) { - sink.error(ex); - } - }), - Resource::close - )); + .flatMapSequential(groupKeyWithoutExt -> this.subStageGetter + .subStage(dictionary, snapshot, Mono.fromCallable(groupKeyWithoutExt::copy)) + .map(us -> { + T deserializedSuffix; + try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExt)) { + deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix); + return new SubStageEntry<>(deserializedSuffix, us); + } + }) + .doFinally(s -> groupKeyWithoutExt.close()) + ); } /** @@ -447,7 +432,6 @@ public class DatabaseMapDictionaryDeep> extend protected T deserializeSuffix(@NotNull Buffer keySuffix) throws SerializationException { assert suffixKeyLengthConsistency(keySuffix.readableBytes()); var result = keySuffixSerializer.deserialize(keySuffix); - assert keyPrefix == null || keyPrefix.isAccessible(); return result; } @@ -457,7 +441,6 @@ public class DatabaseMapDictionaryDeep> extend keySuffixSerializer.serialize(keySuffix, output); var afterWriterOffset = output.writerOffset(); assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset); - assert keyPrefix == null || keyPrefix.isAccessible(); } @Override @@ -467,8 +450,7 @@ public class DatabaseMapDictionaryDeep> extend @Override protected Owned> prepareSend() { - var keyPrefix = this.keyPrefix.send(); - var keySuffixAndExtZeroBuffer = this.keySuffixAndExtZeroBuffer.send(); + var keyPrefixSupplier = this.keyPrefixSupplier; var rangeSupplier = this.rangeSupplier; var onClose = this.onClose; return drop -> { @@ -481,8 +463,7 @@ public class DatabaseMapDictionaryDeep> extend keyExtLength, rangeMono, rangeSupplier, - keyPrefix.receive(), - keySuffixAndExtZeroBuffer.receive(), + keyPrefixSupplier, onClose ); drop.attach(instance); @@ -492,8 +473,7 @@ public class DatabaseMapDictionaryDeep> extend @Override protected void makeInaccessible() { - this.keyPrefix = null; - this.keySuffixAndExtZeroBuffer = null; + this.keyPrefixSupplier = null; this.rangeSupplier = null; this.onClose = null; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 1c104d6..7454c77 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -7,6 +7,7 @@ import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import io.netty5.buffer.api.internal.ResourceSupport; @@ -67,7 +68,7 @@ public class DatabaseMapDictionaryHashed extends @SuppressWarnings({"unchecked", "rawtypes"}) protected DatabaseMapDictionaryHashed(LLDictionary dictionary, - @Nullable Buffer prefixKey, + @Nullable BufSupplier prefixKeySupplier, Serializer keySuffixSerializer, Serializer valueSerializer, Function keySuffixHashFunction, @@ -82,7 +83,7 @@ public class DatabaseMapDictionaryHashed extends = new ValueWithHashSerializer<>(keySuffixSerializer, valueSerializer); ValuesSetSerializer> valuesSetSerializer = new ValuesSetSerializer<>(valueWithHashSerializer); - this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey, keySuffixHashSerializer, + this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKeySupplier, keySuffixHashSerializer, valuesSetSerializer, onClose); this.keySuffixHashFunction = keySuffixHashFunction; } @@ -117,14 +118,14 @@ public class DatabaseMapDictionaryHashed extends } public static DatabaseMapDictionaryHashed tail(LLDictionary dictionary, - @Nullable Buffer prefixKey, + @Nullable BufSupplier prefixKeySupplier, Serializer keySuffixSerializer, Serializer valueSerializer, Function keySuffixHashFunction, SerializerFixedBinaryLength keySuffixHashSerializer, Runnable onClose) { return new DatabaseMapDictionaryHashed<>(dictionary, - prefixKey, + prefixKeySupplier, keySuffixSerializer, valueSerializer, keySuffixHashFunction, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index 117a290..90bc21d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -80,13 +80,9 @@ public class DatabaseMapSingle extends ResourceSupport, Data } } - private void deserializeValue(Buffer value, SynchronousSink sink) { + private U deserializeValue(Buffer value) { try { - U deserializedValue; - try (value) { - deserializedValue = serializer.deserialize(value); - } - sink.next(deserializedValue); + return serializer.deserialize(value); } catch (IndexOutOfBoundsException ex) { var exMessage = ex.getMessage(); if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { @@ -94,12 +90,12 @@ public class DatabaseMapSingle extends ResourceSupport, Data LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() + ":" + dictionary.getColumnName() + ":" + LLUtils.toStringSafe(key)); } - sink.complete(); + return null; } else { - sink.error(ex); + throw ex; } } catch (SerializationException ex) { - sink.error(ex); + throw ex; } } @@ -118,22 +114,24 @@ public class DatabaseMapSingle extends ResourceSupport, Data @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return dictionary - .get(resolveSnapshot(snapshot), keyMono) - .handle(this::deserializeValue); + return Mono.usingWhen(dictionary.get(resolveSnapshot(snapshot), keyMono), + buf -> Mono.fromSupplier(() -> deserializeValue(buf)), + buf -> Mono.fromRunnable(buf::close) + ); } @Override public Mono setAndGetPrevious(U value) { - return dictionary - .put(keyMono, Mono.fromCallable(() -> serializeValue(value)), LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue); + return Mono.usingWhen(dictionary + .put(keyMono, Mono.fromCallable(() -> serializeValue(value)), LLDictionaryResultType.PREVIOUS_VALUE), + buf -> Mono.fromSupplier(() -> deserializeValue(buf)), + buf -> Mono.fromRunnable(buf::close)); } @Override public Mono update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode) { - return dictionary + var resultMono = dictionary .update(keyMono, (oldValueSer) -> { U result; if (oldValueSer == null) { @@ -147,8 +145,11 @@ public class DatabaseMapSingle extends ResourceSupport, Data } else { return serializeValue(result); } - }, updateReturnMode) - .handle(this::deserializeValue); + }, updateReturnMode); + return Mono.usingWhen(resultMono, + result -> Mono.fromSupplier(() -> deserializeValue(result)), + result -> Mono.fromRunnable(result::close) + ); } @Override @@ -172,9 +173,10 @@ public class DatabaseMapSingle extends ResourceSupport, Data @Override public Mono clearAndGetPrevious() { - return dictionary - .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue); + return Mono.usingWhen(dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE), + result -> Mono.fromSupplier(() -> deserializeValue(result)), + result -> Mono.fromRunnable(result::close) + ); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java index 850454d..17e3a2b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; @@ -19,10 +20,15 @@ import reactor.core.publisher.Mono; public class DatabaseSetDictionary extends DatabaseMapDictionary { protected DatabaseSetDictionary(LLDictionary dictionary, - Buffer prefixKey, + BufSupplier prefixKeySupplier, SerializerFixedBinaryLength keySuffixSerializer, Runnable onClose) { - super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), onClose); + super(dictionary, + prefixKeySupplier, + keySuffixSerializer, + DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), + onClose + ); } public static DatabaseSetDictionary simple(LLDictionary dictionary, @@ -32,10 +38,10 @@ public class DatabaseSetDictionary extends DatabaseMapDictionary } public static DatabaseSetDictionary tail(LLDictionary dictionary, - Buffer prefixKey, + BufSupplier prefixKeySupplier, SerializerFixedBinaryLength keySuffixSerializer, Runnable onClose) { - return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer, onClose); + return new DatabaseSetDictionary<>(dictionary, prefixKeySupplier, keySuffixSerializer, onClose); } public Mono> getKeySet(@Nullable CompositeSnapshot snapshot) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java index c0fc89c..1bac2d1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; @@ -22,13 +23,13 @@ import reactor.core.publisher.Mono; public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHashed { protected DatabaseSetDictionaryHashed(LLDictionary dictionary, - @Nullable Buffer prefixKey, + @Nullable BufSupplier prefixKeySupplier, Serializer keySuffixSerializer, Function keySuffixHashFunction, SerializerFixedBinaryLength keySuffixHashSerializer, Runnable onClose) { super(dictionary, - prefixKey, + prefixKeySupplier, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), keySuffixHashFunction, @@ -52,13 +53,13 @@ public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHas } public static DatabaseSetDictionaryHashed tail(LLDictionary dictionary, - @Nullable Buffer prefixKey, + @Nullable BufSupplier prefixKeySupplier, Serializer keySuffixSerializer, Function keyHashFunction, SerializerFixedBinaryLength keyHashSerializer, Runnable onClose) { return new DatabaseSetDictionaryHashed<>(dictionary, - prefixKey, + prefixKeySupplier, keySuffixSerializer, keyHashFunction, keyHashSerializer, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java index d4b502b..c58a095 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java @@ -11,6 +11,6 @@ public interface SubStageGetter> { Mono subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> prefixKey); + Mono prefixKey); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index ab6512b..bc994c6 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.Serializer; @@ -36,18 +37,15 @@ public class SubStageGetterHashMap implements @Override public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> prefixKeyMono) { - return prefixKeyMono.map(prefixKeyToReceive -> { - var prefixKey = prefixKeyToReceive.receive(); - return DatabaseMapDictionaryHashed.tail(dictionary, - prefixKey, - keySerializer, - valueSerializer, - keyHashFunction, - keyHashSerializer, - null - ); - }); + Mono prefixKeyMono) { + return prefixKeyMono.map(prefixKey -> DatabaseMapDictionaryHashed.tail(dictionary, + BufSupplier.ofOwned(prefixKey), + keySerializer, + valueSerializer, + keyHashFunction, + keyHashSerializer, + null + )); } public int getKeyHashBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index d1fc9ad..d82b375 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; @@ -34,17 +35,14 @@ public class SubStageGetterHashSet implements @Override public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> prefixKeyMono) { - return prefixKeyMono.map(prefixKeyToReceive -> { - var prefixKey = prefixKeyToReceive.receive(); - return DatabaseSetDictionaryHashed.tail(dictionary, - prefixKey, - keySerializer, - keyHashFunction, - keyHashSerializer, - null - ); - }); + Mono prefixKeyMono) { + return prefixKeyMono.map(prefixKey -> DatabaseSetDictionaryHashed.tail(dictionary, + BufSupplier.ofOwned(prefixKey), + keySerializer, + keyHashFunction, + keyHashSerializer, + null + )); } public int getKeyHashBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index c0400d0..95f89a2 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -1,15 +1,12 @@ package it.cavallium.dbengine.database.collections; import io.netty5.buffer.api.Buffer; -import io.netty5.buffer.api.Resource; -import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; -import java.util.Map; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -28,11 +25,13 @@ public class SubStageGetterMap implements @Override public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> prefixKeyMono) { - return prefixKeyMono.map(prefixKeyToReceive -> { - var prefixKey = prefixKeyToReceive.receive(); - return DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer, valueSerializer, null); - }); + Mono prefixKeyMono) { + return prefixKeyMono.map(prefixKey -> DatabaseMapDictionary.tail(dictionary, + BufSupplier.ofOwned(prefixKey), + keySerializer, + valueSerializer, + null + )); } public int getKeyBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 5a80b17..a4ac9ac 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -42,17 +43,14 @@ public class SubStageGetterMapDeep> implements @Override public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> prefixKeyMono) { - return prefixKeyMono.map(prefixKeyToReceive -> { - var prefixKey = prefixKeyToReceive.receive(); - return DatabaseMapDictionaryDeep.deepIntermediate(dictionary, - prefixKey, - keySerializer, - subStageGetter, - keyExtLength, - null - ); - }); + Mono prefixKeyMono) { + return prefixKeyMono.map(prefixKey -> DatabaseMapDictionaryDeep.deepIntermediate(dictionary, + BufSupplier.ofOwned(prefixKey), + keySerializer, + subStageGetter, + keyExtLength, + null + )); } public int getKeyBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index a45bfc0..6ea7783 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -24,11 +25,12 @@ public class SubStageGetterSet implements @Override public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> prefixKeyMono) { - return prefixKeyMono.map(prefixKeyToReceive -> { - var prefixKey = prefixKeyToReceive.receive(); - return DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer, null); - }); + Mono prefixKeyMono) { + return prefixKeyMono.map(prefixKey -> DatabaseSetDictionary.tail(dictionary, + BufSupplier.ofOwned(prefixKey), + keySerializer, + null + )); } public int getKeyBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 7aeaf56..889eb3f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -20,8 +20,12 @@ public class SubStageGetterSingle implements SubStageGetter> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, - Mono> keyPrefixMono) { - return keyPrefixMono.map(keyPrefix -> new DatabaseMapSingle<>(dictionary, BufSupplier.of(keyPrefix), serializer, null)); + Mono keyPrefixMono) { + return keyPrefixMono.map(keyPrefix -> new DatabaseMapSingle<>(dictionary, + BufSupplier.ofOwned(keyPrefix), + serializer, + null + )); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index eafea4a..64f1934 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -60,6 +60,7 @@ import org.rocksdb.Slice; import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -772,13 +773,7 @@ public class LLLocalDictionary implements LLDictionary { } private Flux getRangeSingle(LLSnapshot snapshot, Mono keyMono) { - return Mono - .zip(keyMono, this.get(snapshot, keyMono)) - .map(result -> LLEntry.of( - result.getT1().touch("get-range-single key"), - result.getT2().touch("get-range-single value") - )) - .flux(); + return Mono.zip(keyMono, this.get(snapshot, keyMono), LLEntry::of).flux(); } private Flux getRangeMulti(LLSnapshot snapshot,