Use a new approach to completely avoid memory leaks

This commit is contained in:
Andrea Cavalli 2021-08-22 18:20:05 +02:00
parent 6ed31ab2a6
commit bc759c344d
23 changed files with 599 additions and 688 deletions

View File

@ -5,6 +5,7 @@ import static io.netty.buffer.Unpooled.wrappedUnmodifiableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.IllegalReferenceCountException;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
@ -14,16 +15,18 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class LLRange {
private static final LLRange RANGE_ALL = new LLRange(null, null);
private static final LLRange RANGE_ALL = new LLRange(null, null, false);
private final ByteBuf min;
private final ByteBuf max;
private final boolean releasable;
private final AtomicInteger refCnt = new AtomicInteger(1);
private LLRange(ByteBuf min, ByteBuf max) {
private LLRange(ByteBuf min, ByteBuf max, boolean releasable) {
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
this.min = min;
this.max = max;
this.releasable = releasable;
}
public static LLRange all() {
@ -31,23 +34,23 @@ public class LLRange {
}
public static LLRange from(ByteBuf min) {
return new LLRange(min, null);
return new LLRange(min, null, true);
}
public static LLRange to(ByteBuf max) {
return new LLRange(null, max);
return new LLRange(null, max, true);
}
public static LLRange single(ByteBuf single) {
try {
return new LLRange(single.retain(), single.retain());
return new LLRange(single.retain(), single.retain(), true);
} finally {
single.release();
}
}
public static LLRange of(ByteBuf min, ByteBuf max) {
return new LLRange(min, max);
return new LLRange(min, max, true);
}
public boolean isAll() {
@ -104,8 +107,11 @@ public class LLRange {
}
private void checkReleased() {
if (!releasable) {
return;
}
if (refCnt.get() <= 0) {
throw new IllegalStateException("Released");
throw new IllegalReferenceCountException(0);
}
}
@ -137,8 +143,11 @@ public class LLRange {
}
public LLRange retain() {
if (!releasable) {
return this;
}
if (refCnt.updateAndGet(refCnt -> refCnt <= 0 ? 0 : (refCnt + 1)) <= 0) {
throw new IllegalStateException("Released");
throw new IllegalReferenceCountException(0, 1);
}
if (min != null) {
min.retain();
@ -150,8 +159,11 @@ public class LLRange {
}
public void release() {
if (!releasable) {
return;
}
if (refCnt.decrementAndGet() < 0) {
throw new IllegalStateException("Already released");
throw new IllegalReferenceCountException(0, -1);
}
if (min != null) {
min.release();

View File

@ -488,4 +488,12 @@ public class LLUtils {
public static <R, V> boolean isDeltaChanged(Delta<V> delta) {
return !Objects.equals(delta.previous(), delta.current());
}
public static Mono<ByteBuf> lazyRetain(ByteBuf buf) {
return Mono.just(buf).map(ByteBuf::retain);
}
public static Mono<LLRange> lazyRetain(LLRange range) {
return Mono.just(range).map(LLRange::retain);
}
}

View File

@ -5,7 +5,6 @@ import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.KeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLUtils;
@ -40,7 +39,6 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
Serializer<U, ByteBuf> valueSerializer) {
// Do not retain or release or use the prefixKey here
super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0);
prefixKey = null;
this.valueSerializer = valueSerializer;
}
@ -68,15 +66,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return Flux
.defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain(), existsAlmostCertainly))
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly)
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey(), false)),
entry -> deserialize(entry.getValue()),
entry -> valueSerializer.deserialize(entry.getValue()),
HashMap::new)
.filter(map -> !map.isEmpty())
.doFirst(range::retain)
.doAfterTerminate(range::release);
.filter(map -> !map.isEmpty());
}
@Override
@ -86,16 +82,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
Mono.just(true),
b -> get(null, false),
b -> dictionary
.setRange(range.retain(),
.setRange(rangeMono,
Flux
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.map(entry -> Map
.entry(this.toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))
)
.entry(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())
))
)
)
.doFirst(range::retain)
.doAfterTerminate(range::release);
);
}
@Override
@ -106,18 +101,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return Mono
.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast))
.doFirst(range::retain)
.doAfterTerminate(range::release);
return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return Mono
.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()))
.doFirst(range::retain)
.doAfterTerminate(range::release);
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono);
}
@Override
@ -135,8 +124,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.get(resolveSnapshot(snapshot), keyBuf.retain(), existsAlmostCertainly)
.map(this::deserialize),
.get(resolveSnapshot(snapshot), LLUtils.lazyRetain(keyBuf), existsAlmostCertainly)
.map(valueSerializer::deserialize),
ReferenceCounted::release
);
}
@ -150,10 +139,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(keySuffixBuf.retain()),
keyBuf -> Mono
.using(
() -> serialize(value),
.using(() -> valueSerializer.serialize(value),
valueBuf -> dictionary
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID)
.put(LLUtils.lazyRetain(keyBuf),
LLUtils.lazyRetain(valueBuf),
LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release),
ReferenceCounted::release
),
@ -178,8 +168,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.update(keyBuf.retain(), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.map(this::deserialize),
.update(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.map(valueSerializer::deserialize),
ReferenceCounted::release
);
}
@ -192,8 +182,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.updateAndGetDelta(keyBuf.retain(), getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapDelta(mono, this::deserialize)),
.updateAndGetDelta(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapDelta(mono, valueSerializer::deserialize)),
ReferenceCounted::release
);
}
@ -201,11 +191,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Function<@Nullable ByteBuf, @Nullable ByteBuf> getSerializedUpdater(Function<@Nullable U, @Nullable U> updater) {
return oldSerialized -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()));
var result = updater.apply(oldSerialized == null ? null : valueSerializer.deserialize(oldSerialized.retain()));
if (result == null) {
return null;
} else {
return this.serialize(result);
return valueSerializer.serialize(result);
}
} finally {
if (oldSerialized != null) {
@ -218,11 +208,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public <X> BiFunction<@Nullable ByteBuf, X, @Nullable ByteBuf> getSerializedUpdater(BiFunction<@Nullable U, X, @Nullable U> updater) {
return (oldSerialized, extra) -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()), extra);
var result = updater.apply(oldSerialized == null ? null : valueSerializer.deserialize(oldSerialized.retain()), extra);
if (result == null) {
return null;
} else {
return this.serialize(result);
return valueSerializer.serialize(result);
}
} finally {
if (oldSerialized != null) {
@ -241,11 +231,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(keySuffixBuf.retain()),
keyBuf -> Mono
.using(
() -> serialize(value),
.using(() -> valueSerializer.serialize(value),
valueBuf -> dictionary
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize),
.put(LLUtils.lazyRetain(keyBuf),
LLUtils.lazyRetain(valueBuf),
LLDictionaryResultType.PREVIOUS_VALUE)
.map(valueSerializer::deserialize),
ReferenceCounted::release
),
ReferenceCounted::release
@ -263,11 +254,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(keySuffixBuf.retain()),
keyBuf -> Mono
.using(
() -> serialize(value),
.using(() -> valueSerializer.serialize(value),
valueBuf -> dictionary
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize)
.put(LLUtils.lazyRetain(keyBuf),
LLUtils.lazyRetain(valueBuf),
LLDictionaryResultType.PREVIOUS_VALUE
)
.map(valueSerializer::deserialize)
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null),
ReferenceCounted::release
@ -284,7 +277,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.remove(keyBuf.retain(), LLDictionaryResultType.VOID)
.remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then(),
ReferenceCounted::release
@ -297,8 +290,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.remove(keyBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize),
.remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE)
.map(valueSerializer::deserialize),
ReferenceCounted::release
);
}
@ -309,7 +302,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.remove(keyBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean),
ReferenceCounted::release
);
@ -317,28 +310,25 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Flux<Entry<T, Optional<U>>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
return Flux
.defer(() -> dictionary
.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
try {
return Tuples.of(keySuffix, toKey(keySuffixBuf.retain()));
} finally {
keySuffixBuf.release();
}
})), existsAlmostCertainly)
)
.flatMapSequential(entry -> {
entry.getT2().release();
return Mono
.fromCallable(() -> Map.entry(entry.getT1(), entry.getT3().map(this::deserialize)));
});
return dictionary
.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
try {
return Tuples.of(keySuffix, toKey(keySuffixBuf.retain()));
} finally {
keySuffixBuf.release();
}
})), existsAlmostCertainly)
.flatMapSequential(entry -> {
entry.getT2().release();
return Mono.fromCallable(() -> Map.entry(entry.getT1(), entry.getT3().map(valueSerializer::deserialize)));
});
}
private Entry<ByteBuf, ByteBuf> serializeEntry(T key, U value) {
ByteBuf serializedKey = toKey(serializeSuffix(key));
try {
ByteBuf serializedValue = serialize(value);
ByteBuf serializedValue = valueSerializer.serialize(value);
try {
return Map.entry(serializedKey.retain(), serializedValue.retain());
} finally {
@ -393,9 +383,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return Flux
.defer(() -> dictionary.getRangeKeys(resolveSnapshot(snapshot), range.retain()))
.<Entry<T, DatabaseStageEntry<U>>>map(key -> {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), rangeMono)
.map(key -> {
ByteBuf keySuffixWithExt = stripPrefix(key.retain(), false);
try {
try {
@ -411,15 +401,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} finally {
key.release();
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
});
}
@Override
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return Flux
.defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain()))
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono)
.map(serializedEntry -> Map.entry(
deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)),
valueSerializer.deserialize(serializedEntry.getValue())
@ -431,9 +419,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
byteBuf.release();
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
});
}
@Override
@ -443,46 +429,28 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
Mono.just(true),
b -> getAllValues(null),
b -> dictionary
.setRange(range.retain(),
entries.map(entry ->
Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))
)
.setRange(rangeMono,
entries.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())
))
)
)
.doFirst(range::retain)
.doAfterTerminate(range::release);
);
}
@Override
public Mono<Void> clear() {
return Mono
.defer(() -> {
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(range.getSingle().retain(), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then();
} else {
return dictionary.setRange(range.retain(), Flux.empty());
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
return Mono.defer(() -> {
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(LLUtils.lazyRetain(range.getSingle()), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then();
} else {
return dictionary.setRange(rangeMono, Flux.empty());
}
});
}
/**
* This method is just a shorter version than valueSerializer::deserialize
*/
private U deserialize(ByteBuf bytes) {
return valueSerializer.deserialize(bytes);
}
/**
* This method is just a shorter version than valueSerializer::serialize
*/
private ByteBuf serialize(U bytes) {
return valueSerializer.serialize(bytes);
}
}

View File

@ -2,10 +2,10 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@ -13,22 +13,16 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.disk.ReleasableSlice;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
// todo: implement optimized methods
// todo: implement optimized methods (which?)
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
protected final LLDictionary dictionary;
@ -40,6 +34,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
protected final int keySuffixLength;
protected final int keyExtLength;
protected final LLRange range;
protected final Mono<LLRange> rangeMono;
private volatile boolean released;
private static ByteBuf incrementPrefix(ByteBufAllocator alloc, ByteBuf originalKey, int prefixLength) {
@ -90,11 +85,19 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
}
static ByteBuf firstRangeKey(ByteBufAllocator alloc, ByteBuf prefixKey, int prefixLength, int suffixLength, int extLength) {
static ByteBuf firstRangeKey(ByteBufAllocator alloc,
ByteBuf prefixKey,
int prefixLength,
int suffixLength,
int extLength) {
return zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
}
static ByteBuf nextRangeKey(ByteBufAllocator alloc, ByteBuf prefixKey, int prefixLength, int suffixLength, int extLength) {
static ByteBuf nextRangeKey(ByteBufAllocator alloc,
ByteBuf prefixKey,
int prefixLength,
int suffixLength,
int extLength) {
try {
ByteBuf nonIncremented = zeroFillKeySuffixAndExt(alloc, prefixKey.retain(), prefixLength, suffixLength, extLength);
try {
@ -107,7 +110,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
}
protected static ByteBuf zeroFillKeySuffixAndExt(ByteBufAllocator alloc, ByteBuf prefixKey, int prefixLength, int suffixLength, int extLength) {
protected static ByteBuf zeroFillKeySuffixAndExt(ByteBufAllocator alloc,
ByteBuf prefixKey,
int prefixLength,
int suffixLength,
int extLength) {
try {
assert prefixKey.readableBytes() == prefixLength;
assert suffixLength > 0;
@ -262,6 +269,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert keyPrefix.refCnt() > 0;
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.retain(), nextRangeKey.retain());
this.rangeMono = LLUtils.lazyRetain(this.range);
assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
} finally {
nextRangeKey.release();
@ -371,18 +379,12 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return Mono
.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast))
.doFirst(range::retain)
.doAfterTerminate(range::release);
return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return Mono
.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()))
.doFirst(range::retain)
.doAfterTerminate(range::release);
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono);
}
@Override
@ -391,32 +393,26 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.using(
() -> serializeSuffix(keySuffix),
keySuffixData -> {
Mono<List<ByteBuf>> debuggingKeysMono = Mono
.defer(() -> {
Flux<ByteBuf> debuggingKeysFlux = Mono
.<List<ByteBuf>>defer(() -> {
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
&& this.subStageGetter.needsDebuggingKeyFlux()) {
return Flux
.using(
() -> toExtRange(keySuffixData.retain()),
extRangeBuf -> this.dictionary
.getRangeKeys(resolveSnapshot(snapshot), extRangeBuf.retain()),
.getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetain(extRangeBuf)),
LLRange::release
)
.collectList();
} else {
return Mono.just(List.of());
}
});
return Mono
.using(
() -> toKeyWithoutExt(keySuffixData.retain()),
keyBuf -> debuggingKeysMono
.flatMap(debuggingKeysList -> this.subStageGetter
.subStage(dictionary, snapshot, keyBuf.retain(), debuggingKeysList)
),
ReferenceCounted::release
)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
})
.flatMapIterable(it -> it);
Mono<ByteBuf> keyBufMono = LLUtils.lazyRetain(toKeyWithoutExt(keySuffixData.retain()));
return this.subStageGetter
.subStage(dictionary, snapshot, keyBufMono, debuggingKeysFlux);
},
ReferenceCounted::release
)
@ -430,10 +426,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Flux<BadBlock> badBlocks() {
return Flux
.defer(() -> dictionary.badBlocks(range.retain()))
.doFirst(range::retain)
.doAfterTerminate(range::release);
return dictionary.badBlocks(rangeMono);
}
private static record GroupBuffers(ByteBuf groupKeyWithExt, ByteBuf groupKeyWithoutExt, ByteBuf groupSuffix) {}
@ -443,9 +436,9 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return Flux
.defer(() -> {
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
return Flux
.defer(() -> dictionary.getRangeKeysGrouped(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength))
.flatMapSequential(rangeKeys -> Flux
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)
.concatMap(rangeKeys -> Flux
.using(
() -> {
assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
@ -462,8 +455,8 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.then(this.subStageGetter
.subStage(dictionary,
snapshot,
buffers.groupKeyWithoutExt.retain(),
rangeKeys.stream().map(ByteBuf::retain).collect(Collectors.toList())
LLUtils.lazyRetain(buffers.groupKeyWithoutExt),
Flux.fromIterable(rangeKeys).map(ByteBuf::retain)
)
.map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us))
),
@ -488,7 +481,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
});
} else {
return Flux
.defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength))
.defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
.flatMapSequential(groupKeyWithoutExt -> Mono
.using(
() -> {
@ -499,17 +492,15 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
groupSuffix -> this.subStageGetter
.subStage(dictionary,
snapshot,
groupKeyWithoutExt.retain(),
List.of()
LLUtils.lazyRetain(groupKeyWithoutExt),
Flux.empty()
)
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us)),
ReferenceCounted::release
)
);
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
});
}
private boolean subStageKeysConsistency(int totalKeyLength) {
@ -530,17 +521,8 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.getAllValues(null)
.concatWith(this
.clear()
.then(entries
.flatMap(entry -> this
.at(null, entry.getKey())
.flatMap(us -> us
.set(entry.getValue())
.doAfterTerminate(us::release)
)
)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release)
.then(Mono.empty())
)
.then(this.putMulti(entries))
.then(Mono.empty())
);
}
@ -552,11 +534,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(range.getSingle().retain(), LLDictionaryResultType.VOID)
.remove(LLUtils.lazyRetain(range.getSingle()), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then();
} else {
return dictionary.setRange(range.retain(), Flux.empty());
return dictionary.setRange(LLUtils.lazyRetain(range), Flux.empty());
}
});
}
@ -588,7 +570,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
this.range.release();
this.keyPrefix.release();
} else {
throw new IllegalStateException("Already released");
throw new IllegalReferenceCountException(0, -1);
}
}
}

