diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 3fdd5af..03e97a4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.DefaultBufferAllocators; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.Resource; @@ -53,6 +54,13 @@ public class DatabaseMapDictionaryDeep> extend } catch (Throwable ex) { LOG.error("Failed to close keyPrefix", ex); } + try { + if (obj.keySuffixAndExtZeroBuffer != null) { + obj.keySuffixAndExtZeroBuffer.close(); + } + } catch (Throwable ex) { + LOG.error("Failed to close keySuffixAndExtZeroBuffer", ex); + } try { if (obj.onClose != null) { obj.onClose.run(); @@ -85,6 +93,7 @@ public class DatabaseMapDictionaryDeep> extend protected LLRange range; protected Buffer keyPrefix; + protected Buffer keySuffixAndExtZeroBuffer; protected Runnable onClose; private static void incrementPrefix(Buffer prefix, int prefixLength) { @@ -119,26 +128,48 @@ public class DatabaseMapDictionaryDeep> extend } } - static void firstRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) { - zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength); + static void firstRangeKey(Buffer prefixKey, int prefixLength, Buffer suffixAndExtZeroes) { + zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixAndExtZeroes); } - static void nextRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) { - zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength); + static void nextRangeKey(Buffer prefixKey, int prefixLength, Buffer suffixAndExtZeroes) { + zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixAndExtZeroes); incrementPrefix(prefixKey, prefixLength); } + @Deprecated + static void firstRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) { + try (var zeroBuf = DefaultBufferAllocators.offHeapAllocator().allocate(suffixLength + extLength)) { + zeroBuf.fill((byte) 0); + zeroBuf.writerOffset(suffixLength + extLength); + zeroFillKeySuffixAndExt(prefixKey, prefixLength, zeroBuf); + } + } + + @Deprecated + static void nextRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) { + try (var zeroBuf = DefaultBufferAllocators.offHeapAllocator().allocate(suffixLength + extLength)) { + zeroBuf.fill((byte) 0); + zeroBuf.writerOffset(suffixLength + extLength); + zeroFillKeySuffixAndExt(prefixKey, prefixLength, zeroBuf); + incrementPrefix(prefixKey, prefixLength); + } + } + protected static void zeroFillKeySuffixAndExt(@NotNull Buffer prefixKey, - int prefixLength, int suffixLength, int extLength) { + int prefixLength, Buffer suffixAndExtZeroes) { //noinspection UnnecessaryLocalVariable var result = prefixKey; + var suffixLengthAndExtLength = suffixAndExtZeroes.readableBytes(); assert result.readableBytes() == prefixLength; - assert suffixLength > 0; - assert extLength >= 0; - result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true); - for (int i = 0; i < suffixLength + extLength; i++) { - result.writeByte((byte) 0x0); - } + assert suffixLengthAndExtLength > 0 : "Suffix length + ext length is < 0: " + suffixLengthAndExtLength; + prefixKey.ensureWritable(suffixLengthAndExtLength); + suffixAndExtZeroes.copyInto(suffixAndExtZeroes.readerOffset(), + prefixKey, + prefixKey.writerOffset(), + suffixLengthAndExtLength + ); + prefixKey.skipWritable(suffixLengthAndExtLength); } /** @@ -180,14 +211,25 @@ public class DatabaseMapDictionaryDeep> extend this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes(); this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength(); this.keyExtLength = keyExtLength; + this.keySuffixAndExtZeroBuffer = alloc + .allocate(keySuffixLength + keyExtLength) + .fill((byte) 0) + .writerOffset(keySuffixLength + keyExtLength) + .makeReadOnly(); + assert keySuffixAndExtZeroBuffer.readableBytes() == keySuffixLength + keyExtLength : + "Key suffix and ext zero buffer readable length is not equal" + + " to the key suffix length + key ext length. keySuffixAndExtZeroBuffer=" + + keySuffixAndExtZeroBuffer.readableBytes() + " keySuffixLength=" + keySuffixLength + " keyExtLength=" + + keyExtLength; + assert keySuffixAndExtZeroBuffer.readableBytes() > 0; var firstKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength) : prefixKey.copy(); try { - firstRangeKey(firstKey, keyPrefixLength, keySuffixLength, keyExtLength); + firstRangeKey(firstKey, keyPrefixLength, keySuffixAndExtZeroBuffer); var nextRangeKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength) : prefixKey.copy(); try { - nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixLength, keyExtLength); + nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixAndExtZeroBuffer); assert prefixKey == null || prefixKey.isAccessible(); assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); if (keyPrefixLength == 0) { @@ -211,6 +253,9 @@ public class DatabaseMapDictionaryDeep> extend this.keyPrefix = prefixKey; this.onClose = onClose; } catch (Throwable t) { + if (this.keySuffixAndExtZeroBuffer != null && keySuffixAndExtZeroBuffer.isAccessible()) { + keySuffixAndExtZeroBuffer.close(); + } if (prefixKey != null && prefixKey.isAccessible()) { prefixKey.close(); } @@ -229,6 +274,7 @@ public class DatabaseMapDictionaryDeep> extend Mono> rangeMono, Send range, Send keyPrefix, + Send keySuffixAndExtZeroBuffer, Runnable onClose) { super((Drop>) (Drop) DROP); this.dictionary = dictionary; @@ -242,6 +288,7 @@ public class DatabaseMapDictionaryDeep> extend this.range = range.receive(); this.keyPrefix = keyPrefix.receive(); + this.keySuffixAndExtZeroBuffer = keySuffixAndExtZeroBuffer.receive(); this.onClose = onClose; } @@ -323,7 +370,7 @@ public class DatabaseMapDictionaryDeep> extend groupKeyWithoutExtSend_::receive, groupKeyWithoutExtSend -> this.subStageGetter .subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send())) - .>handle((us, sink) -> { + .handle((us, sink) -> { T deserializedSuffix; try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) { deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix); @@ -413,7 +460,8 @@ public class DatabaseMapDictionaryDeep> extend @Override protected Owned> prepareSend() { - var keyPrefix = this.keyPrefix == null ? null : this.keyPrefix.send(); + var keyPrefix = this.keyPrefix.send(); + var keySuffixAndExtZeroBuffer = this.keySuffixAndExtZeroBuffer.send(); var range = this.range.send(); var onClose = this.onClose; return drop -> { @@ -427,6 +475,7 @@ public class DatabaseMapDictionaryDeep> extend rangeMono, range, keyPrefix, + keySuffixAndExtZeroBuffer, onClose ); drop.attach(instance); @@ -437,6 +486,7 @@ public class DatabaseMapDictionaryDeep> extend @Override protected void makeInaccessible() { this.keyPrefix = null; + this.keySuffixAndExtZeroBuffer = null; this.range = null; this.onClose = null; } diff --git a/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java b/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java index a2c93ee..7c0ed25 100644 --- a/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java +++ b/src/test/java/it/cavallium/dbengine/database/collections/TestRanges.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.collections; +import static io.netty5.buffer.Unpooled.wrappedBuffer; + import io.netty5.buffer.Unpooled; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; @@ -9,7 +11,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import static io.netty5.buffer.Unpooled.*; public class TestRanges { @@ -63,7 +64,10 @@ public class TestRanges { } if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) { - Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}, nextRangeKey); + org.assertj.core.api.Assertions + .assertThat(nextRangeKey) + .isEqualTo(new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, + (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}); } else { long biPrefix = 0; var s = 0;