Improve suffix performance
This commit is contained in:
parent
ba3765eece
commit
59c37c0fc9
@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.BufferAllocator;
|
||||
import io.netty5.buffer.api.DefaultBufferAllocators;
|
||||
import io.netty5.buffer.api.Drop;
|
||||
import io.netty5.buffer.api.Owned;
|
||||
import io.netty5.buffer.api.Resource;
|
||||
@ -53,6 +54,13 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
} catch (Throwable ex) {
|
||||
LOG.error("Failed to close keyPrefix", ex);
|
||||
}
|
||||
try {
|
||||
if (obj.keySuffixAndExtZeroBuffer != null) {
|
||||
obj.keySuffixAndExtZeroBuffer.close();
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
LOG.error("Failed to close keySuffixAndExtZeroBuffer", ex);
|
||||
}
|
||||
try {
|
||||
if (obj.onClose != null) {
|
||||
obj.onClose.run();
|
||||
@ -85,6 +93,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
|
||||
protected LLRange range;
|
||||
protected Buffer keyPrefix;
|
||||
protected Buffer keySuffixAndExtZeroBuffer;
|
||||
protected Runnable onClose;
|
||||
|
||||
private static void incrementPrefix(Buffer prefix, int prefixLength) {
|
||||
@ -119,26 +128,48 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
}
|
||||
}
|
||||
|
||||
static void firstRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength);
|
||||
static void firstRangeKey(Buffer prefixKey, int prefixLength, Buffer suffixAndExtZeroes) {
|
||||
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixAndExtZeroes);
|
||||
}
|
||||
|
||||
static void nextRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength);
|
||||
static void nextRangeKey(Buffer prefixKey, int prefixLength, Buffer suffixAndExtZeroes) {
|
||||
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixAndExtZeroes);
|
||||
incrementPrefix(prefixKey, prefixLength);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
static void firstRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
try (var zeroBuf = DefaultBufferAllocators.offHeapAllocator().allocate(suffixLength + extLength)) {
|
||||
zeroBuf.fill((byte) 0);
|
||||
zeroBuf.writerOffset(suffixLength + extLength);
|
||||
zeroFillKeySuffixAndExt(prefixKey, prefixLength, zeroBuf);
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
static void nextRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
try (var zeroBuf = DefaultBufferAllocators.offHeapAllocator().allocate(suffixLength + extLength)) {
|
||||
zeroBuf.fill((byte) 0);
|
||||
zeroBuf.writerOffset(suffixLength + extLength);
|
||||
zeroFillKeySuffixAndExt(prefixKey, prefixLength, zeroBuf);
|
||||
incrementPrefix(prefixKey, prefixLength);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void zeroFillKeySuffixAndExt(@NotNull Buffer prefixKey,
|
||||
int prefixLength, int suffixLength, int extLength) {
|
||||
int prefixLength, Buffer suffixAndExtZeroes) {
|
||||
//noinspection UnnecessaryLocalVariable
|
||||
var result = prefixKey;
|
||||
var suffixLengthAndExtLength = suffixAndExtZeroes.readableBytes();
|
||||
assert result.readableBytes() == prefixLength;
|
||||
assert suffixLength > 0;
|
||||
assert extLength >= 0;
|
||||
result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
|
||||
for (int i = 0; i < suffixLength + extLength; i++) {
|
||||
result.writeByte((byte) 0x0);
|
||||
}
|
||||
assert suffixLengthAndExtLength > 0 : "Suffix length + ext length is < 0: " + suffixLengthAndExtLength;
|
||||
prefixKey.ensureWritable(suffixLengthAndExtLength);
|
||||
suffixAndExtZeroes.copyInto(suffixAndExtZeroes.readerOffset(),
|
||||
prefixKey,
|
||||
prefixKey.writerOffset(),
|
||||
suffixLengthAndExtLength
|
||||
);
|
||||
prefixKey.skipWritable(suffixLengthAndExtLength);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -180,14 +211,25 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes();
|
||||
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
|
||||
this.keyExtLength = keyExtLength;
|
||||
this.keySuffixAndExtZeroBuffer = alloc
|
||||
.allocate(keySuffixLength + keyExtLength)
|
||||
.fill((byte) 0)
|
||||
.writerOffset(keySuffixLength + keyExtLength)
|
||||
.makeReadOnly();
|
||||
assert keySuffixAndExtZeroBuffer.readableBytes() == keySuffixLength + keyExtLength :
|
||||
"Key suffix and ext zero buffer readable length is not equal"
|
||||
+ " to the key suffix length + key ext length. keySuffixAndExtZeroBuffer="
|
||||
+ keySuffixAndExtZeroBuffer.readableBytes() + " keySuffixLength=" + keySuffixLength + " keyExtLength="
|
||||
+ keyExtLength;
|
||||
assert keySuffixAndExtZeroBuffer.readableBytes() > 0;
|
||||
var firstKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength)
|
||||
: prefixKey.copy();
|
||||
try {
|
||||
firstRangeKey(firstKey, keyPrefixLength, keySuffixLength, keyExtLength);
|
||||
firstRangeKey(firstKey, keyPrefixLength, keySuffixAndExtZeroBuffer);
|
||||
var nextRangeKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength)
|
||||
: prefixKey.copy();
|
||||
try {
|
||||
nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixLength, keyExtLength);
|
||||
nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixAndExtZeroBuffer);
|
||||
assert prefixKey == null || prefixKey.isAccessible();
|
||||
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
|
||||
if (keyPrefixLength == 0) {
|
||||
@ -211,6 +253,9 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
this.keyPrefix = prefixKey;
|
||||
this.onClose = onClose;
|
||||
} catch (Throwable t) {
|
||||
if (this.keySuffixAndExtZeroBuffer != null && keySuffixAndExtZeroBuffer.isAccessible()) {
|
||||
keySuffixAndExtZeroBuffer.close();
|
||||
}
|
||||
if (prefixKey != null && prefixKey.isAccessible()) {
|
||||
prefixKey.close();
|
||||
}
|
||||
@ -229,6 +274,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
Send<LLRange> range,
|
||||
Send<Buffer> keyPrefix,
|
||||
Send<Buffer> keySuffixAndExtZeroBuffer,
|
||||
Runnable onClose) {
|
||||
super((Drop<DatabaseMapDictionaryDeep<T,U,US>>) (Drop) DROP);
|
||||
this.dictionary = dictionary;
|
||||
@ -242,6 +288,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
|
||||
this.range = range.receive();
|
||||
this.keyPrefix = keyPrefix.receive();
|
||||
this.keySuffixAndExtZeroBuffer = keySuffixAndExtZeroBuffer.receive();
|
||||
this.onClose = onClose;
|
||||
}
|
||||
|
||||
@ -323,7 +370,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
groupKeyWithoutExtSend_::receive,
|
||||
groupKeyWithoutExtSend -> this.subStageGetter
|
||||
.subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send()))
|
||||
.<Entry<T, US>>handle((us, sink) -> {
|
||||
.handle((us, sink) -> {
|
||||
T deserializedSuffix;
|
||||
try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) {
|
||||
deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix);
|
||||
@ -413,7 +460,8 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
|
||||
@Override
|
||||
protected Owned<DatabaseMapDictionaryDeep<T, U, US>> prepareSend() {
|
||||
var keyPrefix = this.keyPrefix == null ? null : this.keyPrefix.send();
|
||||
var keyPrefix = this.keyPrefix.send();
|
||||
var keySuffixAndExtZeroBuffer = this.keySuffixAndExtZeroBuffer.send();
|
||||
var range = this.range.send();
|
||||
var onClose = this.onClose;
|
||||
return drop -> {
|
||||
@ -427,6 +475,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
rangeMono,
|
||||
range,
|
||||
keyPrefix,
|
||||
keySuffixAndExtZeroBuffer,
|
||||
onClose
|
||||
);
|
||||
drop.attach(instance);
|
||||
@ -437,6 +486,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
@Override
|
||||
protected void makeInaccessible() {
|
||||
this.keyPrefix = null;
|
||||
this.keySuffixAndExtZeroBuffer = null;
|
||||
this.range = null;
|
||||
this.onClose = null;
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import static io.netty5.buffer.Unpooled.wrappedBuffer;
|
||||
|
||||
import io.netty5.buffer.Unpooled;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.BufferAllocator;
|
||||
@ -9,7 +11,6 @@ import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import static io.netty5.buffer.Unpooled.*;
|
||||
|
||||
public class TestRanges {
|
||||
|
||||
@ -63,7 +64,10 @@ public class TestRanges {
|
||||
}
|
||||
|
||||
if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) {
|
||||
Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}, nextRangeKey);
|
||||
org.assertj.core.api.Assertions
|
||||
.assertThat(nextRangeKey)
|
||||
.isEqualTo(new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF,
|
||||
(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0});
|
||||
} else {
|
||||
long biPrefix = 0;
|
||||
var s = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user