View File

@ -188,10 +188,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
.flatMap(bucket -> Flux
.fromIterable(bucket)
.map(Entry::getKey)
.flatMap(key -> this
.at(snapshot, key)
.flatMap(stage -> Mono.just(Map.entry(key, stage)).doAfterTerminate(stage::release))
)
.flatMap(key -> this.at(snapshot, key).map(stage -> Map.entry(key, stage)))
);
}
@ -207,13 +204,13 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return entries
.flatMap(entry -> this
.at(null, entry.getKey())
.flatMap(stage -> stage
.setAndGetPrevious(entry.getValue())
.map(prev -> Map.entry(entry.getKey(), prev))
.doAfterTerminate(stage::release))
);
.flatMap(entry -> Flux.usingWhen(
this.at(null, entry.getKey()),
stage -> stage
.setAndGetPrevious(entry.getValue())
.map(prev -> Map.entry(entry.getKey(), prev)),
stage -> Mono.fromRunnable(stage::release)
));
}
@Override

View File

@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@ -13,24 +12,23 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.util.Optional;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static io.netty.buffer.Unpooled.*;
public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
private final LLDictionary dictionary;
private final ByteBuf key;
private final Mono<ByteBuf> keyMono;
private final Serializer<U, ByteBuf> serializer;
public DatabaseSingle(LLDictionary dictionary, ByteBuf key, Serializer<U, ByteBuf> serializer) {
try {
this.dictionary = dictionary;
this.key = key.retain();
this.keyMono = LLUtils.lazyRetain(this.key);
this.serializer = serializer;
} finally {
key.release();
@ -47,101 +45,70 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
@Override
public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return Mono
.defer(() -> dictionary.get(resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly))
.map(this::deserialize)
.doFirst(key::retain)
.doAfterTerminate(key::release);
return dictionary
.get(resolveSnapshot(snapshot), keyMono, existsAlmostCertainly)
.map(serializer::deserialize);
}
@Override
public Mono<U> setAndGetPrevious(U value) {
return Mono
.using(
() -> serialize(value),
.using(() -> serializer.serialize(value),
valueByteBuf -> dictionary
.put(key.retain(), valueByteBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize),
.put(keyMono, LLUtils.lazyRetain(valueByteBuf), LLDictionaryResultType.PREVIOUS_VALUE)
.map(serializer::deserialize),
ReferenceCounted::release
)
.doFirst(key::retain)
.doAfterTerminate(key::release);
);
}
@Override
public Mono<U> update(Function<@Nullable U, @Nullable U> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return Mono
.defer(() -> dictionary.update(key.retain(), (oldValueSer) -> {
var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer));
return dictionary
.update(keyMono, (oldValueSer) -> {
var result = updater.apply(oldValueSer == null ? null : serializer.deserialize(oldValueSer));
if (result == null) {
return null;
} else {
return this.serialize(result);
return serializer.serialize(result);
}
}, updateReturnMode, existsAlmostCertainly))
.map(this::deserialize)
.doFirst(key::retain)
.doAfterTerminate(key::release);
}, updateReturnMode, existsAlmostCertainly)
.map(serializer::deserialize);
}
@Override
public Mono<Delta<U>> updateAndGetDelta(Function<@Nullable U, @Nullable U> updater,
boolean existsAlmostCertainly) {
return Mono
.defer(() -> dictionary.updateAndGetDelta(key.retain(), (oldValueSer) -> {
var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer));
return dictionary
.updateAndGetDelta(keyMono, (oldValueSer) -> {
var result = updater.apply(oldValueSer == null ? null : serializer.deserialize(oldValueSer));
if (result == null) {
return null;
} else {
return this.serialize(result);
return serializer.serialize(result);
}
}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)))
.doFirst(key::retain)
.doAfterTerminate(key::release);
}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, serializer::deserialize));
}
@Override
public Mono<U> clearAndGetPrevious() {
return Mono
.defer(() -> dictionary
.remove(key.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
)
.map(this::deserialize)
.doFirst(key::retain)
.doAfterTerminate(key::release);
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.map(serializer::deserialize);
}
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return Mono
.defer(() -> dictionary
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain()))
)
.map(empty -> empty ? 0L : 1L)
.doFirst(key::retain)
.doAfterTerminate(key::release);
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single))
.map(empty -> empty ? 0L : 1L);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return Mono
.defer(() -> dictionary
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain()))
)
.doFirst(key::retain)
.doAfterTerminate(key::release);
}
//todo: temporary wrapper. convert the whole class to buffers
private U deserialize(ByteBuf bytes) {
return serializer.deserialize(bytes);
}
//todo: temporary wrapper. convert the whole class to buffers
private ByteBuf serialize(U bytes) {
return serializer.serialize(bytes);
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single));
}
@Override
@ -151,7 +118,6 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
@Override
public Flux<BadBlock> badBlocks() {
return Flux
.defer(() -> dictionary.badBlocks(LLRange.single(key.retain())));
return dictionary.badBlocks(keyMono.map(LLRange::single));
}
}

