Remove sends to improve performance

This commit is contained in:
Andrea Cavalli 2021-11-08 16:33:41 +01:00
parent 232e46bcea
commit 81b7df8702
29 changed files with 357 additions and 411 deletions

View File

@ -33,7 +33,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<UpdateMode> getUpdateMode();
default Mono<Send<Buffer>> update(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return this
@ -42,17 +42,17 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
}
default Mono<Send<Buffer>> update(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode returnMode) {
return update(key, updater, returnMode, false);
}
Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly);
default Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater) {
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
return updateAndGetDelta(key, updater, false);
}
@ -60,11 +60,11 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<Send<Buffer>> remove(Mono<Send<Buffer>> key, LLDictionaryResultType resultType);
Flux<Optional<Send<Buffer>>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Send<Buffer>> keys,
boolean existsAlmostCertainly);
default Flux<Optional<Send<Buffer>>> getMulti(@Nullable LLSnapshot snapshot,
default Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Send<Buffer>> keys) {
return getMulti(snapshot, keys, false);
}
@ -72,7 +72,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues);
<K> Flux<Boolean> updateMulti(Flux<K> keys, Flux<Send<Buffer>> serializedKeys,
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Send<Buffer>> updateFunction);
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction);
Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean existsAlmostCertainly);

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import java.util.Objects;
@ -57,6 +58,12 @@ public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
this.value = value.receive().makeReadOnly();
assert isAllAccessible();
}
private LLEntry(@NotNull Buffer key, @NotNull Buffer value) {
super(DROP);
this.key = key.makeReadOnly();
this.value = value.makeReadOnly();
assert isAllAccessible();
}
private boolean isAllAccessible() {
assert key != null && key.isAccessible();
@ -70,6 +77,10 @@ public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
return new LLEntry(key, value);
}
public static LLEntry of(@NotNull Buffer key, @NotNull Buffer value) {
return new LLEntry(key, value);
}
public Send<Buffer> getKey() {
ensureOwned();
return Objects.requireNonNull(key).copy().send();

View File

@ -56,7 +56,7 @@ public class LLRange extends ResourceSupport<LLRange, LLRange> {
}
};
private static final LLRange RANGE_ALL = new LLRange(null, null, null);
private static final LLRange RANGE_ALL = new LLRange((Buffer) null, (Buffer) null, (Buffer) null);
@Nullable
private Buffer min;
@Nullable
@ -73,6 +73,15 @@ public class LLRange extends ResourceSupport<LLRange, LLRange> {
this.single = single != null ? single.receive().makeReadOnly() : null;
}
private LLRange(Buffer min, Buffer max, Buffer single) {
super(DROP);
assert isAllAccessible();
assert single == null || (min == null && max == null);
this.min = min != null ? min.makeReadOnly() : null;
this.max = max != null ? max.makeReadOnly() : null;
this.single = single != null ? single.makeReadOnly() : null;
}
private boolean isAllAccessible() {
assert min == null || min.isAccessible();
assert max == null || max.isAccessible();
@ -102,6 +111,10 @@ public class LLRange extends ResourceSupport<LLRange, LLRange> {
return new LLRange(min, max, null);
}
public static LLRange ofUnsafe(Buffer min, Buffer max) {
return new LLRange(min, max, null);
}
public boolean isAll() {
ensureOwned();
return min == null && max == null && single == null;

View File

@ -39,7 +39,7 @@ public class DatabaseEmpty {
private DatabaseEmpty() {
}
public static DatabaseStageEntry<Nothing> create(LLDictionary dictionary, Send<Buffer> key, Runnable onClose) {
public static DatabaseStageEntry<Nothing> create(LLDictionary dictionary, Buffer key, Runnable onClose) {
return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), onClose);
}

View File

@ -40,7 +40,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private final Serializer<U> valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary,
@NotNull Send<Buffer> prefixKey,
@Nullable Buffer prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer,
Serializer<U> valueSerializer,
Runnable onClose) {
@ -53,12 +53,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
SerializerFixedBinaryLength<T> keySerializer,
Serializer<U> valueSerializer,
Runnable onClose) {
return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer,
return new DatabaseMapDictionary<>(dictionary, null, keySerializer,
valueSerializer, onClose);
}
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
Send<Buffer> prefixKey,
@Nullable Buffer prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer,
Serializer<U> valueSerializer,
Runnable onClose) {
@ -73,40 +73,55 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
}
private Send<Buffer> serializeValue(U value) throws SerializationException {
private Buffer serializeValue(U value) throws SerializationException {
var valSizeHint = valueSerializer.getSerializedSizeHint();
if (valSizeHint == -1) valSizeHint = 128;
try (var valBuf = dictionary.getAllocator().allocate(valSizeHint)) {
var valBuf = dictionary.getAllocator().allocate(valSizeHint);
try {
valueSerializer.serialize(value, valBuf);
return valBuf.send();
return valBuf;
} catch (Throwable t) {
valBuf.close();
throw t;
}
}
private Send<Buffer> serializeKeySuffixToKey(T keySuffix) throws SerializationException {
try (var keyBuf = keyPrefix.copy()) {
private Buffer serializeKeySuffixToKey(T keySuffix) throws SerializationException {
Buffer keyBuf;
if (keyPrefix != null) {
keyBuf = keyPrefix.copy();
} else {
keyBuf = this.dictionary.getAllocator().allocate(keyPrefixLength + keySuffixLength + keyExtLength);
}
try {
assert keyBuf.readableBytes() == keyPrefixLength;
keyBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(keySuffix, keyBuf);
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return keyBuf.send();
return keyBuf;
} catch (Throwable t) {
keyBuf.close();
throw t;
}
}
private Send<Buffer> toKey(Send<Buffer> suffixKeyToSend) {
try (var suffixKey = suffixKeyToSend.receive()) {
assert suffixKeyLengthConsistency(suffixKey.readableBytes());
if (keyPrefix.readableBytes() > 0) {
try (var result = LLUtils.compositeBuffer(dictionary.getAllocator(),
LLUtils.copy(dictionary.getAllocator(), keyPrefix),
suffixKey.send()
)) {
assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return result.send();
}
} else {
assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return suffixKey.send();
private Buffer toKey(Buffer suffixKey) {
assert suffixKeyLengthConsistency(suffixKey.readableBytes());
if (keyPrefix != null && keyPrefix.readableBytes() > 0) {
var result = LLUtils.compositeBuffer(dictionary.getAllocator(),
LLUtils.copy(dictionary.getAllocator(), keyPrefix),
suffixKey.send()
);
try {
assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return result;
} catch (Throwable t) {
result.close();
throw t;
}
} else {
assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return suffixKey;
}
}
@ -177,7 +192,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary
.get(resolveSnapshot(snapshot),
Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)),
Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()),
existsAlmostCertainly
)
.handle(this::deserializeValue);
@ -185,8 +200,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> putValue(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)).single();
var valueMono = Mono.fromCallable(() -> serializeValue(value)).single();
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()).single();
var valueMono = Mono.fromCallable(() -> serializeValue(value).send()).single();
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -203,7 +218,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle(this::deserializeValue);
@ -213,7 +228,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono, serializedToReceive -> {
@ -223,7 +238,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}));
}
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> getSerializedUpdater(
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> getSerializedUpdater(
SerializationFunction<@Nullable U, @Nullable U> updater) {
return oldSerialized -> {
try (oldSerialized) {
@ -244,7 +259,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
};
}
public KVSerializationFunction<@NotNull T, @Nullable Send<Buffer>, @Nullable Send<Buffer>> getSerializedUpdater(
public KVSerializationFunction<@NotNull T, @Nullable Send<Buffer>, @Nullable Buffer> getSerializedUpdater(
KVSerializationFunction<@NotNull T, @Nullable U, @Nullable U> updater) {
return (key, oldSerialized) -> {
try (oldSerialized) {
@ -267,15 +282,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
var valueMono = Mono.fromCallable(() -> serializeValue(value).send());
return dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
}
@Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
var valueMono = Mono.fromCallable(() -> serializeValue(value).send());
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue)
@ -285,7 +300,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> remove(T keySuffix) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -294,13 +309,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
return dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
}
@Override
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
@ -311,8 +326,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var mappedKeys = keys
.<Send<Buffer>>handle((keySuffix, sink) -> {
try {
var buf = serializeKeySuffixToKey(keySuffix);
sink.next(buf);
sink.next(serializeKeySuffixToKey(keySuffix).send());
} catch (Throwable ex) {
sink.error(ex);
}
@ -323,9 +337,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try {
Optional<U> valueOpt;
if (valueBufOpt.isPresent()) {
try (var buf = valueBufOpt.get().receive()) {
valueOpt = Optional.of(valueSerializer.deserialize(buf));
}
valueOpt = Optional.of(valueSerializer.deserialize(valueBufOpt.get()));
} else {
valueOpt = Optional.empty();
}
@ -333,23 +345,26 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} catch (Throwable ex) {
sink.error(ex);
} finally {
valueBufOpt.ifPresent(Send::close);
valueBufOpt.ifPresent(Resource::close);
}
})
.transform(LLUtils::handleDiscard);
}
private Send<LLEntry> serializeEntry(T keySuffix, U value) throws SerializationException {
try (var key = serializeKeySuffixToKey(keySuffix)) {
try (var serializedValue = serializeValue(value)) {
return LLEntry.of(key, serializedValue).send();
}
private LLEntry serializeEntry(T keySuffix, U value) throws SerializationException {
var key = serializeKeySuffixToKey(keySuffix);
try {
var serializedValue = serializeValue(value);
return LLEntry.of(key, serializedValue);
} catch (Throwable t) {
key.close();
throw t;
}
}
private void serializeEntrySink(Entry<T,U> entry, SynchronousSink<Send<LLEntry>> sink) {
try {
sink.next(serializeEntry(entry.getKey(), entry.getValue()));
sink.next(serializeEntry(entry.getKey(), entry.getValue()).send());
} catch (Throwable e) {
sink.error(e);
}
@ -360,7 +375,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var serializedEntries = entries
.<Send<LLEntry>>handle((entry, sink) -> {
try {
sink.next(serializeEntry(entry.getKey(), entry.getValue()));
sink.next(serializeEntry(entry.getKey(), entry.getValue()).send());
} catch (Throwable e) {
sink.error(e);
}
@ -391,7 +406,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var serializedKeys = sharedKeys
.<Send<Buffer>>handle((key, sink) -> {
try {
Send<Buffer> serializedKey = serializeKeySuffixToKey(key);
Send<Buffer> serializedKey = serializeKeySuffixToKey(key).send();
sink.next(serializedKey);
} catch (Throwable ex) {
sink.error(ex);
@ -414,7 +429,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), rangeMono)
.handle((keyBufToReceive, sink) -> {
try (var keyBuf = keyBufToReceive.receive()) {
var keyBuf = keyBufToReceive.receive();
try {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
@ -423,9 +439,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try (var keyBufCopy = keyBuf.copy()) {
keySuffix = deserializeSuffix(keyBufCopy);
}
var subStage = new DatabaseSingle<>(dictionary, toKey(keyBuf.send()), valueSerializer, null);
var subStage = new DatabaseSingle<>(dictionary, toKey(keyBuf), valueSerializer, null);
sink.next(Map.entry(keySuffix, subStage));
} catch (Throwable ex) {
keyBuf.close();
sink.error(ex);
}
});
@ -439,19 +456,17 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try {
Entry<T, U> entry;
try (var serializedEntry = serializedEntryToReceive.receive()) {
try (var keyBuf = serializedEntry.getKey().receive()) {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix = deserializeSuffix(keyBuf);
var keyBuf = serializedEntry.getKeyUnsafe();
assert keyBuf != null;
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix = deserializeSuffix(keyBuf);
U value;
try (var valueBuf = serializedEntry.getValue().receive()) {
value = valueSerializer.deserialize(valueBuf);
}
entry = Map.entry(keySuffix, value);
}
assert serializedEntry.getValueUnsafe() != null;
U value = valueSerializer.deserialize(serializedEntry.getValueUnsafe());
entry = Map.entry(keySuffix, value);
}
sink.next(entry);
} catch (Throwable e) {

View File

@ -19,7 +19,6 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
@ -115,71 +114,25 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
}
}
static Buffer firstRangeKey(BufferAllocator alloc, Send<Buffer> prefixKey, int prefixLength, int suffixLength,
int extLength) {
return zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
static void firstRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength);
}
static Buffer nextRangeKey(BufferAllocator alloc, Send<Buffer> prefixKey, int prefixLength, int suffixLength,
int extLength) {
try (prefixKey) {
Buffer nonIncremented = zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
incrementPrefix(nonIncremented, prefixLength);
return nonIncremented;
}
static void nextRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength);
incrementPrefix(prefixKey, prefixLength);
}
protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc, @NotNull Send<Buffer> prefixKeySend,
protected static void zeroFillKeySuffixAndExt(@NotNull Buffer prefixKey,
int prefixLength, int suffixLength, int extLength) {
var result = prefixKeySend.receive();
if (result == null) {
assert prefixLength == 0;
var buf = alloc.allocate(prefixLength + suffixLength + extLength);
buf.writerOffset(prefixLength + suffixLength + extLength);
buf.fill((byte) 0);
return buf;
} else {
assert result.readableBytes() == prefixLength;
assert suffixLength > 0;
assert extLength >= 0;
result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
for (int i = 0; i < suffixLength + extLength; i++) {
result.writeByte((byte) 0x0);
}
return result;
}
}
static Buffer firstRangeKey(BufferAllocator alloc, Send<Buffer> prefixKey, Send<Buffer> suffixKey, int prefixLength,
int suffixLength, int extLength) {
return zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength);
}
static Buffer nextRangeKey(BufferAllocator alloc, Send<Buffer> prefixKey, Send<Buffer> suffixKey, int prefixLength,
int suffixLength, int extLength) {
Buffer nonIncremented = zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength);
incrementPrefix(nonIncremented, prefixLength + suffixLength);
return nonIncremented;
}
protected static Buffer zeroFillKeyExt(BufferAllocator alloc, Send<Buffer> prefixKeySend, Send<Buffer> suffixKeySend,
int prefixLength, int suffixLength, int extLength) {
try (var prefixKey = prefixKeySend.receive()) {
try (var suffixKey = suffixKeySend.receive()) {
assert prefixKey.readableBytes() == prefixLength;
assert suffixKey.readableBytes() == suffixLength;
assert suffixLength > 0;
assert extLength >= 0;
Buffer result = LLUtils.compositeBuffer(alloc, prefixKey.send(), suffixKey.send());
result.ensureWritable(extLength, extLength, true);
for (int i = 0; i < extLength; i++) {
result.writeByte((byte) 0);
}
assert result.readableBytes() == prefixLength + suffixLength + extLength;
return result;
}
//noinspection UnnecessaryLocalVariable
var result = prefixKey;
assert result.readableBytes() == prefixLength;
assert suffixLength > 0;
assert extLength >= 0;
result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
for (int i = 0; i < suffixLength + extLength; i++) {
result.writeByte((byte) 0x0);
}
}
@ -190,54 +143,73 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer, SubStageGetterSingle<U> subStageGetter,
Runnable onClose) {
return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer,
return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer,
subStageGetter, 0, onClose);
}
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(
LLDictionary dictionary, SerializerFixedBinaryLength<T> keySerializer, int keyExtLength,
SubStageGetter<U, US> subStageGetter, Runnable onClose) {
return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer,
return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer,
subStageGetter, keyExtLength, onClose);
}
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(
LLDictionary dictionary, Send<Buffer> prefixKey, SerializerFixedBinaryLength<T> keySuffixSerializer,
LLDictionary dictionary, Buffer prefixKey, SerializerFixedBinaryLength<T> keySuffixSerializer,
SubStageGetter<U, US> subStageGetter, int keyExtLength, Runnable onClose) {
return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter,
keyExtLength, onClose);
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @NotNull Send<Buffer> prefixKeyToReceive,
protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @Nullable Buffer prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer, SubStageGetter<U, US> subStageGetter, int keyExtLength,
Runnable onClose) {
super((Drop<DatabaseMapDictionaryDeep<T, U, US>>) (Drop) DROP);
try (var prefixKey = prefixKeyToReceive.receive()) {
try {
this.dictionary = dictionary;
this.alloc = dictionary.getAllocator();
this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer;
assert prefixKey.isAccessible();
this.keyPrefixLength = prefixKey.readableBytes();
assert prefixKey == null || prefixKey.isAccessible();
this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes();
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
this.keyExtLength = keyExtLength;
Buffer firstKey = firstRangeKey(alloc, LLUtils.copy(alloc, prefixKey), keyPrefixLength,
keySuffixLength, keyExtLength);
try (firstKey) {
var nextRangeKey = nextRangeKey(alloc, LLUtils.copy(alloc, prefixKey),
keyPrefixLength, keySuffixLength, keyExtLength);
try (nextRangeKey) {
assert prefixKey.isAccessible();
var firstKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength)
: prefixKey.copy();
try {
firstRangeKey(firstKey, keyPrefixLength, keySuffixLength, keyExtLength);
var nextRangeKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength)
: prefixKey.copy();
try {
nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixLength, keyExtLength);
assert prefixKey == null || prefixKey.isAccessible();
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send());
if (keyPrefixLength == 0) {
this.range = LLRange.all();
firstKey.close();
nextRangeKey.close();
} else {
this.range = LLRange.ofUnsafe(firstKey, nextRangeKey);
}
this.rangeMono = LLUtils.lazyRetainRange(this.range);
assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
} catch (Throwable t) {
nextRangeKey.close();
throw t;
}
} catch (Throwable t) {
firstKey.close();
throw t;
}
this.keyPrefix = prefixKey.send().receive();
this.keyPrefix = prefixKey;
this.onClose = onClose;
} catch (Throwable t) {
if (prefixKey != null && prefixKey.isAccessible()) {
prefixKey.close();
}
throw t;
}
}
@ -296,26 +268,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
return prefix;
}
/**
* Removes the ext from the key
*/
protected void removeExt(Buffer key) {
assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
key.writerOffset(keyPrefixLength + keySuffixLength);
assert key.readableBytes() == keyPrefixLength + keySuffixLength;
}
protected Send<Buffer> toKeyWithoutExt(Send<Buffer> suffixKeyToReceive) {
try (var suffixKey = suffixKeyToReceive.receive()) {
assert suffixKey.readableBytes() == keySuffixLength;
try (var result = Objects.requireNonNull(LLUtils.compositeBuffer(alloc,
LLUtils.copy(alloc, keyPrefix), suffixKey.send()))) {
assert result.readableBytes() == keyPrefixLength + keySuffixLength;
return result.send();
}
}
}
protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
if (snapshot == null) {
return null;
@ -337,7 +289,8 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
var suffixKeyWithoutExt = Mono.fromCallable(() -> {
try (var keyWithoutExtBuf = keyPrefix.copy()) {
try (var keyWithoutExtBuf = keyPrefix == null
? alloc.allocate(keySuffixLength + keyExtLength) : keyPrefix.copy()) {
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(keySuffix, keyWithoutExtBuf);
return keyWithoutExtBuf.send();
@ -395,15 +348,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
return groupKey.readSplit(keySuffixLength);
}
private Send<Buffer> getGroupWithoutExt(Send<Buffer> groupKeyWithExtSend) {
try (var buffer = groupKeyWithExtSend.receive()) {
assert subStageKeysConsistency(buffer.readableBytes());
this.removeExt(buffer);
assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength);
return buffer.send();
}
}
private boolean subStageKeysConsistency(int totalKeyLength) {
if (subStageGetter instanceof SubStageGetterMapDeep) {
return totalKeyLength
@ -448,7 +392,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
protected T deserializeSuffix(@NotNull Buffer keySuffix) throws SerializationException {
assert suffixKeyLengthConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix);
assert keyPrefix.isAccessible();
assert keyPrefix == null || keyPrefix.isAccessible();
return result;
}
@ -459,7 +403,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
keySuffixSerializer.serialize(keySuffix, output);
var afterWriterOffset = output.writerOffset();
assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset);
assert keyPrefix.isAccessible();
assert keyPrefix == null || keyPrefix.isAccessible();
}
@Override
@ -469,7 +413,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
@Override
protected Owned<DatabaseMapDictionaryDeep<T, U, US>> prepareSend() {
var keyPrefix = this.keyPrefix.send();
var keyPrefix = this.keyPrefix == null ? null : this.keyPrefix.send();
var range = this.range.send();
var onClose = this.onClose;
return drop -> {

View File

@ -65,7 +65,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends ResourceSupport<Datab
@SuppressWarnings({"unchecked", "rawtypes"})
protected DatabaseMapDictionaryHashed(LLDictionary dictionary,
@NotNull Send<Buffer> prefixKey,
@Nullable Buffer prefixKey,
Serializer<T> keySuffixSerializer,
Serializer<U> valueSerializer,
Function<T, TH> keySuffixHashFunction,
@ -105,7 +105,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends ResourceSupport<Datab
Runnable onClose) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
LLUtils.empty(dictionary.getAllocator()),
null,
keySerializer,
valueSerializer,
keyHashFunction,
@ -115,7 +115,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends ResourceSupport<Datab
}
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> tail(LLDictionary dictionary,
Send<Buffer> prefixKey,
@Nullable Buffer prefixKey,
Serializer<T> keySuffixSerializer,
Serializer<U> valueSerializer,
Function<T, UH> keySuffixHashFunction,

View File

@ -18,7 +18,7 @@ import reactor.core.publisher.Mono;
public class DatabaseSetDictionary<T> extends DatabaseMapDictionary<T, Nothing> {
protected DatabaseSetDictionary(LLDictionary dictionary,
Send<Buffer> prefixKey,
Buffer prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer,
Runnable onClose) {
super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), onClose);
@ -27,11 +27,11 @@ public class DatabaseSetDictionary<T> extends DatabaseMapDictionary<T, Nothing>
public static <T> DatabaseSetDictionary<T> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer,
Runnable onClose) {
return new DatabaseSetDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, onClose);
return new DatabaseSetDictionary<>(dictionary, null, keySerializer, onClose);
}
public static <T> DatabaseSetDictionary<T> tail(LLDictionary dictionary,
Send<Buffer> prefixKey,
Buffer prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer,
Runnable onClose) {
return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer, onClose);

View File

@ -21,7 +21,7 @@ import reactor.core.publisher.Mono;
public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHashed<T, Nothing, TH> {
protected DatabaseSetDictionaryHashed(LLDictionary dictionary,
@NotNull Send<Buffer> prefixKey,
@Nullable Buffer prefixKey,
Serializer<T> keySuffixSerializer,
Function<T, TH> keySuffixHashFunction,
SerializerFixedBinaryLength<TH> keySuffixHashSerializer,
@ -42,7 +42,7 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
SerializerFixedBinaryLength<TH> keyHashSerializer,
Runnable onClose) {
return new DatabaseSetDictionaryHashed<>(dictionary,
LLUtils.empty(dictionary.getAllocator()),
null,
keySerializer,
keyHashFunction,
keyHashSerializer,
@ -51,7 +51,7 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
}
public static <T, TH> DatabaseSetDictionaryHashed<T, TH> tail(LLDictionary dictionary,
Send<Buffer> prefixKey,
@Nullable Buffer prefixKey,
Serializer<T> keySuffixSerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH> keyHashSerializer,

View File

@ -62,16 +62,14 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
private Runnable onClose;
@SuppressWarnings({"unchecked", "rawtypes"})
public DatabaseSingle(LLDictionary dictionary, Send<Buffer> key, Serializer<U> serializer,
public DatabaseSingle(LLDictionary dictionary, Buffer key, Serializer<U> serializer,
Runnable onClose) {
super((Drop<DatabaseSingle<U>>) (Drop) DROP);
try (key) {
this.dictionary = dictionary;
this.key = key.receive();
this.keyMono = LLUtils.lazyRetain(this.key);
this.serializer = serializer;
this.onClose = onClose;
}
this.dictionary = dictionary;
this.key = key;
this.keyMono = LLUtils.lazyRetain(this.key);
this.serializer = serializer;
this.onClose = onClose;
}
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
@ -137,7 +135,7 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
if (result == null) {
return null;
} else {
return serializeValue(result);
return serializeValue(result).receive();
}
}
}, updateReturnMode, existsAlmostCertainly)
@ -163,7 +161,7 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
if (result == null) {
return null;
} else {
return serializeValue(result);
return serializeValue(result).receive();
}
}
}, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
@ -205,9 +203,10 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
@Override
protected Owned<DatabaseSingle<U>> prepareSend() {
var key = this.key.send();
var keySend = this.key.send();
var onClose = this.onClose;
return drop -> {
var key = keySend.receive();
var instance = new DatabaseSingle<>(dictionary, key, serializer, onClose);
drop.attach(instance);
return instance;

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -35,9 +36,17 @@ public class SubStageGetterHashMap<T, U, TH> implements
public Mono<DatabaseMapDictionaryHashed<T, U, TH>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return LLUtils.usingSend(prefixKeyMono, prefixKey -> Mono.just(DatabaseMapDictionaryHashed
.tail(dictionary, prefixKey, keySerializer, valueSerializer, keyHashFunction,
keyHashSerializer, null)), true);
return prefixKeyMono.map(prefixKeyToReceive -> {
var prefixKey = prefixKeyToReceive.receive();
return DatabaseMapDictionaryHashed.tail(dictionary,
prefixKey,
keySerializer,
valueSerializer,
keyHashFunction,
keyHashSerializer,
null
);
}).doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
}
public int getKeyHashBinaryLength() {

View File

@ -1,9 +1,11 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@ -32,13 +34,16 @@ public class SubStageGetterHashSet<T, TH> implements
public Mono<DatabaseSetDictionaryHashed<T, TH>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseSetDictionaryHashed.tail(dictionary, prefixKey, keySerializer,
keyHashFunction, keyHashSerializer, null)
),
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
return prefixKeyMono.map(prefixKeyToReceive -> {
var prefixKey = prefixKeyToReceive.receive();
return DatabaseSetDictionaryHashed.tail(dictionary,
prefixKey,
keySerializer,
keyHashFunction,
keyHashSerializer,
null
);
}).doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
}
public int getKeyHashBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -26,9 +27,10 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return LLUtils.usingSend(prefixKeyMono,
prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionary
.tail(dictionary, prefixKey, keySerializer, valueSerializer, null)), true);
return prefixKeyMono.map(prefixKeyToReceive -> {
var prefixKey = prefixKeyToReceive.receive();
return DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer, valueSerializer, null);
}).doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
}
public int getKeyBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -41,8 +42,16 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return LLUtils.usingSend(prefixKeyMono, prefixKey -> Mono.just(DatabaseMapDictionaryDeep
.deepIntermediate(dictionary, prefixKey, keySerializer, subStageGetter, keyExtLength, null)), true);
return prefixKeyMono.map(prefixKeyToReceive -> {
var prefixKey = prefixKeyToReceive.receive();
return DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
prefixKey,
keySerializer,
subStageGetter,
keyExtLength,
null
);
}).doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
}
public int getKeyBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -22,11 +23,10 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer, null)),
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
return prefixKeyMono.map(prefixKeyToReceive -> {
var prefixKey = prefixKeyToReceive.receive();
return DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer, null);
}).doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
}
public int getKeyBinaryLength() {

View File

@ -20,7 +20,7 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> keyPrefixMono) {
return keyPrefixMono.map(keyPrefix -> new DatabaseSingle<>(dictionary, keyPrefix, serializer, null));
return keyPrefixMono.map(keyPrefix -> new DatabaseSingle<>(dictionary, keyPrefix.receive(), serializer, null));
}
}