View File

@ -30,7 +30,11 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
Mono<US> at(@Nullable CompositeSnapshot snapshot, T key);
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) {
return this.at(snapshot, key).flatMap(v -> v.get(snapshot, existsAlmostCertainly).doAfterTerminate(v::release));
return Mono.usingWhen(
this.at(snapshot, key),
stage -> stage.get(snapshot, existsAlmostCertainly),
stage -> Mono.fromRunnable(stage::release)
);
}
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key) {
@ -42,19 +46,24 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<Void> putValue(T key, U value) {
return at(null, key).single().flatMap(v -> v.set(value).doAfterTerminate(v::release));
return Mono.usingWhen(
at(null, key).single(),
stage -> stage.set(value),
stage -> Mono.fromRunnable(stage::release)
);
}
Mono<UpdateMode> getUpdateMode();
default Mono<U> updateValue(T key, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
return this
.at(null, key)
.single()
.flatMap(v -> v
.update(updater, updateReturnMode, existsAlmostCertainly)
.doAfterTerminate(v::release)
);
default Mono<U> updateValue(T key,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly,
Function<@Nullable U, @Nullable U> updater) {
return Mono.usingWhen(
this.at(null, key).single(),
stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly),
stage -> Mono.fromRunnable(stage::release)
);
}
default <X> Flux<ExtraKeyOperationResult<T, X>> updateMulti(Flux<Tuple2<T, X>> entries, BiFunction<@Nullable U, X, @Nullable U> updater) {
@ -77,14 +86,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
return updateValueAndGetDelta(key, existsAlmostCertainly, updater).map(LLUtils::isDeltaChanged).single();
}
default Mono<Delta<U>> updateValueAndGetDelta(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
return this
.at(null, key)
.single()
.flatMap(v -> v
.updateAndGetDelta(updater, existsAlmostCertainly)
.doAfterTerminate(v::release)
);
default Mono<Delta<U>> updateValueAndGetDelta(T key,
boolean existsAlmostCertainly,
Function<@Nullable U, @Nullable U> updater) {
return Mono.usingWhen(
this.at(null, key).single(),
stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly),
stage -> Mono.fromRunnable(stage::release)
);
}
default Mono<Delta<U>> updateValueAndGetDelta(T key, Function<@Nullable U, @Nullable U> updater) {
@ -92,17 +101,22 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<U> putValueAndGetPrevious(T key, U value) {
return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value).doAfterTerminate(v::release));
return Mono.usingWhen(
at(null, key).single(),
stage -> stage.setAndGetPrevious(value),
stage -> Mono.fromRunnable(stage::release)
);
}
/**
*
* @param key
* @param value
* @return true if the key was associated with any value, false if the key didn't exist.
*/
default Mono<Boolean> putValueAndGetChanged(T key, U value) {
return at(null, key).single().flatMap(v -> v.setAndGetChanged(value).doAfterTerminate(v::release)).single();
return Mono.usingWhen(
at(null, key).single(),
stage -> stage.setAndGetChanged(value),
stage -> Mono.fromRunnable(stage::release)
).single();
}
default Mono<Void> remove(T key) {
@ -110,7 +124,11 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<U> removeAndGetPrevious(T key) {
return at(null, key).flatMap(v -> v.clearAndGetPrevious().doAfterTerminate(v::release));
return Mono.usingWhen(
at(null, key),
DatabaseStage::clearAndGetPrevious,
stage -> Mono.fromRunnable(stage::release)
);
}
default Mono<Boolean> removeAndGetStatus(T key) {

View File

@ -13,8 +13,8 @@ public interface SubStageGetter<U, US extends DatabaseStage<U>> {
Mono<US> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
List<ByteBuf> debuggingKeyFlux);
Mono<ByteBuf> prefixKey,
Flux<ByteBuf> debuggingKeyFlux);
boolean isMultiKey();

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
@ -47,39 +46,39 @@ public class SubStageGetterHashMap<T, U, TH> implements
@Override
public Mono<DatabaseMapDictionaryHashed<T, U, TH>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
List<ByteBuf> debuggingKeys) {
try {
return Mono
.defer(() -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
.fromCallable(() -> {
for (ByteBuf key : debuggingKeys) {
key.release();
Mono<ByteBuf> prefixKeyMono,
Flux<ByteBuf> debuggingKeysFlux) {
return Mono.usingWhen(
prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionaryHashed
.tail(dictionary,
prefixKey.retain(),
keySerializer,
valueSerializer,
keyHashFunction,
keyHashSerializer
)
)
.transform(mono -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return debuggingKeysFlux.handle((key, sink) -> {
try {
if (key.readableBytes() != prefixKey.readableBytes() + getKeyHashBinaryLength()) {
sink.error(new IndexOutOfBoundsException());
} else {
sink.complete();
}
return null;
});
}
})
.then(Mono
.fromSupplier(() -> DatabaseMapDictionaryHashed
.tail(dictionary,
prefixKey.retain(),
keySerializer,
valueSerializer,
keyHashFunction,
keyHashSerializer
)
)
)
.doFirst(prefixKey::retain)
.doAfterTerminate(prefixKey::release);
} finally {
prefixKey.release();
}
} finally {
key.release();
}
}).then(mono);
} else {
return mono;
}
}),
prefixKey -> Mono.fromRunnable(prefixKey::release)
);
}
@Override
@ -92,23 +91,6 @@ public class SubStageGetterHashMap<T, U, TH> implements
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
return Mono
.fromCallable(() -> {
try {
for (ByteBuf key : keys) {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
}
} finally {
prefixKey.release();
for (ByteBuf key : keys) {
key.release();
}
}
return null;
});
}
public int getKeyHashBinaryLength() {
return keyHashSerializer.getSerializedBinaryLength();
}

View File

@ -1,20 +1,18 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
@SuppressWarnings({"unused", "ClassCanBeRecord"})
public class SubStageGetterHashSet<T, TH> implements
SubStageGetter<Map<T, Nothing>, DatabaseSetDictionaryHashed<T, TH>> {
@ -45,38 +43,37 @@ public class SubStageGetterHashSet<T, TH> implements
@Override
public Mono<DatabaseSetDictionaryHashed<T, TH>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
List<ByteBuf> debuggingKeys) {
try {
return Mono
.defer(() -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
.fromCallable(() -> {
for (ByteBuf key : debuggingKeys) {
key.release();
Mono<ByteBuf> prefixKeyMono,
Flux<ByteBuf> debuggingKeysFlux) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseSetDictionaryHashed
.tail(dictionary,
prefixKey.retain(),
keySerializer,
keyHashFunction,
keyHashSerializer
)
)
.transform(mono -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return debuggingKeysFlux.handle((key, sink) -> {
try {
if (key.readableBytes() != prefixKey.readableBytes() + getKeyHashBinaryLength()) {
sink.error(new IndexOutOfBoundsException());
} else {
sink.complete();
}
return null;
});
}
})
.then(Mono
.fromSupplier(() -> DatabaseSetDictionaryHashed
.tail(dictionary,
prefixKey.retain(),
keySerializer,
keyHashFunction,
keyHashSerializer
)
)
)
.doFirst(prefixKey::retain)
.doAfterTerminate(prefixKey::release);
} finally {
prefixKey.release();
}
} finally {
key.release();
}
}).then(mono);
} else {
return mono;
}
}),
prefixKey -> Mono.fromRunnable(prefixKey::release)
);
}
@Override
@ -89,23 +86,6 @@ public class SubStageGetterHashSet<T, TH> implements
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
return Mono
.fromCallable(() -> {
try {
for (ByteBuf key : keys) {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
}
} finally {
prefixKey.release();
for (ByteBuf key : keys) {
key.release();
}
}
return null;
});
}
public int getKeyHashBinaryLength() {
return keyHashSerializer.getSerializedBinaryLength();
}