View File

@ -371,7 +371,7 @@ public class LLLocalDictionary implements LLDictionary {
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<Send<Buffer>> update(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> {
@ -400,7 +400,7 @@ public class LLLocalDictionary implements LLDictionary {
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
@ -486,7 +486,7 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Flux<Optional<Send<Buffer>>> getMulti(@Nullable LLSnapshot snapshot,
public Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Send<Buffer>> keys,
boolean existsAlmostCertainly) {
return keys
@ -515,7 +515,7 @@ public class LLLocalDictionary implements LLDictionary {
}
var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
List<byte[]> results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow));
var mappedResults = new ArrayList<Optional<Send<Buffer>>>(results.size());
var mappedResults = new ArrayList<Optional<Buffer>>(results.size());
for (int i = 0; i < results.size(); i++) {
byte[] val = results.get(i);
Optional<Buffer> valueOpt;
@ -527,7 +527,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
valueOpt = Optional.empty();
}
mappedResults.add(valueOpt.map(Resource::send));
mappedResults.add(valueOpt);
}
return mappedResults;
} finally {
@ -618,7 +618,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public <K> Flux<Boolean> updateMulti(Flux<K> keys, Flux<Send<Buffer>> serializedKeys,
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Send<Buffer>> updateFunction) {
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction) {
return Flux.zip(keys, serializedKeys)
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> this.<List<Boolean>>runOnDb(() -> {
@ -657,25 +657,28 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
var updatedValuesToWrite = new ArrayList<Send<Buffer>>(mappedInputs.size());
var updatedValuesToWrite = new ArrayList<Buffer>(mappedInputs.size());
var valueChangedResult = new ArrayList<Boolean>(mappedInputs.size());
try {
for (var mappedInput : mappedInputs) {
try (var updatedValueToReceive = updateFunction
.apply(mappedInput.getT1(), mappedInput.getT2())) {
if (updatedValueToReceive != null) {
try (var updatedValue = updatedValueToReceive.receive()) {
try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) {
valueChangedResult.add(!LLUtils.equals(t3, updatedValue));
}
updatedValuesToWrite.add(updatedValue.send());
var updatedValue = updateFunction.apply(mappedInput.getT1(), mappedInput.getT2());
try {
if (updatedValue != null) {
try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) {
valueChangedResult.add(!LLUtils.equals(t3, updatedValue));
}
updatedValuesToWrite.add(updatedValue);
} else {
try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) {
valueChangedResult.add(!LLUtils.equals(t3, null));
}
updatedValuesToWrite.add(null);
}
} catch (Throwable t) {
if (updatedValue != null) {
updatedValue.close();
}
throw t;
}
}
} finally {
@ -694,11 +697,12 @@ public class LLLocalDictionary implements LLDictionary {
);
int i = 0;
for (Tuple2<K, Buffer> entry : entriesWindow) {
var valueToWrite = updatedValuesToWrite.get(i);
if (valueToWrite == null) {
batch.delete(cfh, entry.getT2().send());
} else {
batch.put(cfh, entry.getT2().send(), valueToWrite);
try (var valueToWrite = updatedValuesToWrite.get(i)) {
if (valueToWrite == null) {
batch.delete(cfh, entry.getT2().send());
} else {
batch.put(cfh, entry.getT2().send(), valueToWrite.send());
}
}
i++;
}
@ -707,7 +711,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
int i = 0;
for (Tuple2<K, Buffer> entry : entriesWindow) {
db.put(EMPTY_WRITE_OPTIONS, entry.getT2().send(), updatedValuesToWrite.get(i));
db.put(EMPTY_WRITE_OPTIONS, entry.getT2().send(), updatedValuesToWrite.get(i).send());
i++;
}
}

View File

@ -66,7 +66,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly,
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
try (Buffer key = keySend.receive()) {
@ -107,13 +107,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
newData = updater.apply(sentData);
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);

View File

@ -49,7 +49,7 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly,
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
try (Buffer key = keySend.receive()) {
@ -88,7 +88,7 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
newData = newDataToReceive;
} else {
newData = null;
}

View File

@ -38,7 +38,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
@NotNull RocksIterator newIterator(@NotNull ReadOptions readOptions);
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions,
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
void delete(WriteOptions writeOptions, Send<Buffer> keySend) throws RocksDBException;

View File

@ -44,7 +44,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly,
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
try (Buffer key = keySend.receive()) {
@ -82,7 +82,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
newData = newDataToReceive;
} else {
newData = null;
}

View File

@ -106,6 +106,17 @@ public class LLMemoryDictionary implements LLDictionary {
}
}
private Buffer kkB(ByteList bytesList) {
var buffer = getAllocator().allocate(bytesList.size());
try {
buffer.writeBytes(bytesList.toByteArray());
return buffer;
} catch (Throwable t) {
buffer.close();
throw t;
}
}
private BLRange r(Send<LLRange> send) {
try(var range = send.receive()) {
if (range.isAll()) {
@ -186,7 +197,7 @@ public class LLMemoryDictionary implements LLDictionary {
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
key -> Mono.fromCallable(() -> {
@ -201,8 +212,7 @@ public class LLMemoryDictionary implements LLDictionary {
}
Buffer v;
try (var oldToSend = old != null ? kk(old) : null) {
var vToReceive = updater.apply(oldToSend);
v = vToReceive != null ? vToReceive.receive() : null;
v = updater.apply(oldToSend);
} catch (SerializationException e) {
throw new IllegalStateException(e);
}
@ -254,13 +264,13 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Flux<Optional<Send<Buffer>>> getMulti(@Nullable LLSnapshot snapshot, Flux<Send<Buffer>> keys,
public Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot, Flux<Send<Buffer>> keys,
boolean existsAlmostCertainly) {
return keys.map(key -> {
try (var t2 = key.receive()) {
ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(t2.copy().send()));
if (v != null) {
return Optional.of(kk(v));
return Optional.of(kkB(v));
} else {
return Optional.empty();
}
@ -287,7 +297,7 @@ public class LLMemoryDictionary implements LLDictionary {
@Override
public <K> Flux<Boolean> updateMulti(Flux<K> keys,
Flux<Send<Buffer>> serializedKeys,
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Send<Buffer>> updateFunction) {
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction) {
return Flux.error(new UnsupportedOperationException("Not implemented"));
}

View File

@ -5,14 +5,15 @@ import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks.Many;
public class ReactiveCollector implements Collector {
private final Many<ScoreDoc> scoreDocsSink;
private final FluxSink<ScoreDoc> scoreDocsSink;
private int shardIndex;
public ReactiveCollector(Many<ScoreDoc> scoreDocsSink) {
public ReactiveCollector(FluxSink<ScoreDoc> scoreDocsSink) {
this.scoreDocsSink = scoreDocsSink;
}

View File

@ -4,13 +4,14 @@ import java.util.Collection;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks.Many;
public class ReactiveCollectorManager implements CollectorManager<Collector, Void> {
private final Many<ScoreDoc> scoreDocsSink;
private final FluxSink<ScoreDoc> scoreDocsSink;
public ReactiveCollectorManager(Many<ScoreDoc> scoreDocsSink) {
public ReactiveCollectorManager(FluxSink<ScoreDoc> scoreDocsSink) {
this.scoreDocsSink = scoreDocsSink;
}

View File

@ -6,16 +6,17 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
public class ReactiveLeafCollector implements LeafCollector {
private final LeafReaderContext leafReaderContext;
private final Many<ScoreDoc> scoreDocsSink;
private final FluxSink<ScoreDoc> scoreDocsSink;
private final int shardIndex;
public ReactiveLeafCollector(LeafReaderContext leafReaderContext, Many<ScoreDoc> scoreDocsSink, int shardIndex) {
public ReactiveLeafCollector(LeafReaderContext leafReaderContext, FluxSink<ScoreDoc> scoreDocsSink, int shardIndex) {
this.leafReaderContext = leafReaderContext;
this.scoreDocsSink = scoreDocsSink;
this.shardIndex = shardIndex;
@ -30,17 +31,10 @@ public class ReactiveLeafCollector implements LeafCollector {
public void collect(int i) {
LLUtils.ensureBlocking();
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
boolean shouldRetry;
do {
var currentError = scoreDocsSink.tryEmitNext(scoreDoc);
shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED || currentError == EmitResult.FAIL_OVERFLOW
|| currentError == EmitResult.FAIL_ZERO_SUBSCRIBER;
if (shouldRetry) {
LockSupport.parkNanos(10);
} else {
currentError.orThrow();
}
} while (shouldRetry);
while (scoreDocsSink.requestedFromDownstream() < 0 && !scoreDocsSink.isCancelled()) {
// 100ms
LockSupport.parkNanos(100L * 1000000L);
}
scoreDocsSink.next(scoreDoc);
}
}

View File

@ -15,6 +15,7 @@ import java.util.function.Supplier;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
@ -61,44 +62,42 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
return Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get());
var cm = new ReactiveCollectorManager(scoreDocsSink);
AtomicInteger runningTasks = new AtomicInteger(0);
var shards = indexSearchers.shards();
runningTasks.addAndGet(shards.size());
int mutableShardIndex = 0;
for (IndexSearcher shard : shards) {
int shardIndex = mutableShardIndex++;
UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
try {
var collector = cm.newCollector();
assert queryParams.complete() == collector.scoreMode().isExhaustive();
assert queryParams
.getScoreModeOptional()
.map(scoreMode -> scoreMode == collector.scoreMode())
.orElse(true);
Flux<ScoreDoc> scoreDocsFlux = Flux.create(scoreDocsSink -> {
var cm = new ReactiveCollectorManager(scoreDocsSink);
collector.setShardIndex(shardIndex);
AtomicInteger runningTasks = new AtomicInteger(0);
shard.search(localQueryParams.query(), collector);
} catch (Throwable e) {
while (scoreDocsSink.tryEmitError(e) == EmitResult.FAIL_NON_SERIALIZED) {
LockSupport.parkNanos(10);
}
} finally {
if (runningTasks.decrementAndGet() <= 0) {
while (scoreDocsSink.tryEmitComplete() == EmitResult.FAIL_NON_SERIALIZED) {
LockSupport.parkNanos(10);
runningTasks.addAndGet(shards.size());
int mutableShardIndex = 0;
for (IndexSearcher shard : shards) {
int shardIndex = mutableShardIndex++;
UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
try {
var collector = cm.newCollector();
assert queryParams.complete() == collector.scoreMode().isExhaustive();
assert queryParams
.getScoreModeOptional()
.map(scoreMode -> scoreMode == collector.scoreMode())
.orElse(true);
collector.setShardIndex(shardIndex);
shard.search(localQueryParams.query(), collector);
} catch (Throwable e) {
scoreDocsSink.error(e);
} finally {
if (runningTasks.decrementAndGet() <= 0) {
scoreDocsSink.complete();
}
}
}
});
}
});
}
}, OverflowStrategy.BUFFER);
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsSink.asFlux(), shards, keyFieldName, false);
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);
var totalHitsCount = new TotalHitsCount(0, false);
Flux<LLKeyScore> mergedFluxes = resultsFlux

View File

@ -81,11 +81,11 @@ public abstract class TestLLDictionary {
private LLDictionary getDict(UpdateMode updateMode) {
var dict = DbTestUtils.tempDictionary(db, updateMode).blockOptional().orElseThrow();
var key1 = Mono.fromCallable(() -> fromString("test-key-1"));
var key2 = Mono.fromCallable(() -> fromString("test-key-2"));
var key3 = Mono.fromCallable(() -> fromString("test-key-3"));
var key4 = Mono.fromCallable(() -> fromString("test-key-4"));
var value = Mono.fromCallable(() -> fromString("test-value"));
var key1 = Mono.fromCallable(() -> fromString("test-key-1").send());
var key2 = Mono.fromCallable(() -> fromString("test-key-2").send());
var key3 = Mono.fromCallable(() -> fromString("test-key-3").send());
var key4 = Mono.fromCallable(() -> fromString("test-key-4").send());
var value = Mono.fromCallable(() -> fromString("test-value").send());
dict.put(key1, value, LLDictionaryResultType.VOID).block();
dict.put(key2, value, LLDictionaryResultType.VOID).block();
dict.put(key3, value, LLDictionaryResultType.VOID).block();
@ -93,7 +93,7 @@ public abstract class TestLLDictionary {
return dict;
}
private Send<Buffer> fromString(String s) {
private Buffer fromString(String s) {
var sb = s.getBytes(StandardCharsets.UTF_8);
try (var b = db.getAllocator().allocate(sb.length + 3 + 13)) {
assert b.writerOffset() == 0;
@ -104,7 +104,7 @@ public abstract class TestLLDictionary {
var part1 = b.split();
return LLUtils.compositeBuffer(db.getAllocator(), part1.send(), b.send()).send();
return LLUtils.compositeBuffer(db.getAllocator(), part1.send(), b.send());
}
}
@ -154,8 +154,8 @@ public abstract class TestLLDictionary {
@MethodSource("provideArguments")
public void testGet(UpdateMode updateMode) {
var dict = getDict(updateMode);
var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
var keyEx = Mono.fromCallable(() -> fromString("test-key-1").send());
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent").send());
Assertions.assertEquals("test-value", run(dict.get(null, keyEx).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals("test-value", run(dict.get(null, keyEx, true).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals("test-value", run(dict.get(null, keyEx, false).map(this::toString).transform(LLUtils::handleDiscard)));
@ -168,8 +168,8 @@ public abstract class TestLLDictionary {
@MethodSource("providePutArguments")
public void testPutExisting(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
var value = Mono.fromCallable(() -> fromString("test-value"));
var keyEx = Mono.fromCallable(() -> fromString("test-key-1").send());
var value = Mono.fromCallable(() -> fromString("test-value").send());
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
@ -183,8 +183,8 @@ public abstract class TestLLDictionary {
@MethodSource("providePutArguments")
public void testPutNew(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
var value = Mono.fromCallable(() -> fromString("test-value"));
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent").send());
var value = Mono.fromCallable(() -> fromString("test-value").send());
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
@ -207,7 +207,7 @@ public abstract class TestLLDictionary {
@MethodSource("provideUpdateArguments")
public void testUpdateExisting(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
var dict = getDict(updateMode);
var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
var keyEx = Mono.fromCallable(() -> fromString("test-key-1").send());
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
long afterSize;
runVoid(updateMode == UpdateMode.DISALLOW,
@ -232,7 +232,7 @@ public abstract class TestLLDictionary {
public void testUpdateNew(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
int expected = updateMode == UpdateMode.DISALLOW ? 0 : 1;
var dict = getDict(updateMode);
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent").send());
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
long afterSize;
runVoid(updateMode == UpdateMode.DISALLOW,

View File

@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -159,29 +160,33 @@ public abstract class TestLLDictionaryLeaks {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, true).then().transform(LLUtils::handleDiscard)
dict.update(key, this::pass, updateReturnMode, true).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, false).then().transform(LLUtils::handleDiscard)
dict.update(key, this::pass, updateReturnMode, false).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode).then().transform(LLUtils::handleDiscard)
dict.update(key, this::pass, updateReturnMode).then().transform(LLUtils::handleDiscard)
);
}
private Buffer pass(@Nullable Send<Buffer> old) {
return old == null ? null : old.receive();
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testUpdateAndGetDelta(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, true).then().transform(LLUtils::handleDiscard)
dict.updateAndGetDelta(key, this::pass, true).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, false).then().transform(LLUtils::handleDiscard)
dict.updateAndGetDelta(key, this::pass, false).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old).then().transform(LLUtils::handleDiscard)
dict.updateAndGetDelta(key, this::pass).then().transform(LLUtils::handleDiscard)
);
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.Unpooled;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.LLUtils;
import java.util.Arrays;
@ -49,18 +50,15 @@ public class TestRanges {
public void testNextRangeKey(byte[] prefixKey) {
byte[] firstRangeKey;
try (var firstRangeKeyBuf = DatabaseMapDictionaryDeep.firstRangeKey(alloc,
alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(),
prefixKey.length, 7, 3)) {
Buffer firstRangeKeyBuf = alloc.allocate(prefixKey.length).writeBytes(prefixKey);
try (firstRangeKeyBuf) {
DatabaseMapDictionaryDeep.firstRangeKey(firstRangeKeyBuf, prefixKey.length, 7, 3);
firstRangeKey = LLUtils.toArray(firstRangeKeyBuf);
}
byte[] nextRangeKey;
try (var nextRangeKeyBuf = DatabaseMapDictionaryDeep.nextRangeKey(alloc,
alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(),
prefixKey.length,
7,
3
)) {
Buffer nextRangeKeyBuf = alloc.allocate(prefixKey.length).writeBytes(prefixKey);
try (nextRangeKeyBuf) {
DatabaseMapDictionaryDeep.nextRangeKey(nextRangeKeyBuf, prefixKey.length, 7, 3);
nextRangeKey = LLUtils.toArray(nextRangeKeyBuf);
}
@ -88,71 +86,4 @@ public class TestRanges {
);
}
}
@Test
public void testNextRangeKeyWithSuffix() {
testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, (byte) 0xFF}, new byte[] {0x00, 0x00, 0x00});
testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, 0x01}, new byte[] {0x00, 0x00, 0x01});
testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, 0x02}, new byte[] {0x00, 0x00, 0x02});
testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, 0x02}, new byte[] {0x00, 0x01, 0x02});
testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, (byte) 0xFF}, new byte[] {0x00, 0x00, (byte) 0xFF});
testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, (byte) 0xFF}, new byte[] {0x00, 0x01, (byte) 0xFF});
testNextRangeKeyWithSuffix(new byte[] {0x00, (byte) 0xFF, (byte) 0xFF}, new byte[] {0x00, (byte) 0xFF, (byte) 0xFF});
}
@Test
public void testNextRangeKeyWithSuffix2() {
testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF}, new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF});
testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0}, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0});
}
public void testNextRangeKeyWithSuffix(byte[] prefixKey, byte[] suffixKey) {
byte[] firstRangeKey;
try (var firstRangeKeyBuf = DatabaseMapDictionaryDeep.firstRangeKey(alloc,
alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(),
alloc.allocate(suffixKey.length).writeBytes(suffixKey).send(),
prefixKey.length,
3,
7
)) {
firstRangeKey = LLUtils.toArray(firstRangeKeyBuf);
}
try (var nextRangeKeyBuf = DatabaseMapDictionaryDeep.nextRangeKey(alloc,
alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(),
alloc.allocate(suffixKey.length).writeBytes(suffixKey).send(),
prefixKey.length,
3,
7
)) {
byte[] nextRangeKey = LLUtils.toArray(nextRangeKeyBuf);
if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}) && Arrays.equals(suffixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) {
Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0}, nextRangeKey);
} else {
long biPrefix = 0;
var s = 0;
for (int i = (suffixKey.length) - 1; i >= 0; i--) {
biPrefix += ((long) (suffixKey[i] & 0xFF)) << s;
s += Byte.SIZE;
}
for (int i = (prefixKey.length) - 1; i >= 0; i--) {
biPrefix += ((long) (prefixKey[i] & 0xFF)) << s;
s += Byte.SIZE;
}
var nrPrefix = Arrays.copyOf(nextRangeKey, prefixKey.length + suffixKey.length);
long biNextPrefix = 0;
s = 0;
for (int i = (prefixKey.length + suffixKey.length) - 1; i >= 0; i--) {
biNextPrefix += ((long) (nrPrefix[i] & 0xFF)) << s;
s += Byte.SIZE;
}
Assertions.assertEquals(biPrefix + 1, biNextPrefix);
Assertions.assertArrayEquals(
new byte[7],
Arrays.copyOfRange(nextRangeKey, prefixKey.length + suffixKey.length, prefixKey.length + suffixKey.length + 7)
);
}
}
}
}