View File

@ -37,37 +37,36 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Override
public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
List<ByteBuf> debuggingKeys) {
try {
return Mono
.defer(() -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
.fromCallable(() -> {
for (ByteBuf key : debuggingKeys) {
key.release();
Mono<ByteBuf> prefixKeyMono,
Flux<ByteBuf> debuggingKeysFlux) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionary
.tail(dictionary,
prefixKey.retain(),
keySerializer,
valueSerializer
)
)
.transform(mono -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return debuggingKeysFlux.handle((key, sink) -> {
try {
if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) {
sink.error(new IndexOutOfBoundsException());
} else {
sink.complete();
}
return null;
});
}
})
.then(Mono
.fromSupplier(() -> DatabaseMapDictionary
.tail(dictionary,
prefixKey.retain(),
keySerializer,
valueSerializer
)
)
)
.doFirst(prefixKey::retain)
.doAfterTerminate(prefixKey::release);
} finally {
prefixKey.release();
}
} finally {
key.release();
}
}).then(mono);
} else {
return mono;
}
}),
prefixKey -> Mono.fromRunnable(prefixKey::release)
);
}
@Override
@ -80,23 +79,6 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
return Mono
.fromCallable(() -> {
try {
for (ByteBuf key : keys) {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
}
} finally {
prefixKey.release();
for (ByteBuf key : keys) {
key.release();
}
}
return null;
});
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength();
}

View File

@ -11,7 +11,8 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements SubStageGetter<Map<T, U>, DatabaseMapDictionaryDeep<T, U, US>> {
public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
SubStageGetter<Map<T, U>, DatabaseMapDictionaryDeep<T, U, US>> {
private static final boolean assertsEnabled;
static {
@ -51,38 +52,37 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Override
public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
List<ByteBuf> debuggingKeys) {
try {
return Mono
.defer(() -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
.fromCallable(() -> {
for (ByteBuf key : debuggingKeys) {
key.release();
Mono<ByteBuf> prefixKeyMono,
Flux<ByteBuf> debuggingKeysFlux) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionaryDeep
.deepIntermediate(dictionary,
prefixKey.retain(),
keySerializer,
subStageGetter,
keyExtLength
)
)
.transform(mono -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return debuggingKeysFlux.handle((key, sink) -> {
try {
if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) {
sink.error(new IndexOutOfBoundsException());
} else {
sink.complete();
}
return null;
});
}
})
.then(Mono
.fromSupplier(() -> DatabaseMapDictionaryDeep
.deepIntermediate(dictionary,
prefixKey.retain(),
keySerializer,
subStageGetter,
keyExtLength
)
)
)
.doFirst(prefixKey::retain)
.doAfterTerminate(prefixKey::release);
} finally {
prefixKey.release();
}
} finally {
key.release();
}
}).then(mono);
} else {
return mono;
}
}),
prefixKey -> Mono.fromRunnable(prefixKey::release)
);
}
@Override

View File

@ -35,37 +35,30 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
@Override
public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
List<ByteBuf> debuggingKeys) {
try {
return Mono
.defer(() -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
.fromCallable(() -> {
for (ByteBuf key : debuggingKeys) {
key.release();
Mono<ByteBuf> prefixKeyMono,
Flux<ByteBuf> debuggingKeysFlux) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey.retain(), keySerializer))
.transform(mono -> {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return debuggingKeysFlux.handle((key, sink) -> {
try {
if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) {
sink.error(new IndexOutOfBoundsException());
} else {
sink.complete();
}
return null;
});
}
})
.then(Mono
.fromSupplier(() -> DatabaseSetDictionary
.tail(
dictionary,
prefixKey.retain(),
keySerializer
)
)
)
.doFirst(prefixKey::retain)
.doAfterTerminate(prefixKey::release);
} finally {
prefixKey.release();
}
} finally {
key.release();
}
}).then(mono);
} else {
return mono;
}
}),
prefixKey -> Mono.fromRunnable(prefixKey::release)
);
}
@Override
@ -78,28 +71,6 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
try {
return Mono
.<Void>fromCallable(() -> {
try {
for (ByteBuf key : keys) {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
}
} finally {
for (ByteBuf key : keys) {
key.release();
}
}
return null;
})
.doFirst(prefixKey::retain)
.doAfterTerminate(prefixKey::release);
} finally {
prefixKey.release();
}
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength();
}

View File

@ -31,32 +31,31 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
@Override
public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
ByteBuf keyPrefix,
List<ByteBuf> debuggingKeys) {
try {
return Mono
.fromCallable(() -> {
try {
for (ByteBuf key : debuggingKeys) {
if (!LLUtils.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
Mono<ByteBuf> keyPrefixMono,
Flux<ByteBuf> debuggingKeysFlux) {
return Mono.usingWhen(
keyPrefixMono,
keyPrefix -> Mono
.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer))
.transform(mono -> {
if (assertsEnabled && needsDebuggingKeyFlux()) {
return debuggingKeysFlux.handle((key, sink) -> {
try {
if (!LLUtils.equals(keyPrefix, key)) {
sink.error(new IndexOutOfBoundsException("Found more than one element!"));
} else {
sink.complete();
}
} finally {
key.release();
}
}).then(mono);
} else {
return mono;
}
return null;
} finally {
for (ByteBuf key : debuggingKeys) {
key.release();
}
}
})
.then(Mono
.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer))
)
.doFirst(keyPrefix::retain)
.doAfterTerminate(keyPrefix::release);
} finally {
keyPrefix.release();
}
}),
keyPrefix -> Mono.fromRunnable(keyPrefix::release)
);
}
@Override

View File

@ -77,9 +77,9 @@ public class LLLocalDictionary implements LLDictionary {
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 500;
static final Duration MULTI_GET_WINDOW_TIMEOUT = Duration.ofSeconds(1);
static final ReadOptions EMPTY_READ_OPTIONS = new UnmodifiableReadOptions();
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final boolean PREFER_SEEK_TO_FIRST = false;
/**
* It used to be false,
@ -1492,6 +1492,7 @@ public class LLLocalDictionary implements LLDictionary {
return Mono
.<Void>fromCallable(() -> {
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
assert EMPTY_READ_OPTIONS.isOwningHandle();
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -1514,6 +1515,8 @@ public class LLLocalDictionary implements LLDictionary {
} else {
maxBound = emptyReleasableSlice();
}
assert cfh.isOwningHandle();
assert opts.isOwningHandle();
try (RocksIterator it = db.newIterator(cfh, opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(databaseOptions.allowNettyDirect(), it, range.getMin().retain());

View File

@ -4,6 +4,7 @@ import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIter
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
@ -99,9 +100,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
rocksIterator.close();
tuple.getT2().release();
tuple.getT3().release();
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
});
}
public abstract T getEntry(ByteBuf key, ByteBuf value);
@ -110,7 +109,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
if (released.compareAndSet(false, true)) {
range.release();
} else {
throw new IllegalStateException("Already released");
throw new IllegalReferenceCountException(0, -1);
}
}
}

View File

@ -0,0 +1,19 @@
package it.cavallium.dbengine.database.disk;
import org.rocksdb.*;
public class UnreleasableReadOptions extends ReadOptions {
public UnreleasableReadOptions() {
}
public UnreleasableReadOptions(ReadOptions readOptions) {
super(readOptions);
}
@Override
public void close() {
throw new UnsupportedOperationException("Can't close " + this.getClass().getSimpleName());
}
}

View File

@ -0,0 +1,19 @@
package it.cavallium.dbengine.database.disk;
import org.rocksdb.WriteOptions;
public class UnreleasableWriteOptions extends WriteOptions {
public UnreleasableWriteOptions() {
}
public UnreleasableWriteOptions(WriteOptions writeOptions) {
super(writeOptions);
}
@Override
public void close() {
throw new UnsupportedOperationException("Can't close " + this.getClass().getSimpleName());
}
}

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.memory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
@ -135,31 +134,28 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
try {
return Mono
.fromCallable(() -> snapshots.get(resolveSnapshot(snapshot)).get(k(key)))
.map(this::kk)
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, Mono<ByteBuf> keyMono, boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
key -> Mono
.fromCallable(() -> snapshots.get(resolveSnapshot(snapshot)).get(k(key)))
.map(this::kk)
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release)
);
}
@Override
public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) {
try {
return Mono
.fromCallable(() -> mainDb.put(k(key), k(value)))
.transform(result -> this.transformResult(result, resultType))
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
public Mono<ByteBuf> put(Mono<ByteBuf> keyMono, Mono<ByteBuf> valueMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
key -> Mono.usingWhen(valueMono,
value -> Mono
.fromCallable(() -> mainDb.put(k(key), k(value)))
.transform(result -> this.transformResult(result, resultType))
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)),
value -> Mono.fromRunnable(value::release)
),
key -> Mono.fromRunnable(key::release)
);
}
@Override
@ -168,26 +164,29 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
public Mono<Delta<ByteBuf>> updateAndGetDelta(Mono<ByteBuf> keyMono,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
boolean existsAlmostCertainly) {
return Mono.fromCallable(() -> {
AtomicReference<ByteBuf> oldRef = new AtomicReference<>(null);
var newValue = mainDb.compute(k(key), (_unused, old) -> {
if (old != null) {
oldRef.set(kk(old));
}
var v = updater.apply(old != null ? kk(old) : null);
try {
return k(v);
} finally {
if (v != null) {
v.release();
}
}
});
return new Delta<>(oldRef.get(), kk(newValue));
});
return Mono.usingWhen(keyMono,
key -> Mono.fromCallable(() -> {
AtomicReference<ByteBuf> oldRef = new AtomicReference<>(null);
var newValue = mainDb.compute(k(key), (_unused, old) -> {
if (old != null) {
oldRef.set(kk(old));
}
var v = updater.apply(old != null ? kk(old) : null);
try {
return k(v);
} finally {
if (v != null) {
v.release();
}
}
});
return new Delta<>(oldRef.get(), kk(newValue));
}),
key -> Mono.fromRunnable(key::release)
);
}
@Override
@ -196,18 +195,26 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) {
try {
return Mono
.fromCallable(() -> mainDb.remove(k(key)))
// Don't retain the result because it has been removed from the skip list
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.map(this::kk)
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
public Mono<ByteBuf> remove(Mono<ByteBuf> keyMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
key -> Mono
.fromCallable(() -> mainDb.remove(k(key)))
// Don't retain the result because it has been removed from the skip list
.mapNotNull(bytesList -> switch (resultType) {
case VOID -> null;
case PREVIOUS_VALUE_EXISTENCE -> LLUtils.booleanToResponseByteBuffer(true);
case PREVIOUS_VALUE -> kk(bytesList);
})
.switchIfEmpty(Mono.defer(() -> {
if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) {
return Mono.fromCallable(() -> LLUtils.booleanToResponseByteBuffer(false));
} else {
return Mono.empty();
}
}))
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release)
);
}
@Override
@ -257,99 +264,106 @@ public class LLMemoryDictionary implements LLDictionary {
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range,
Mono<LLRange> rangeMono,
boolean existsAlmostCertainly) {
try {
if (range.isSingle()) {
return Mono.fromCallable(() -> {
var element = snapshots.get(resolveSnapshot(snapshot))
.get(k(range.getSingle()));
return Map.entry(range.getSingle().retain(), kk(element));
}).flux();
} else {
return Mono
.fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> Map.entry(kk(entry.getKey()), kk(entry.getValue())));
}
} finally {
range.release();
}
return Flux.usingWhen(rangeMono,
range -> {
if (range.isSingle()) {
return Mono.fromCallable(() -> {
var element = snapshots.get(resolveSnapshot(snapshot))
.get(k(range.getSingle()));
return Map.entry(range.getSingle().retain(), kk(element));
}).flux();
} else {
return Mono
.fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> Map.entry(kk(entry.getKey()), kk(entry.getValue())));
}
},
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
Mono<LLRange> rangeMono,
int prefixLength,
boolean existsAlmostCertainly) {
return Flux.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
try {
if (range.isSingle()) {
return Mono.fromCallable(() -> {
var contains = snapshots.get(resolveSnapshot(snapshot))
.containsKey(k(range.getSingle()));
return contains ? range.getSingle().retain() : null;
}).flux();
} else {
return Mono
.fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> kk(entry.getKey()));
}
} finally {
range.release();
}
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Flux.usingWhen(rangeMono,
range -> {
if (range.isSingle()) {
return Mono.fromCallable(() -> {
var contains = snapshots.get(resolveSnapshot(snapshot))
.containsKey(k(range.getSingle()));
return contains ? range.getSingle().retain() : null;
}).flux();
} else {
return Mono
.fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> kk(entry.getKey()));
}
},
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return getRangeKeys(snapshot, range)
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
Mono<LLRange> rangeMono,
int prefixLength) {
return getRangeKeys(snapshot, rangeMono)
.bufferUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), LLUtils::equals);
}
@Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return getRangeKeys(snapshot, range)
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) {
return getRangeKeys(snapshot, rangeMono)
.distinctUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), LLUtils::equals)
.map(k -> k.slice(k.readerIndex(), prefixLength));
}
@Override
public Flux<BadBlock> badBlocks(LLRange range) {
public Flux<BadBlock> badBlocks(Mono<LLRange> rangeMono) {
return Flux.empty();
}
@Override
public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<Entry<ByteBuf, ByteBuf>> entries) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
return Mono.fromCallable(() -> (long) mapSlice(snapshot, range).size());
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, boolean fast) {
return Mono.usingWhen(rangeMono,
range -> Mono.fromCallable(() -> (long) mapSlice(snapshot, range).size()),
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) {
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(Mono<LLRange> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}

View File

@ -13,10 +13,13 @@ public class LLMemorySingleton implements LLSingleton {
private final LLMemoryDictionary dict;
private final byte[] singletonName;
private final Mono<ByteBuf> singletonNameBufMono;
public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) {
this.dict = dict;
this.singletonName = singletonName;
ByteBuf singletonNameBuf = Unpooled.wrappedBuffer(singletonName);
this.singletonNameBufMono = Mono.just(singletonNameBuf).map(ByteBuf::retain);
}
@Override
@ -26,32 +29,23 @@ public class LLMemorySingleton implements LLSingleton {
@Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
var bb = Unpooled.wrappedBuffer(singletonName);
return Mono
.defer(() -> dict.get(snapshot, bb.retain(), false))
return dict
.get(snapshot, singletonNameBufMono, false)
.map(b -> {
try {
return LLUtils.toArray(b);
} finally {
b.release();
}
})
.doAfterTerminate(bb::release)
.doFirst(bb::retain);
});
}
@Override
public Mono<Void> set(byte[] value) {
var bbKey = Unpooled.wrappedBuffer(singletonName);
var bbVal = Unpooled.wrappedBuffer(value);
return Mono
.defer(() -> dict
.put(bbKey.retain(), bbVal.retain(), LLDictionaryResultType.VOID)
)
.doAfterTerminate(bbKey::release)
.doAfterTerminate(bbVal::release)
.doFirst(bbKey::retain)
.doFirst(bbVal::retain)
var bbKey = Mono.just(Unpooled.wrappedBuffer(singletonName)).map(ByteBuf::retain);
var bbVal = Mono.just(Unpooled.wrappedBuffer(value)).map(ByteBuf::retain);
return dict
.put(bbKey, bbVal, LLDictionaryResultType.VOID)
.then();
}
}

View File

@ -53,7 +53,7 @@ public class OldDatabaseTests {
))
.flatMap(collection -> Flux
.fromIterable(originalKeys)
.flatMap(k1 -> collection.putValue(k1, DUMMY_VALUE))
.flatMap(k1 -> collection.putValue(k1, DUMMY_VALUE.retain()))
.then(collection.leavesCount(null, false))
)
)
@ -82,7 +82,7 @@ public class OldDatabaseTests {
.flatMap(k1 -> collection.at(null, k1))
.flatMap(k1at -> Flux
.fromIterable(originalSubKeys)
.flatMap(k2 -> k1at.putValue(k2, DUMMY_VALUE))
.flatMap(k2 -> k1at.putValue(k2, DUMMY_VALUE.retain()))
)
.then(collection.leavesCount(null, false))
)
@ -99,15 +99,15 @@ public class OldDatabaseTests {
String newPrefix = "xxx";
StepVerifier
.create(
tempDb()
.flatMapMany(db -> addKeysAndConvertToLongerOnes(db, originalSuperKeys, originalSubKeys, newPrefix))
.create(tempDb()
.flatMapMany(db -> addKeysAndConvertToLongerOnes(db, originalSuperKeys, originalSubKeys, newPrefix))
)
.expectNextSequence(originalSuperKeys
.stream()
.flatMap(superKey -> originalSubKeys
.stream()
.map(subKey -> Map.entry(newPrefix + superKey, newPrefix + subKey)))
.map(subKey -> Map.entry(newPrefix + superKey, newPrefix + subKey))
)
.collect(Collectors.toList())
)
.verifyComplete();
@ -174,11 +174,17 @@ public class OldDatabaseTests {
var db1 = tuple.getT1();
return Flux
.fromIterable(originalSuperKeys)
.flatMapSequential(superKey -> db1.at(null, superKey))
.flatMapSequential(at -> Flux
.concatMap(superKey -> db1.at(null, superKey))
.concatMap(at -> Flux
.fromIterable(originalSubKeys)
.flatMapSequential(subKey -> at.at(null, subKey))
.flatMapSequential(at2 -> at2.set(DUMMY_VALUE))
.concatMap(subKey -> at
.at(null, subKey)
.flatMap(at2 -> at2
.set(DUMMY_VALUE.retainedSlice())
.doAfterTerminate(at2::release)
)
)
.doAfterTerminate(at::release)
)
.then(db
.takeSnapshot()
@ -203,19 +209,23 @@ public class OldDatabaseTests {
return oldDb
.getAllStages(snapshot)
.flatMapSequential(parentEntry -> Mono
.concatMap(parentEntry -> Mono
.fromCallable(() -> newPrefix + parentEntry.getKey())
.flatMapMany(newId1 -> parentEntry.getValue()
.getAllValues(snapshot)
.flatMapSequential(entry -> Mono
.concatMap(entry -> Mono
.fromCallable(() -> newPrefix + entry.getKey())
.flatMap(newId2 -> newDb
.at(null, newId1)
.flatMap(newStage -> newStage.putValue(newId2, entry.getValue()))
.flatMap(newStage -> newStage
.putValue(newId2, entry.getValue())
.doAfterTerminate(newStage::release)
)
.thenReturn(Map.entry(newId1, newId2))
)
)
)
.doAfterTerminate(() -> parentEntry.getValue().release())
)
.concatWith(db
.releaseSnapshot(snapshot.getSnapshot(db))

View File

@ -29,13 +29,7 @@ public class TestDictionaryMap {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
}
private static final String BIG_STRING
= "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789";
private static final String BIG_STRING = "012345678901234567890123456789";
private static Stream<Arguments> provideArgumentsPut() {
var goodKeys = List.of("12345");

View File

@ -32,13 +32,7 @@ public class TestDictionaryMapDeep {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
}
private static final String BIG_STRING
= "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789";
private static final String BIG_STRING = "012345678901234567890123456789";
private static Stream<Arguments> provideArgumentsSet() {
var goodKeys = Set.of("12345");