Andrea Cavalli 2021-05-02 19:18:15 +02:00
parent 2e6aceafe6
commit 0c26daba57
29 changed files with 2501 additions and 942 deletions

View File

@ -24,6 +24,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType);
Mono<UpdateMode> getUpdateMode();
Mono<Boolean> update(ByteBuf key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly);
default Mono<Boolean> update(ByteBuf key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater) {
@ -65,7 +67,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<Entry<ByteBuf, ByteBuf>> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues);
Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries);
default Mono<Void> replaceRange(LLRange range,
boolean canKeysChange,
@ -76,8 +78,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
return this
.setRange(range, this
.getRange(null, range, existsAlmostCertainly)
.flatMap(entriesReplacer), false)
.then();
.flatMap(entriesReplacer)
);
} else {
return this
.putMulti(this

View File

@ -32,8 +32,8 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep.EMPTY_BYTES;
@SuppressWarnings("unused")
public class LLUtils {
@ -217,14 +217,13 @@ public class LLUtils {
@Nullable
public static ByteBuf readNullableDirectNioBuffer(ByteBufAllocator alloc, ToIntFunction<ByteBuffer> reader) {
ByteBuf buffer = alloc.directBuffer();
try {
ByteBuf directBuffer = null;
ByteBuffer nioBuffer;
int size;
Boolean mustBeCopied = null;
do {
if (mustBeCopied == null || !mustBeCopied) {
nioBuffer = LLUtils.toDirectFast(buffer.retain());
nioBuffer = LLUtils.toDirectFast(buffer);
if (nioBuffer != null) {
nioBuffer.limit(nioBuffer.capacity());
}
@ -264,16 +263,11 @@ public class LLUtils {
}
}
} while (size != RocksDB.NOT_FOUND);
} catch (Throwable t) {
buffer.release();
throw t;
}
return null;
}
@Nullable
public static ByteBuffer toDirectFast(ByteBuf buffer) {
try {
ByteBuffer result = buffer.nioBuffer(0, buffer.capacity());
if (result.isDirect()) {
result.limit(buffer.writerIndex());
@ -287,9 +281,16 @@ public class LLUtils {
} else {
return null;
}
} finally {
buffer.release();
}
public static ByteBuffer toDirect(ByteBuf buffer) {
ByteBuffer result = toDirectFast(buffer);
if (result == null) {
throw new IllegalArgumentException("The supplied ByteBuf is not direct "
+ "(if it's a CompositeByteBuf it must be consolidated before)");
}
assert result.isDirect();
return result;
}
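
Editorial note: toDirect is the strict counterpart of toDirectFast — instead of returning null for a non-direct buffer it fails fast with the message above. A small runnable illustration of the difference (it assumes only the two LLUtils methods shown here, and that toDirectFast no longer consumes its argument, as the updated call sites suggest):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import it.cavallium.dbengine.database.LLUtils;

public class ToDirectExample {
  public static void main(String[] args) {
    ByteBuf heapBuf = Unpooled.buffer(4).writeInt(42); // heap buffer, not direct
    ByteBuffer fast = LLUtils.toDirectFast(heapBuf);   // lenient variant: returns null
    System.out.println(fast);                          // null
    try {
      LLUtils.toDirect(heapBuf);                       // strict variant: throws
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    } finally {
      heapBuf.release();
    }
  }
}
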
public static ByteBuf toDirectCopy(ByteBuf buffer) {
@ -324,59 +325,99 @@ public class LLUtils {
}
public static ByteBuf directCompositeBuffer(ByteBufAllocator alloc, ByteBuf buffer) {
return wrappedBuffer(buffer);
try {
ByteBuf result = alloc.directBuffer(buffer.readableBytes());
try {
result.writeBytes(buffer, buffer.readerIndex(), buffer.readableBytes());
return result.retain();
} finally {
result.release();
}
} finally {
buffer.release();
}
}
public static ByteBuf directCompositeBuffer(ByteBufAllocator alloc, ByteBuf buffer1, ByteBuf buffer2) {
try {
assert buffer1.isDirect();
assert buffer1.nioBuffer().isDirect();
assert buffer2.isDirect();
assert buffer2.nioBuffer().isDirect();
if (buffer1.readableBytes() == 0) {
return wrappedBuffer(buffer2);
return directCompositeBuffer(alloc, buffer2.retain());
} else if (buffer2.readableBytes() == 0) {
return wrappedBuffer(buffer1);
return directCompositeBuffer(alloc, buffer1.retain());
}
ByteBuf result = alloc.directBuffer(buffer1.readableBytes() + buffer2.readableBytes());
try {
result.writeBytes(buffer1, buffer1.readerIndex(), buffer1.readableBytes());
result.writeBytes(buffer2, buffer2.readerIndex(), buffer2.readableBytes());
return result.retain();
} finally {
result.release();
}
} finally {
buffer1.release();
buffer2.release();
}
CompositeByteBuf compositeBuffer = alloc.compositeDirectBuffer(2);
compositeBuffer.addComponent(true, buffer1);
compositeBuffer.addComponent(true, buffer2);
compositeBuffer.consolidate();
assert compositeBuffer.isDirect();
assert compositeBuffer.nioBuffer().isDirect();
return compositeBuffer;
}
public static ByteBuf directCompositeBuffer(ByteBufAllocator alloc, ByteBuf buffer1, ByteBuf buffer2, ByteBuf buffer3) {
try {
if (buffer1.readableBytes() == 0) {
return directCompositeBuffer(alloc, buffer2, buffer3);
return directCompositeBuffer(alloc, buffer2.retain(), buffer3.retain());
} else if (buffer2.readableBytes() == 0) {
return directCompositeBuffer(alloc, buffer1, buffer3);
return directCompositeBuffer(alloc, buffer1.retain(), buffer3.retain());
} else if (buffer3.readableBytes() == 0) {
return directCompositeBuffer(alloc, buffer1, buffer2);
return directCompositeBuffer(alloc, buffer1.retain(), buffer2.retain());
}
ByteBuf result = alloc.directBuffer(buffer1.readableBytes() + buffer2.readableBytes() + buffer3.readableBytes());
try {
result.writeBytes(buffer1, buffer1.readerIndex(), buffer1.readableBytes());
result.writeBytes(buffer2, buffer2.readerIndex(), buffer2.readableBytes());
result.writeBytes(buffer3, buffer3.readerIndex(), buffer3.readableBytes());
return result.retain();
} finally {
result.release();
}
} finally {
buffer1.release();
buffer2.release();
buffer3.release();
}
CompositeByteBuf compositeBuffer = alloc.compositeDirectBuffer(3);
compositeBuffer.addComponent(true, buffer1);
compositeBuffer.addComponent(true, buffer2);
compositeBuffer.addComponent(true, buffer3);
compositeBuffer.consolidate();
return compositeBuffer;
}
public static ByteBuf directCompositeBuffer(ByteBufAllocator alloc, ByteBuf... buffers) {
try {
switch (buffers.length) {
case 0:
return EMPTY_BUFFER;
return EMPTY_BYTES;
case 1:
return directCompositeBuffer(alloc, buffers[0]);
return directCompositeBuffer(alloc, buffers[0].retain().retain());
case 2:
return directCompositeBuffer(alloc, buffers[0], buffers[1]);
return directCompositeBuffer(alloc, buffers[0].retain(), buffers[1].retain());
case 3:
return directCompositeBuffer(alloc, buffers[0], buffers[1], buffers[2]);
return directCompositeBuffer(alloc, buffers[0].retain(), buffers[1].retain(), buffers[2].retain());
default:
CompositeByteBuf compositeBuffer = alloc.compositeDirectBuffer(buffers.length);
compositeBuffer.addComponents(true, buffers);
compositeBuffer.consolidate();
return compositeBuffer;
int readableTotal = 0;
for (ByteBuf buffer : buffers) {
readableTotal += buffer.readableBytes();
}
ByteBuf result = alloc.directBuffer(readableTotal);
try {
for (ByteBuf buffer : buffers) {
result.writeBytes(buffer, buffer.readerIndex(), buffer.readableBytes());
}
return result.retain();
} finally {
result.release();
}
}
} finally {
for (ByteBuf buffer : buffers) {
buffer.release();
}
}
}
}
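
Editorial note: the rewritten directCompositeBuffer overloads all follow the same ownership rule — each call consumes one reference of every buffer it receives and returns a freshly allocated direct buffer that the caller must release. A runnable sketch of that contract, assuming the LLUtils signatures shown above are on the classpath:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.LLUtils;

public class DirectCompositeExample {
  public static void main(String[] args) {
    ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
    ByteBuf prefix = alloc.directBuffer().writeBytes(new byte[] {1, 2});
    ByteBuf suffix = alloc.directBuffer().writeBytes(new byte[] {3, 4});
    // "prefix" is still needed afterwards, so an extra reference is passed to the call;
    // "suffix" is handed over entirely, its only reference is consumed by the method.
    ByteBuf composite = LLUtils.directCompositeBuffer(alloc, prefix.retain(), suffix);
    try {
      assert composite.isDirect();
      assert composite.readableBytes() == 4;
    } finally {
      composite.release();
      prefix.release(); // balance the reference we kept for ourselves
    }
  }
}
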

View File

@ -1,16 +1,19 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
@ -48,8 +51,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
private ByteBuf toKey(ByteBuf suffixKey) {
assert suffixKeyConsistency(suffixKey.readableBytes());
try {
assert suffixKeyConsistency(suffixKey.readableBytes());
return LLUtils.directCompositeBuffer(dictionary.getAllocator(), keyPrefix.retain(), suffixKey.retain());
} finally {
suffixKey.release();
@ -63,32 +66,31 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
entry -> deserialize(entry.getValue()),
HashMap::new);
HashMap::new)
.filter(map -> !map.isEmpty());
}
@Override
public Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
return dictionary
return Mono
.usingWhen(
Mono.just(true),
b -> get(null, false),
b -> dictionary
.setRange(range.retain(),
Flux
.fromIterable(value.entrySet())
.map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))),
true
.map(entry -> Map
.entry(this.toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))
)
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
entry -> deserialize(entry.getValue()),
HashMap::new);
)
);
}
@Override
public Mono<Map<T, U>> clearAndGetPrevious() {
return dictionary
.setRange(range.retain(), Flux.empty(), true)
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
entry -> deserialize(entry.getValue()),
HashMap::new);
return this
.setAndGetPrevious(Map.of());
}
@Override
@ -103,28 +105,21 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
ByteBuf keyBuf = toKey(keySuffixBuf.retain());
return Mono
.fromSupplier(() -> new DatabaseSingle<>(dictionary, keyBuf.retain(), Serializer.noop()))
.<DatabaseStageEntry<U>>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer))
.doFinally(s -> {
keyBuf.release();
keySuffixBuf.release();
});
.fromSupplier(() -> new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop()))
.<DatabaseStageEntry<U>>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer));
}
@Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
ByteBuf keyBuf = toKey(keySuffixBuf.retain());
return dictionary
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.get(resolveSnapshot(snapshot), keyBuf.retain(), existsAlmostCertainly)
.map(this::deserialize)
.doFinally(s -> {
keyBuf.release();
keySuffixBuf.release();
});
.map(this::deserialize),
ReferenceCounted::release
);
}
@Override
@ -139,13 +134,19 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}).then();
}
@Override
public Mono<UpdateMode> getUpdateMode() {
return dictionary.getUpdateMode();
}
@Override
public Mono<Boolean> updateValue(T keySuffix,
boolean existsAlmostCertainly,
Function<@Nullable U, @Nullable U> updater) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
ByteBuf keyBuf = toKey(keySuffixBuf.retain());
return dictionary.update(keyBuf.retain(), oldSerialized -> {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary.update(keyBuf.retain(), oldSerialized -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()));
if (result == null) {
@ -158,10 +159,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
oldSerialized.release();
}
}
}, existsAlmostCertainly).doFinally(s -> {
keyBuf.release();
keySuffixBuf.release();
});
}, existsAlmostCertainly),
ReferenceCounted::release
);
}
@Override
@ -180,13 +180,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
@Override
public Mono<Boolean> putValueAndGetStatus(T keySuffix, U value) {
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
ByteBuf keyBuf = toKey(keySuffixBuf.retain());
ByteBuf valueBuf = serialize(value);
return dictionary
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean)
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize)
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null)
.doFinally(s -> {
keyBuf.release();
keySuffixBuf.release();
@ -196,12 +198,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> remove(T keySuffix) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
ByteBuf keyBuf = toKey(keySuffixBuf.retain());
return dictionary.remove(keyBuf.retain(), LLDictionaryResultType.VOID).doFinally(s -> {
keyBuf.release();
keySuffixBuf.release();
}).then();
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary.remove(keyBuf.retain(), LLDictionaryResultType.VOID).then(),
ReferenceCounted::release
);
}
@Override
@ -241,15 +243,38 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
keySuffixBuf.release();
}
})), existsAlmostCertainly)
.flatMap(entry -> Mono.fromCallable(() -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue()))));
.flatMap(entry -> Mono
.fromCallable(() -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())))
);
}
private Entry<ByteBuf, ByteBuf> serializeEntry(T key, U value) {
ByteBuf serializedKey = toKey(serializeSuffix(key));
try {
ByteBuf serializedValue = serialize(value);
try {
return Map.entry(serializedKey.retain(), serializedValue.retain());
} finally {
serializedValue.release();
}
} finally {
serializedKey.release();
}
}
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
var serializedEntries = entries
.flatMap(entry -> Mono
.fromCallable(() -> serializeEntry(entry.getKey(), entry.getValue()))
).doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
});
return dictionary
.putMulti(entries.flatMap(entry -> Mono.fromCallable(() -> Map.entry(toKey(serializeSuffix(entry.getKey())),
serialize(entry.getValue())
))), false)
.putMulti(serializedEntries, false)
.then();
}
@ -257,14 +282,18 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), range.retain())
.map(key -> Map.entry(deserializeSuffix(stripPrefix(key)),
new DatabaseSingleMapped<>(
new DatabaseSingle<>(dictionary,
toKey(stripPrefix(key)),
Serializer.noop()),
valueSerializer
)
));
.map(key -> {
try {
return Map.entry(deserializeSuffix(stripPrefix(key.retain())),
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary,
toKey(stripPrefix(key.retain())),
Serializer.noop()
), valueSerializer)
);
} finally {
key.release();
}
});
}
@Override
@ -274,16 +303,28 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.map(serializedEntry -> Map.entry(
deserializeSuffix(stripPrefix(serializedEntry.getKey())),
valueSerializer.deserialize(serializedEntry.getValue())
));
))
.doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
});
}
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return dictionary
return Flux
.usingWhen(
Mono.just(true),
b -> getAllValues(null),
b -> dictionary
.setRange(range.retain(),
entries.map(entry ->
Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), true)
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))
)
)
);
}
@Override
@ -297,8 +338,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.then();
} else {
return dictionary
.setRange(range.retain(), Flux.empty(), false)
.then();
.setRange(range.retain(), Flux.empty());
}
}
@ -315,9 +355,4 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private ByteBuf serialize(U bytes) {
return valueSerializer.serialize(bytes);
}
@Override
public void release() {
super.release();
}
}
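
Editorial note: most of the hand-written retain/release/doFinally choreography in this class has been replaced by Mono.using, which scopes a ByteBuf to one subscription and releases it on completion, error or cancellation. A standalone illustration of the pattern (a plain buffer stands in for the serialized key):

import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Mono;

public class UsingReleaseExample {
  public static void main(String[] args) {
    Mono<Integer> readKey = Mono.using(
        () -> Unpooled.directBuffer().writeInt(42), // acquire a fresh buffer per subscription
        key -> Mono.just(key.readableBytes()),      // borrow it inside the inner Mono
        ReferenceCounted::release                   // always release it afterwards
    );
    System.out.println(readKey.block()); // 4
    System.out.println(readKey.block()); // 4 again: each subscription gets its own buffer
  }
}
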

View File

@ -2,23 +2,24 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import lombok.Value;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import static io.netty.buffer.Unpooled.*;
// todo: implement optimized methods
@ -34,6 +35,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
protected final int keySuffixLength;
protected final int keyExtLength;
protected final LLRange range;
private volatile boolean released;
private static ByteBuf incrementPrefix(ByteBufAllocator alloc, ByteBuf originalKey, int prefixLength) {
try {
@ -115,9 +117,13 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert zeroSuffixAndExt.nioBuffer().isDirect();
zeroSuffixAndExt.writeZero(suffixLength + extLength);
ByteBuf result = LLUtils.directCompositeBuffer(alloc, prefixKey.retain(), zeroSuffixAndExt.retain());
try {
assert result.isDirect();
assert result.nioBuffer().isDirect();
return result;
return result.retain();
} finally {
result.release();
}
} finally {
zeroSuffixAndExt.release();
}
@ -169,13 +175,26 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
int prefixLength,
int suffixLength,
int extLength) {
try {
assert prefixKey.readableBytes() == prefixLength;
assert suffixKey.readableBytes() == suffixLength;
assert suffixLength > 0;
assert extLength >= 0;
var result = LLUtils.directCompositeBuffer(alloc, prefixKey, suffixKey, alloc.buffer(extLength, extLength).writeZero(extLength));
ByteBuf result = LLUtils.directCompositeBuffer(alloc,
prefixKey.retain(),
suffixKey.retain(),
alloc.directBuffer(extLength, extLength).writeZero(extLength)
);
try {
assert result.readableBytes() == prefixLength + suffixLength + extLength;
return result;
return result.retain();
} finally {
result.release();
}
} finally {
prefixKey.release();
suffixKey.release();
}
}
/**
@ -213,7 +232,9 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
this.alloc = dictionary.getAllocator();
this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer;
this.keyPrefix = wrappedUnmodifiableBuffer(prefixKey).retain();
assert prefixKey.refCnt() > 0;
this.keyPrefix = prefixKey.retain();
assert keyPrefix.refCnt() > 0;
this.keyPrefixLength = keyPrefix.readableBytes();
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
this.keyExtLength = keyExtLength;
@ -221,19 +242,21 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
throw new IllegalArgumentException("KeyPrefix must be a direct buffer");
}
assert keyPrefix.isDirect();
ByteBuf firstKey = wrappedUnmodifiableBuffer(firstRangeKey(alloc,
ByteBuf firstKey = firstRangeKey(alloc,
keyPrefix.retain(),
keyPrefixLength,
keySuffixLength,
keyExtLength
));
ByteBuf nextRangeKey = wrappedUnmodifiableBuffer(nextRangeKey(alloc,
keyPrefix.retain(),
keyPrefixLength,
keySuffixLength,
keyExtLength
));
);
try {
ByteBuf nextRangeKey = nextRangeKey(alloc,
keyPrefix.retain(),
keyPrefixLength,
keySuffixLength,
keyExtLength
);
try {
assert keyPrefix.refCnt() > 0;
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
assert firstKey.isDirect();
assert nextRangeKey.isDirect();
@ -244,9 +267,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert range == null || !range.hasMax() || range.getMax().isDirect();
assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
} finally {
firstKey.release();
nextRangeKey.release();
}
} finally {
firstKey.release();
}
} finally {
prefixKey.release();
}
@ -271,7 +296,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
* Keep only suffix and ext
*/
protected ByteBuf stripPrefix(ByteBuf key) {
return key.slice(this.keyPrefixLength, key.readableBytes() - this.keyPrefixLength);
try {
return key.retainedSlice(this.keyPrefixLength, key.readableBytes() - this.keyPrefixLength);
} finally {
key.release();
}
}
/**
@ -292,8 +321,13 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
try {
assert suffixKey.readableBytes() == keySuffixLength;
ByteBuf result = LLUtils.directCompositeBuffer(alloc, keyPrefix.retain(), suffixKey.retain());
assert keyPrefix.refCnt() > 0;
try {
assert result.readableBytes() == keyPrefixLength + keySuffixLength;
return result;
return result.retain();
} finally {
result.release();
}
} finally {
suffixKey.release();
}
@ -323,6 +357,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
keySuffixLength,
keyExtLength
);
assert keyPrefix.refCnt() > 0;
return LLRange.of(first, end);
} finally {
keySuffix.release();
@ -331,65 +366,129 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast);
return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast));
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain());
return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()));
}
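
Editorial note: leavesCount and isEmpty now wrap the call in Mono.defer so that range.retain() runs at subscription time rather than at assembly time; an assembled-but-never-subscribed Mono therefore no longer steals a reference. A runnable illustration of the same idea, independent of the dictionary classes:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Mono;

public class DeferRetainExample {
  public static void main(String[] args) {
    ByteBuf range = Unpooled.directBuffer().writeInt(7); // refCnt == 1
    Mono<Integer> size = Mono.defer(() ->
        Mono.just(range.retain())                  // retained only when actually subscribed
            .map(ByteBuf::readableBytes)
            .doFinally(s -> range.release()));     // and released when the Mono terminates
    System.out.println(size.block());   // 4
    System.out.println(range.refCnt()); // 1: the deferred retain/release pair is balanced
    range.release();
  }
}
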
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
ByteBuf keySuffixData = serializeSuffix(keySuffix);
Flux<ByteBuf> keyFlux;
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData.retain()));
return Mono
.using(
() -> serializeSuffix(keySuffix),
keySuffixData -> {
Flux<ByteBuf> keyFlux = Flux
.defer(() -> {
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
&& this.subStageGetter.needsDebuggingKeyFlux()) {
return Flux
.using(
() -> toExtRange(keySuffixData.retain()),
extRangeBuf -> this.dictionary
.getRangeKeys(resolveSnapshot(snapshot), extRangeBuf.retain()),
LLRange::release
);
} else {
keyFlux = Flux.empty();
return Flux.empty();
}
return this.subStageGetter
.subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData.retain()), keyFlux)
.doFinally(s -> keySuffixData.release());
});
return Mono
.using(
() -> toKeyWithoutExt(keySuffixData.retain()),
keyBuf -> this.subStageGetter
.subStage(dictionary, snapshot, keyBuf.retain(), keyFlux),
ReferenceCounted::release
)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
},
ReferenceCounted::release
)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
}
@Override
public Mono<UpdateMode> getUpdateMode() {
return dictionary.getUpdateMode();
}
@Value
private static class GroupBuffers {
ByteBuf groupKeyWithExt;
ByteBuf groupKeyWithoutExt;
ByteBuf groupSuffix;
}
@Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength)
.flatMapSequential(rangeKeys -> {
return Flux
.defer(() -> dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range.retain(),
keyPrefixLength + keySuffixLength)
)
.flatMapSequential(rangeKeys -> Flux
.using(
() -> {
assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
ByteBuf groupKeyWithExt = rangeKeys.get(0).retain();
ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain());
ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain());
assert subStageKeysConsistency(groupKeyWithExt.readableBytes());
return this.subStageGetter
return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix);
},
buffers -> Mono
.fromCallable(() -> {
assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes());
return null;
})
.then(this.subStageGetter
.subStage(dictionary,
snapshot,
groupKeyWithoutExt,
Flux.fromIterable(rangeKeys)
buffers.groupKeyWithoutExt.retain(),
Flux
.fromIterable(rangeKeys)
.map(ByteBuf::retain)
)
.map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us))
),
buffers -> {
buffers.groupSuffix.release();
buffers.groupKeyWithoutExt.release();
buffers.groupKeyWithExt.release();
}
)
.map(us -> Map.entry(this.deserializeSuffix(wrappedUnmodifiableBuffer(groupSuffix.retain())), us))
.doFinally(s -> {
groupSuffix.release();
groupKeyWithoutExt.release();
groupKeyWithExt.release();
});
for (ByteBuf rangeKey : rangeKeys) {
rangeKey.release();
}
})
)
.doOnDiscard(Collection.class, discardedCollection -> {
//noinspection unchecked
var rangeKeys = (Collection<ByteBuf>) discardedCollection;
for (ByteBuf rangeKey : rangeKeys) {
rangeKey.release();
}
});
} else {
return dictionary
.getRangeKeyPrefixes(resolveSnapshot(snapshot), range, keyPrefixLength + keySuffixLength)
return Flux
.defer(() -> dictionary
.getRangeKeyPrefixes(resolveSnapshot(snapshot), range.retain(),
keyPrefixLength + keySuffixLength)
)
.flatMapSequential(groupKeyWithoutExt -> {
ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt);
ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain());
assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
return this.subStageGetter
.subStage(dictionary,
snapshot,
groupKeyWithoutExt,
groupKeyWithoutExt.retain(),
Flux.empty()
)
.map(us -> Map.entry(this.deserializeSuffix(wrappedUnmodifiableBuffer(groupSuffix)), us));
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us))
.doFinally(s -> groupSuffix.release());
});
}
}
@ -408,12 +507,22 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return getAllStages(null)
.flatMapSequential(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
.concatWith(clear().then(entries
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()))
.then(Mono.empty())));
return this
.getAllValues(null)
.concatWith(this
.clear()
.then(entries
.flatMap(entry -> this
.at(null, entry.getKey())
.flatMap(us -> us
.set(entry.getValue())
.doFinally(s -> us.release())
)
)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release)
.then(Mono.empty())
)
);
}
@Override
@ -422,38 +531,47 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return dictionary
.clear();
} else if (range.isSingle()) {
return dictionary
return Mono
.defer(() -> dictionary
.remove(range.getSingle().retain(), LLDictionaryResultType.VOID)
)
.then();
} else {
return dictionary
.setRange(range.retain(), Flux.empty(), false)
.then();
return Mono
.defer(() -> dictionary
.setRange(range.retain(), Flux.empty())
);
}
}
//todo: temporary wrapper. convert the whole class to buffers
protected T deserializeSuffix(ByteBuf keySuffix) {
try {
assert suffixKeyConsistency(keySuffix.readableBytes());
return keySuffixSerializer.deserialize(keySuffix);
var result = keySuffixSerializer.deserialize(keySuffix.retain());
assert keyPrefix.refCnt() > 0;
return result;
} finally {
keySuffix.release();
}
}
//todo: temporary wrapper. convert the whole class to buffers
protected ByteBuf serializeSuffix(T keySuffix) {
ByteBuf suffixData = keySuffixSerializer.serialize(keySuffix);
assert suffixKeyConsistency(suffixData.readableBytes());
assert keyPrefix.refCnt() > 0;
return suffixData;
}
@Override
protected void finalize() throws Throwable {
super.finalize();
range.release();
}
@Override
public void release() {
if (!released) {
released = true;
this.range.release();
this.keyPrefix.release();
} else {
throw new IllegalStateException("Already released");
}
}
}
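
Editorial note: the doOnDiscard hooks added here and in DatabaseMapDictionary cover the case where Reactor drops an element before user code ever sees it (filtering, cancellation, prefetch queues); without a hook such a ByteBuf would leak. A runnable sketch of the mechanism — note that the hook must sit downstream of the operator that discards:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Flux;

public class DiscardReleaseExample {
  public static void main(String[] args) {
    ByteBuf kept = Unpooled.directBuffer().writeByte(1);
    ByteBuf dropped = Unpooled.directBuffer().writeByte(2);
    Flux.just(kept, dropped)
        .filter(buf -> buf == kept)                            // "dropped" is rejected here...
        .doOnDiscard(ByteBuf.class, ReferenceCounted::release) // ...and released by this hook
        .doOnNext(ReferenceCounted::release)                   // release the buffer we consumed
        .blockLast();
    System.out.println(dropped.refCnt()); // 0 once the discard hook has run
  }
}
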

View File

@ -7,6 +7,7 @@ import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import it.cavallium.dbengine.database.serialization.Serializer;
@ -34,16 +35,21 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
Serializer<U, ByteBuf> valueSerializer,
Function<T, TH> keySuffixHashFunction,
SerializerFixedBinaryLength<TH, ByteBuf> keySuffixHashSerializer) {
try {
ValueWithHashSerializer<T, U> valueWithHashSerializer = new ValueWithHashSerializer<>(keySuffixSerializer,
valueSerializer
);
this.alloc = dictionary.getAllocator();
this.valueMapper = ValueMapper::new;
this.subDictionary = DatabaseMapDictionary.tail(dictionary,
prefixKey,
keySuffixHashSerializer, valueWithHashSerializer
prefixKey.retain(),
keySuffixHashSerializer,
valueWithHashSerializer
);
this.keySuffixHashFunction = keySuffixHashFunction;
} finally {
prefixKey.release();
}
}
private class ValueWithHashSerializer<T, U> implements Serializer<Entry<T, U>, ByteBuf> {
@ -72,9 +78,18 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
public @NotNull ByteBuf serialize(@NotNull Entry<T, U> deserialized) {
ByteBuf keySuffix = keySuffixSerializer.serialize(deserialized.getKey());
ByteBuf value = valueSerializer.serialize(deserialized.getValue());
ByteBuf keySuffixLen = alloc.buffer(Integer.BYTES, Integer.BYTES);
try {
ByteBuf keySuffixLen = alloc.directBuffer(Integer.BYTES, Integer.BYTES);
try {
keySuffixLen.writeInt(keySuffix.readableBytes());
return LLUtils.directCompositeBuffer(alloc, keySuffixLen, keySuffix, value);
return LLUtils.directCompositeBuffer(alloc, keySuffixLen.retain(), keySuffix.retain(), value.retain());
} finally {
keySuffixLen.release();
}
} finally {
keySuffix.release();
value.release();
}
}
}
@ -226,6 +241,11 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
return subDictionary.putValue(keySuffixHashFunction.apply(key), Map.entry(key, value));
}
@Override
public Mono<UpdateMode> getUpdateMode() {
return subDictionary.getUpdateMode();
}
@Override
public Mono<Boolean> updateValue(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
return subDictionary.updateValue(keySuffixHashFunction.apply(key), existsAlmostCertainly, old -> {
@ -258,9 +278,9 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
}
@Override
public Mono<Boolean> putValueAndGetStatus(T key, U value) {
public Mono<Boolean> putValueAndGetChanged(T key, U value) {
return subDictionary
.putValueAndGetStatus(keySuffixHashFunction.apply(key), Map.entry(key, value));
.putValueAndGetChanged(keySuffixHashFunction.apply(key), Map.entry(key, value));
}
@Override

View File

@ -20,12 +20,16 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
private final Serializer<U, ByteBuf> serializer;
public DatabaseSingle(LLDictionary dictionary, ByteBuf key, Serializer<U, ByteBuf> serializer) {
try {
this.dictionary = dictionary;
if (!key.isDirect()) {
throw new IllegalArgumentException("Key must be direct");
}
this.key = key;
this.key = key.retain();
this.serializer = serializer;
} finally {
key.release();
}
}
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {

View File

@ -25,13 +25,18 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
}
default Mono<Void> set(T value) {
return setAndGetChanged(value).then();
return this
.setAndGetChanged(value)
.then();
}
Mono<T> setAndGetPrevious(T value);
default Mono<Boolean> setAndGetChanged(T value) {
return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(value != null);
return this
.setAndGetPrevious(value)
.map(oldValue -> !Objects.equals(oldValue, value))
.switchIfEmpty(Mono.fromSupplier(() -> value != null));
}
Mono<Boolean> update(Function<@Nullable T, @Nullable T> updater, boolean existsAlmostCertainly);
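
Editorial note: setAndGetChanged only reaches the switchIfEmpty branch when setAndGetPrevious completes empty, i.e. when no value existed before; the supplier makes that fallback lazy. A pure-Reactor sketch of the logic, with a plain Mono standing in for the stage:

import java.util.Objects;
import reactor.core.publisher.Mono;

public class SetAndGetChangedExample {
  public static void main(String[] args) {
    String newValue = "x";
    Mono<String> previous = Mono.empty(); // the stage held no value before the set
    Boolean changed = previous
        .map(old -> !Objects.equals(old, newValue))
        .switchIfEmpty(Mono.fromSupplier(() -> newValue != null))
        .block();
    System.out.println(changed); // true: storing a value where none existed counts as a change
  }
}
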

View File

@ -1,15 +1,20 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@SuppressWarnings("unused")
public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseStageEntry<Map<T, U>> {
@ -32,8 +37,16 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
return at(null, key).single().flatMap(v -> v.set(value).doFinally(s -> v.release()));
}
Mono<UpdateMode> getUpdateMode();
default Mono<Boolean> updateValue(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
return at(null, key).single().flatMap(v -> v.update(updater, existsAlmostCertainly).doFinally(s -> v.release()));
return this
.at(null, key)
.single()
.flatMap(v -> v
.update(updater, existsAlmostCertainly)
.doFinally(s -> v.release())
);
}
default Mono<Boolean> updateValue(T key, Function<@Nullable U, @Nullable U> updater) {
@ -50,7 +63,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
* @param value
* @return true if the stored value changed as a result of this operation, false otherwise.
*/
default Mono<Boolean> putValueAndGetStatus(T key, U value) {
default Mono<Boolean> putValueAndGetChanged(T key, U value) {
return at(null, key).single().flatMap(v -> v.setAndGetChanged(value).doFinally(s -> v.release())).single();
}
@ -89,6 +102,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
.getValue()
.get(snapshot, true)
.map(value -> Map.entry(entry.getKey(), value))
.doFinally(s -> entry.getValue().release())
);
}
@ -111,7 +125,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
.flatMap(entriesReplacer)
.flatMap(replacedEntry -> this
.at(null, replacedEntry.getKey())
.map(v -> v.set(replacedEntry.getValue()).doFinally(s -> v.release())))
.flatMap(v -> v.set(replacedEntry.getValue()).doFinally(s -> v.release())))
.then();
}
}
@ -119,7 +133,10 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {
return this
.getAllStages(null)
.flatMap(entriesReplacer)
.flatMap(stage -> Mono
.defer(() -> entriesReplacer.apply(stage))
.doFinally(s -> stage.getValue().release())
)
.then();
}
@ -130,27 +147,55 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
}
@Override
default Mono<Boolean> setAndGetChanged(Map<T, U> value) {
return this
.setAndGetPrevious(value)
.map(oldValue -> !Objects.equals(oldValue, value))
.switchIfEmpty(Mono.fromSupplier(() -> !value.isEmpty()));
}
@Override
default Mono<Boolean> update(Function<@Nullable Map<T, U>, @Nullable Map<T, U>> updater, boolean existsAlmostCertainly) {
return this
.getUpdateMode()
.single()
.flatMap(updateMode -> {
if (updateMode == UpdateMode.ALLOW_UNSAFE) {
return this
.getAllValues(null)
.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
.single()
.<Map<T, U>>handle((v, sink) -> {
if (v == null || v.isEmpty()) {
sink.complete();
} else {
.<Tuple2<Optional<Map<T, U>>, Boolean>>handle((v, sink) -> {
if (v.isEmpty()) {
v = null;
}
var result = updater.apply(v);
if (result == null) {
sink.complete();
} else {
sink.next(result);
}
if (result != null && result.isEmpty()) {
result = null;
}
boolean changed = !Objects.equals(v, result);
sink.next(Tuples.of(Optional.ofNullable(result), changed));
})
.flatMap(result -> Mono
.justOrEmpty(result.getT1())
.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())))
//todo: can be optimized by calculating the correct return value
.thenReturn(true);
.thenReturn(result.getT2())
);
} else if (updateMode == UpdateMode.ALLOW) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Maps can't be updated atomically");
});
} else if (updateMode == UpdateMode.DISALLOW) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Map can't be updated because updates are disabled");
});
} else {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Unknown update mode: " + updateMode);
});
}
});
}
@Override
@ -166,7 +211,12 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
@Override
default Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return getAllStages(snapshot).count();
return getAllStages(snapshot)
.flatMap(stage -> Mono
.fromRunnable(() -> stage.getValue().release())
.thenReturn(true)
)
.count();
}
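
Editorial note: the new map-level update computes a real "changed" flag instead of always returning true — an empty map is normalised to null on both sides, the updater result is captured in an Optional, and the flag comes from comparing the two. A runnable condensation of that handle step (a literal map stands in for getAllValues):

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class MapUpdateChangedExample {
  public static void main(String[] args) {
    Function<Map<String, Integer>, Map<String, Integer>> updater = old -> Map.of("a", 2);
    Tuple2<Optional<Map<String, Integer>>, Boolean> outcome = Mono
        .just(Map.<String, Integer>of())          // current content of the map stage
        .<Tuple2<Optional<Map<String, Integer>>, Boolean>>handle((v, sink) -> {
          Map<String, Integer> prev = v.isEmpty() ? null : v; // empty means "no value"
          Map<String, Integer> result = updater.apply(prev);
          if (result != null && result.isEmpty()) {
            result = null;                                    // nothing to write back
          }
          sink.next(Tuples.of(Optional.ofNullable(result), !Objects.equals(prev, result)));
        })
        .block();
    System.out.println(outcome.getT2()); // true: the updater changed the stored content
  }
}
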
/**

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
@ -44,17 +45,22 @@ public class SubStageGetterHashMap<T, U, TH> implements
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
Flux<ByteBuf> debuggingKeyFlux) {
Mono<DatabaseMapDictionaryHashed<T, U, TH>> result = Mono.just(DatabaseMapDictionaryHashed.tail(dictionary,
prefixKey,
Mono<DatabaseMapDictionaryHashed<T, U, TH>> result = Mono.fromSupplier(() -> DatabaseMapDictionaryHashed.tail(dictionary,
prefixKey.retain(),
keySerializer,
valueSerializer,
keyHashFunction,
keyHashSerializer
));
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
.then(result)
.doFinally(s -> prefixKey.release());
} else {
return result;
return debuggingKeyFlux
.flatMap(key -> Mono.fromRunnable(key::release))
.then(result)
.doFinally(s -> prefixKey.release());
}
}
@ -69,9 +75,14 @@ public class SubStageGetterHashMap<T, U, TH> implements
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
return keyFlux.doOnNext(key -> {
return keyFlux
.doOnNext(key -> {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
}).then();
})
.flatMap(key -> Mono.fromRunnable(key::release))
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.then()
.doFinally(s -> prefixKey.release());
}
public int getKeyHashBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
@ -42,16 +43,21 @@ public class SubStageGetterHashSet<T, TH> implements
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
Flux<ByteBuf> debuggingKeyFlux) {
Mono<DatabaseSetDictionaryHashed<T, TH>> result = Mono.just(DatabaseSetDictionaryHashed.tail(dictionary,
prefixKey,
Mono<DatabaseSetDictionaryHashed<T, TH>> result = Mono.fromSupplier(() -> DatabaseSetDictionaryHashed.tail(dictionary,
prefixKey.retain(),
keySerializer,
keyHashFunction,
keyHashSerializer
));
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
.then(result)
.doFinally(s -> prefixKey.release());
} else {
return result;
return debuggingKeyFlux
.flatMap(key -> Mono.fromRunnable(key::release))
.then(result)
.doFinally(s -> prefixKey.release());
}
}
@ -66,9 +72,14 @@ public class SubStageGetterHashSet<T, TH> implements
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
return keyFlux.doOnNext(key -> {
return keyFlux
.doOnNext(key -> {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
}).then();
})
.flatMap(key -> Mono.fromRunnable(key::release))
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.then()
.doFinally(s -> prefixKey.release());
}
public int getKeyHashBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
@ -35,14 +36,27 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
Flux<ByteBuf> debuggingKeyFlux) {
Mono<DatabaseMapDictionary<T, U>> result = Mono.just(DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer,
valueSerializer
));
return Mono
.using(
() -> true,
b -> Mono
.fromSupplier(() -> DatabaseMapDictionary.tail(dictionary, prefixKey.retain(), keySerializer, valueSerializer))
.doOnDiscard(DatabaseMapDictionary.class, DatabaseMapDictionary::release)
.transformDeferred(result -> {
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
return this
.checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
.then(result);
} else {
return result;
return debuggingKeyFlux
.flatMap(buf -> Mono.fromRunnable(buf::release))
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.then(result);
}
})
.doOnDiscard(DatabaseMapDictionary.class, DatabaseMapDictionary::release),
b -> prefixKey.release()
);
}
@Override
@ -56,9 +70,14 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
return keyFlux.doOnNext(key -> {
return keyFlux
.doOnNext(key -> {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
}).then();
})
.flatMap(key -> Mono.fromRunnable(key::release))
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.then()
.doFinally(s -> prefixKey.release());
}
public int getKeyBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@ -49,17 +50,24 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
Flux<ByteBuf> debuggingKeyFlux) {
Mono<DatabaseMapDictionaryDeep<T, U, US>> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
prefixKey,
return Flux
.defer(() -> {
if (assertsEnabled) {
return this
.checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux);
} else {
return debuggingKeyFlux.flatMap(buf -> Mono.fromRunnable(buf::release));
}
})
.then(Mono
.fromSupplier(() -> DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
prefixKey.retain(),
keySerializer,
subStageGetter,
keyExtLength
));
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
} else {
return result;
}
))
)
.doFinally(s -> prefixKey.release());
}
@Override
@ -73,9 +81,14 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
return keyFlux.doOnNext(key -> {
return keyFlux
.doOnNext(key -> {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
}).then();
})
.flatMap(key -> Mono.fromRunnable(key::release))
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.then()
.doFinally(s -> prefixKey.release());
}
public int getKeyBinaryLength() {

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
@ -32,16 +33,17 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
@Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey,
Flux<ByteBuf> debuggingKeyFlux) {
try {
Mono<DatabaseSetDictionary<T>> result = Mono
.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey.retain(), keySerializer));
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux).then(result);
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
.then(result)
.doFinally(s -> prefixKey.release());
} else {
return result;
}
} finally {
prefixKey.release();
return debuggingKeyFlux
.flatMap(key -> Mono.fromRunnable(key::release))
.then(result)
.doFinally(s -> prefixKey.release());
}
}
@ -56,9 +58,14 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
return keyFlux.doOnNext(key -> {
return keyFlux
.doOnNext(key -> {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
}).doFinally(s -> prefixKey.release()).then();
})
.flatMap(key -> Mono.fromRunnable(key::release))
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.then()
.doFinally(s -> prefixKey.release());
}
public int getKeyBinaryLength() {

View File

@ -36,13 +36,20 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
.singleOrEmpty()
.flatMap(key -> Mono
.<DatabaseStageEntry<T>>fromCallable(() -> {
try {
if (!LLUtils.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
} finally {
key.release();
}
return null;
})
)
.then(Mono.fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix, serializer)));
.then(Mono
.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer))
)
.doFinally(s -> keyPrefix.release());
}
@Override

View File

@ -16,6 +16,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -45,6 +46,7 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.locks.Striped;
@ -72,6 +74,14 @@ public class LLLocalDictionary implements LLDictionary {
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
/**
* Default: true. Use false to debug problems with write batches.
*/
static final boolean USE_WRITE_BATCHES_IN_SET_RANGE = false;
/**
* Default: true. Use false to debug problems with capped write batches.
*/
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = false;
static final boolean PARALLEL_EXACT_SIZE = true;
private static final int STRIPES = 512;
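
Editorial note: USE_WRITE_BATCHES_IN_SET_RANGE and USE_CAPPED_WRITE_BATCH_IN_SET_RANGE make it possible to bypass (capped) write batches while chasing bugs, falling back to plain per-key puts as the code further below does. A standalone RocksDB sketch of that toggle — the flag name and the /tmp path are illustrative only, and a plain WriteBatch stands in for the project's CappedWriteBatch:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class WriteBatchToggleExample {
  // Mirrors the flag above: set to false to rule out batch-related problems.
  static final boolean USE_WRITE_BATCHES = false;

  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/wb-example")) {
      byte[][] keys = {{1}, {2}};
      byte[][] values = {{10}, {20}};
      if (USE_WRITE_BATCHES) {
        try (WriteBatch batch = new WriteBatch(); WriteOptions wo = new WriteOptions()) {
          for (int i = 0; i < keys.length; i++) {
            batch.put(keys[i], values[i]);
          }
          db.write(wo, batch); // one atomic write for the whole window
        }
      } else {
        for (int i = 0; i < keys.length; i++) {
          db.put(keys[i], values[i]); // individual writes, easier to debug
        }
      }
    }
  }
}
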
@ -199,14 +209,7 @@ public class LLLocalDictionary implements LLDictionary {
throw new RocksDBException("Key buffer must be direct");
}
try {
ByteBuf keyDirectBuf = key.retain();
ByteBuffer keyNioBuffer = LLUtils.toDirectFast(keyDirectBuf.retain());
if (keyNioBuffer == null) {
keyDirectBuf.release();
keyDirectBuf = LLUtils.toDirectCopy(key.retain());
keyNioBuffer = keyDirectBuf.nioBuffer();
}
try {
ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
// Create a direct result buffer because RocksDB works only with direct buffers
ByteBuf resultBuf = alloc.directBuffer();
@ -270,9 +273,6 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
resultBuf.release();
}
} finally {
keyDirectBuf.release();
}
} finally {
key.release();
}
@ -287,33 +287,13 @@ public class LLLocalDictionary implements LLDictionary {
throw new RocksDBException("Value buffer must be direct");
}
try {
ByteBuf keyDirectBuffer = key.retain();
var keyNioBuffer = LLUtils.toDirectFast(keyDirectBuffer.retain());
if (keyNioBuffer == null) {
keyDirectBuffer.release();
keyDirectBuffer = LLUtils.toDirectCopy(key.retain());
keyNioBuffer = keyDirectBuffer.nioBuffer();
}
try {
var keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
ByteBuf valueDirectBuffer = value.retain();
var valueNioBuffer = LLUtils.toDirectFast(valueDirectBuffer.retain());
if (valueNioBuffer == null) {
valueDirectBuffer.release();
valueDirectBuffer = LLUtils.toDirectCopy(value.retain());
valueNioBuffer = valueDirectBuffer.nioBuffer();
}
try {
var valueNioBuffer = LLUtils.toDirect(value);
assert valueNioBuffer.isDirect();
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer);
} finally {
valueDirectBuffer.release();
}
} finally {
keyDirectBuffer.release();
}
} finally {
key.release();
value.release();
@ -322,13 +302,16 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
Mono<Boolean> contains;
return Mono
.defer(() -> {
if (range.isSingle()) {
contains = containsKey(snapshot, range.getSingle().retain());
return containsKey(snapshot, range.getSingle().retain());
} else {
contains = containsRange(snapshot, range.retain());
return containsRange(snapshot, range.retain());
}
return contains.map(isContained -> !isContained).doFinally(s -> range.release());
})
.map(isContained -> !isContained)
.doFinally(s -> range.release());
}
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) {
@ -338,18 +321,18 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirectFast(range.getMin().retain()),
readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()),
"This range must use direct buffers"
)));
}
if (range.hasMax()) {
readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirectFast(range.getMax().retain()),
readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()),
"This range must use direct buffers"
)));
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirectFast(range.getMin().retain()),
rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()),
"This range must use direct buffers"
));
} else {
@ -411,9 +394,10 @@ public class LLLocalDictionary implements LLDictionary {
throw new IllegalArgumentException("Value must not be direct");
});
}
return getPreviousData(key.retain(), resultType)
return Mono
.defer(() -> getPreviousData(key.retain(), resultType))
.concatWith(Mono
.fromCallable(() -> {
.<ByteBuf>fromCallable(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@ -435,7 +419,6 @@ public class LLLocalDictionary implements LLDictionary {
throw new IllegalArgumentException("Value must not be direct");
}
dbPut(cfh, null, key.retain(), value.retain());
assert value.refCnt() > 0;
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
@ -443,9 +426,8 @@ public class LLLocalDictionary implements LLDictionary {
}
}
})
.onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler)
.then(Mono.empty())
.onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause))
)
.singleOrEmpty()
.doFinally(s -> {
@ -454,6 +436,11 @@ public class LLLocalDictionary implements LLDictionary {
});
}
@Override
public Mono<UpdateMode> getUpdateMode() {
return Mono.fromSupplier(() -> updateMode);
}
@Override
public Mono<Boolean> update(ByteBuf key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
@ -574,18 +561,8 @@ public class LLLocalDictionary implements LLDictionary {
if (!key.isDirect()) {
throw new IllegalArgumentException("Key must be a direct buffer");
}
ByteBuf keyDirectBuffer = key.retain();
var keyNioBuffer = LLUtils.toDirectFast(keyDirectBuffer.retain());
if (keyNioBuffer == null) {
keyDirectBuffer.release();
keyDirectBuffer = LLUtils.toDirectCopy(key.retain());
keyNioBuffer = keyDirectBuffer.nioBuffer();
}
try {
var keyNioBuffer = LLUtils.toDirect(key);
db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer);
} finally {
keyDirectBuffer.release();
}
} finally {
key.release();
}
@ -593,7 +570,8 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) {
return getPreviousData(key.retain(), resultType)
return Mono
.defer(() -> getPreviousData(key.retain(), resultType))
.concatWith(Mono
.fromCallable(() -> {
StampedLock lock;
@ -626,19 +604,19 @@ public class LLLocalDictionary implements LLDictionary {
}
private Mono<ByteBuf> getPreviousData(ByteBuf key, LLDictionaryResultType resultType) {
Mono<ByteBuf> prevValue;
return Mono
.defer(() -> {
switch (resultType) {
case PREVIOUS_VALUE_EXISTENCE:
prevValue = this
return this
.containsKey(null, key.retain())
.single()
.map(LLUtils::booleanToResponseByteBuffer)
.doFinally(s -> {
assert key.refCnt() > 0;
});
break;
case PREVIOUS_VALUE:
prevValue = Mono
return Mono
.fromCallable(() -> {
StampedLock lock;
long stamp;
@ -676,15 +654,13 @@ public class LLLocalDictionary implements LLDictionary {
})
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler);
break;
case VOID:
prevValue = Mono.empty();
break;
return Mono.empty();
default:
prevValue = Mono.error(new IllegalStateException("Unexpected value: " + resultType));
break;
return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
}
return prevValue.doFinally(s -> key.release());
})
.doFinally(s -> key.release());
}
@Override
@ -693,7 +669,14 @@ public class LLLocalDictionary implements LLDictionary {
boolean existsAlmostCertainly) {
return keys
.window(MULTI_GET_WINDOW)
.flatMap(keysWindowFlux -> keysWindowFlux.collectList()
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.flatMapMany(keysWindow -> Mono
.fromCallable(() -> {
Iterable<StampedLock> locks;
@@ -739,28 +722,37 @@ public class LLLocalDictionary implements LLDictionary {
+ Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause))
.doFinally(s -> keysWindow.forEach(ReferenceCounted::release))
)
);
)
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
});
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) {
return entries
.window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
})
.flatMap(Flux::collectList)
.flatMap(entriesWindow -> {
Flux<Entry<ByteBuf, ByteBuf>> oldValues;
if (getOldValues) {
oldValues = this
.getMulti(null, Flux
.fromIterable(entriesWindow)
.map(Entry::getKey)
.map(ByteBuf::retain), false)
.publishOn(dbScheduler);
} else {
oldValues = Flux.empty();
}
return oldValues
.concatWith(Mono.fromCallable(() -> {
.doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
})
.flatMap(ew -> Mono
.using(
() -> ew,
entriesWindow -> Mono
.<Entry<ByteBuf, ByteBuf>>fromCallable(() -> {
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
@@ -774,6 +766,7 @@ public class LLLocalDictionary implements LLDictionary {
stamps = null;
}
try {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
@@ -785,6 +778,11 @@ public class LLLocalDictionary implements LLDictionary {
}
batch.writeToDbAndClose();
batch.close();
} else {
for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
db.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue()));
}
}
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
@@ -795,122 +793,165 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
}))
.doFinally(s -> entriesWindow.forEach(entry -> {
})
// Prepend everything to get previous elements
.transformDeferred(transformer -> {
if (getOldValues) {
return this
.getMulti(null, Flux
.fromIterable(entriesWindow)
.map(Entry::getKey)
.map(ByteBuf::retain), false)
.publishOn(dbScheduler)
.then(transformer);
} else {
return transformer;
}
}),
entriesWindow -> {
for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
entry.getKey().release();
entry.getValue().release();
}));
}
);
}
)
)
.doOnDiscard(Collection.class, obj -> {
//noinspection unchecked
var castedEntries = (Collection<Entry<ByteBuf, ByteBuf>>) obj;
for (Entry<ByteBuf, ByteBuf> entry : castedEntries) {
entry.getKey().release();
entry.getValue().release();
}
});
}
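
The doOnDiscard hooks added in getMulti and putMulti all follow one shape: any ByteBuf-backed entry that an operator drops (cancellation, filtering, error) must still be released. A generic sketch of that hook, with source standing in for an arbitrary upstream publisher (a placeholder, not an identifier from this commit):

// Release discarded entries so Netty reference counts stay balanced even when
// operators drop them before the terminal subscriber sees them.
Flux<Entry<ByteBuf, ByteBuf>> guarded = source
		.doOnDiscard(Entry.class, discarded -> {
			//noinspection unchecked
			var entry = (Entry<ByteBuf, ByteBuf>) discarded;
			entry.getKey().release();
			entry.getValue().release();
		});
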
@NotNull
private Mono<Entry<ByteBuf, ByteBuf>> putEntryToWriteBatch(Entry<ByteBuf, ByteBuf> newEntry,
boolean getOldValues, CappedWriteBatch writeBatch) {
Mono<ByteBuf> getOldValueMono;
if (getOldValues) {
getOldValueMono = get(null, newEntry.getKey().retain(), false);
} else {
getOldValueMono = Mono.empty();
}
return getOldValueMono
.concatWith(Mono
.<ByteBuf>fromCallable(() -> {
writeBatch.put(cfh, newEntry.getKey().retain(), newEntry.getValue().retain());
private Mono<Void> putEntryToWriteBatch(Entry<ByteBuf, ByteBuf> newEntry, CappedWriteBatch writeBatch) {
return Mono
.<Void>fromCallable(() -> {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
return null;
})
.subscribeOn(dbScheduler)
)
.singleOrEmpty()
.map(oldValue -> Map.entry(newEntry.getKey().retain(), oldValue))
.doFinally(s -> {
newEntry.getKey().release();
newEntry.getValue().release();
});
.subscribeOn(dbScheduler);
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range,
boolean existsAlmostCertainly) {
Flux<Entry<ByteBuf, ByteBuf>> result;
return Flux
.defer(() -> {
if (range.isSingle()) {
result = getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly);
return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly);
} else {
result = getRangeMulti(snapshot, range.retain());
return getRangeMulti(snapshot, range.retain());
}
return result.doFinally(s -> range.release());
})
.doFinally(s -> range.release());
}
@Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
int prefixLength, boolean existsAlmostCertainly) {
Flux<List<Entry<ByteBuf, ByteBuf>>> result;
return Flux
.defer(() -> {
if (range.isSingle()) {
result = getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly).map(List::of);
return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly).map(List::of);
} else {
result = getRangeMultiGrouped(snapshot, range.retain(), prefixLength);
return getRangeMultiGrouped(snapshot, range.retain(), prefixLength);
}
return result.doFinally(s -> range.release());
})
.doFinally(s -> range.release());
}
private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
return this
.get(snapshot, key.retain(), existsAlmostCertainly)
return Mono
.defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly))
.map(value -> Map.entry(key.retain(), value))
.flux()
.doFinally(s -> key.release());
}
private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot))
.flux()
return Flux
.using(
() -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)),
LLLocalReactiveRocksIterator::flux,
LLLocalReactiveRocksIterator::release
)
.doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
})
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalGroupedEntryReactiveRocksIterator(db,
return Flux
.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
range.retain(),
resolveSnapshot(snapshot),
"getRangeMultiGrouped"
),
LLLocalGroupedReactiveRocksIterator::flux,
LLLocalGroupedReactiveRocksIterator::release
)
.flux()
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
@Override
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
Flux<ByteBuf> result;
return Flux
.defer(() -> {
if (range.isSingle()) {
result = getRangeKeysSingle(snapshot, range.getMin().retain());
return this.getRangeKeysSingle(snapshot, range.getMin().retain());
} else {
result = getRangeKeysMulti(snapshot, range.retain());
return this.getRangeKeysMulti(snapshot, range.retain());
}
return result.doFinally(s -> range.release());
})
.doFinally(s -> range.release());
}
@Override
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalGroupedKeyReactiveRocksIterator(db,
return Flux
.using(
() -> true,
b -> Flux
.using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
range.retain(),
resolveSnapshot(snapshot),
"getRangeKeysGrouped"
).flux().subscribeOn(dbScheduler).doFinally(s -> range.release());
),
LLLocalGroupedReactiveRocksIterator::flux,
LLLocalGroupedReactiveRocksIterator::release
)
.subscribeOn(dbScheduler),
b -> range.release()
);
}
@Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalKeyPrefixReactiveRocksIterator(db,
return Flux
.using(
() -> new LLLocalKeyPrefixReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
@@ -918,74 +959,134 @@ public class LLLocalDictionary implements LLDictionary {
resolveSnapshot(snapshot),
true,
"getRangeKeysGrouped"
).flux().subscribeOn(dbScheduler).doFinally(s -> range.release());
),
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
private Flux<ByteBuf> getRangeKeysSingle(LLSnapshot snapshot, ByteBuf key) {
return this
.containsKey(snapshot, key.retain())
.filter(contains -> contains)
.map(contains -> key.retain())
return Mono
.defer(() -> this.containsKey(snapshot, key.retain()))
.flux()
.<ByteBuf>handle((contains, sink) -> {
if (contains) {
sink.next(key.retain());
} else {
sink.complete();
}
})
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.doFinally(s -> key.release());
}
private Flux<ByteBuf> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot))
.flux()
return Flux
.using(
() -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)),
LLLocalReactiveRocksIterator::flux,
LLLocalReactiveRocksIterator::release
)
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
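
The range reads above now share one Flux.using shape: the reactive RocksDB iterator wrapper is created per subscription and released exactly once, whether the stream completes, errors, or is cancelled. A short usage sketch under that assumption; the early take and the consumer are placeholders:

// Even if the subscriber cancels after a few keys, Flux.using still calls release()
// on the iterator wrapper, and doOnDiscard releases any buffered keys.
getRangeKeysMulti(null, range.retain())
		.take(10)
		.doOnNext(key -> key.release()) // a real consumer would read the key before releasing it
		.then()
		.block();
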
@Override
public Flux<Entry<ByteBuf, ByteBuf>> setRange(LLRange range,
Flux<Entry<ByteBuf, ByteBuf>> entries,
boolean getOldValues) {
Flux<Entry<ByteBuf, ByteBuf>> oldValues;
if (getOldValues) {
oldValues = getRange(null, range);
} else {
oldValues = Flux.empty();
}
return oldValues
.concatWith(Flux
.usingWhen(
Mono
.fromCallable(() -> new CappedWriteBatch(db,
public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return entries
.window(MULTI_GET_WINDOW)
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.flatMap(entriesList -> Mono
.<Void>fromCallable(() -> {
try {
if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS)
)
.subscribeOn(dbScheduler),
writeBatch -> Mono
.fromCallable(() -> {
BATCH_WRITE_OPTIONS
)) {
if (range.isSingle()) {
writeBatch.delete(cfh, LLUtils.toArray(range.getSingle().retain()));
batch.delete(cfh, range.getSingle().retain());
} else {
deleteSmallRangeWriteBatch(writeBatch, range.retain());
deleteSmallRangeWriteBatch(batch, range.retain());
}
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
}
batch.writeToDbAndClose();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
if (range.isSingle()) {
batch.delete(cfh, LLUtils.toArray(range.getSingle()));
} else {
deleteSmallRangeWriteBatch(batch, range.retain());
}
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue()));
}
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
}
return null;
})
.subscribeOn(dbScheduler)
.thenMany(entries)
.<Entry<ByteBuf, ByteBuf>>concatMap(newEntry -> this
.putEntryToWriteBatch(newEntry, false, writeBatch)
.then(Mono.empty())
),
writeBatch -> Mono
.fromCallable(() -> {
try (writeBatch) {
writeBatch.writeToDbAndClose();
} finally {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
entry.getKey().release();
entry.getValue().release();
}
}
return null;
})
.subscribeOn(dbScheduler)
)
.subscribeOn(dbScheduler)
)
.then()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.onErrorMap(cause -> new IOException("Failed to write range", cause))
.doFinally(s -> range.release())
);
.doFinally(s -> range.release());
} else {
return Flux
.defer(() -> this.getRange(null, range.retain(), false))
.flatMap(oldValue -> Mono
.<Void>fromCallable(() -> {
try {
dbDelete(cfh, EMPTY_WRITE_OPTIONS, oldValue.getKey().retain());
return null;
} finally {
oldValue.getKey().release();
oldValue.getValue().release();
}
})
.subscribeOn(dbScheduler)
)
.then(entries
.flatMap(entry -> this.put(entry.getKey(), entry.getValue(), LLDictionaryResultType.VOID))
.then(Mono.<Void>empty())
)
.onErrorMap(cause -> new IOException("Failed to write range", cause))
.doFinally(s -> range.release());
}
}
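
A hedged usage note on the reworked setRange: it now overwrites the range and completes empty, so a caller that still needs the previous values reads them with getRange before writing. The names dictionary and newEntries below are placeholders:

// Read the old entries first (optional), then replace the whole range with newEntries.
Flux<Entry<ByteBuf, ByteBuf>> previous = dictionary.getRange(null, range.retain(), false);
Mono<Void> replaced = dictionary.setRange(range.retain(), newEntries);
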
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
@@ -996,13 +1097,13 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
} else {
minBound = EMPTY_RELEASABLE_SLICE;
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = EMPTY_RELEASABLE_SLICE;
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
@@ -1021,22 +1122,50 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, LLRange range)
throws RocksDBException {
var readOpts = getReadOptions(null);
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
var arr = LLUtils.toArray(range.getMin());
var minSlice = new Slice(arr);
readOpts.setIterateLowerBound(minSlice);
minBound = new ReleasableSlice(minSlice, null, arr);
} else {
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
var arr = LLUtils.toArray(range.getMax());
var maxSlice = new Slice(arr);
readOpts.setIterateUpperBound(maxSlice);
maxBound = new ReleasableSlice(maxSlice, null, arr);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(LLUtils.toArray(range.getMin()));
} else {
rocksIterator.seekToFirst();
}
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
}
} finally {
minBound.release();
maxBound.release();
range.release();
}
}
private static void rocksIterSeekTo(RocksIterator rocksIterator, ByteBuf buffer) {
try {
ByteBuf directBuffer = buffer.retain();
ByteBuffer nioBuffer = LLUtils.toDirectFast(directBuffer.retain());
if (nioBuffer == null) {
directBuffer.release();
directBuffer = LLUtils.toDirectCopy(buffer.retain());
assert directBuffer.isDirect();
nioBuffer = directBuffer.nioBuffer();
}
try {
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
assert nioBuffer.isDirect();
rocksIterator.seek(nioBuffer);
} finally {
directBuffer.release();
}
} finally {
buffer.release();
}
@@ -1044,15 +1173,8 @@ public class LLLocalDictionary implements LLDictionary {
private static ReleasableSlice setIterateBound(ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) {
try {
ByteBuf directBuffer = buffer.retain();
ByteBuffer nioBuffer = LLUtils.toDirectFast(directBuffer.retain());
if (nioBuffer == null) {
directBuffer = LLUtils.toDirectCopy(buffer.retain());
assert directBuffer.isDirect();
nioBuffer = directBuffer.nioBuffer();
}
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
AbstractSlice<?> slice;
try {
assert nioBuffer.isDirect();
slice = new DirectSlice(nioBuffer);
if (boundType == IterateBound.LOWER) {
@@ -1060,27 +1182,27 @@ public class LLLocalDictionary implements LLDictionary {
} else {
readOpts.setIterateUpperBound(slice);
}
} catch (Throwable t) {
directBuffer.release();
throw t;
}
return new ReleasableSlice(slice, directBuffer);
return new ReleasableSlice(slice, buffer.retain(), nioBuffer);
} finally {
buffer.release();
}
}
private static final ReleasableSlice EMPTY_RELEASABLE_SLICE = new ReleasableSlice(new Slice(new byte[0]), null) {
private static ReleasableSlice emptyReleasableSlice() {
var arr = new byte[0];
return new ReleasableSlice(new Slice(arr), null, arr) {
@Override
public void release() {
}
};
}
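
The setIterateBound / emptyReleasableSlice pair keeps the backing buffer of each DirectSlice alive for as long as RocksDB may read the bound. A condensed sketch of the scoping used by the read paths above, assuming readOpts and range are already in scope:

// The slices (and their retained buffers) must outlive the iterator that uses the bounds.
ReleasableSlice minBound = range.hasMin()
		? setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain())
		: emptyReleasableSlice();
ReleasableSlice maxBound = range.hasMax()
		? setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain())
		: emptyReleasableSlice();
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
	// seek and iterate here
} finally {
	minBound.release();
	maxBound.release();
}
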
@Data
@AllArgsConstructor
public static class ReleasableSlice {
AbstractSlice<?> slice;
@Nullable ByteBuf byteBuf;
private @Nullable Object additionalData;
public void release() {
slice.clear();
@@ -1163,13 +1285,13 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
} else {
minBound = EMPTY_RELEASABLE_SLICE;
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = EMPTY_RELEASABLE_SLICE;
maxBound = emptyReleasableSlice();
}
try {
if (fast) {
@@ -1210,13 +1332,13 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
} else {
minBound = EMPTY_RELEASABLE_SLICE;
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = EMPTY_RELEASABLE_SLICE;
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
@@ -1224,10 +1346,18 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksIterator.seekToFirst();
}
ByteBuf key;
if (rocksIterator.isValid()) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
return Map.entry(key, LLUtils.readDirectNioBuffer(alloc, rocksIterator::value));
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
try {
return Map.entry(key.retain(), value.retain());
} finally {
value.release();
}
} finally {
key.release();
}
} else {
return null;
}
@@ -1249,13 +1379,13 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
} else {
minBound = EMPTY_RELEASABLE_SLICE;
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = EMPTY_RELEASABLE_SLICE;
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
@@ -1389,13 +1519,13 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
} else {
minBound = EMPTY_RELEASABLE_SLICE;
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = EMPTY_RELEASABLE_SLICE;
maxBound = emptyReleasableSlice();
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
@@ -1429,18 +1559,18 @@ public class LLLocalDictionary implements LLDictionary {
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retain());
sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retainedSlice());
} else {
sliceMin = EMPTY_RELEASABLE_SLICE;
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retain());
sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retainedSlice());
} else {
sliceMax = EMPTY_RELEASABLE_SLICE;
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(rocksIterator, range.getMin().retainedSlice());
} else {
rocksIterator.seekToFirst();
}
View File
@@ -21,6 +21,9 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti
@Override
public ByteBuf getEntry(ByteBuf key, ByteBuf value) {
if (value != null) {
value.release();
}
return key;
}
}
View File
@@ -7,8 +7,10 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -49,8 +51,10 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax());
return getRocksIterator(readOptions, range, db, cfh);
return getRocksIterator(readOptions, range.retain(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {
var rocksIterator = tuple.getT1();
ObjectArrayList<T> values = new ObjectArrayList<>();
ByteBuf firstGroupKey = null;
@@ -60,11 +64,16 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
if (firstGroupKey == null) {
firstGroupKey = key.retainedSlice();
} else if (!ByteBufUtil.equals(firstGroupKey, 0, key, 0, prefixLength)) {
firstGroupKey = key.retain();
} else if (!ByteBufUtil.equals(firstGroupKey, firstGroupKey.readerIndex(), key, key.readerIndex(), prefixLength)) {
break;
}
ByteBuf value = readValues ? LLUtils.readDirectNioBuffer(alloc, rocksIterator::value) : EMPTY_BUFFER;
ByteBuf value;
if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
} else {
value = DatabaseMapDictionaryDeep.EMPTY_BYTES;
}
try {
rocksIterator.next();
T entry = getEntry(key.retain(), value.retain());
@@ -87,14 +96,20 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
sink.complete();
}
return tuple;
} finally {
range.release();
}
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().release();
tuple.getT3().release();
range.release();
});
}
public abstract T getEntry(ByteBuf key, ByteBuf value);
public void release() {
range.release();
}
}
View File
@@ -51,6 +51,8 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
}
return LLLocalDictionary.getRocksIterator(readOptions, range.retain(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {
var rocksIterator = tuple.getT1();
ByteBuf firstGroupKey = null;
try {
@@ -79,13 +81,18 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
}
}
return tuple;
} finally {
range.release();
}
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().release();
tuple.getT3().release();
range.release();
});
}
public void release() {
range.release();
}
}
View File
@@ -19,6 +19,9 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato
@Override
public ByteBuf getEntry(ByteBuf key, ByteBuf value) {
if (value != null) {
value.release();
}
return key;
}
}
View File
@@ -6,11 +6,16 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksMutableObject;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple3;
import static io.netty.buffer.Unpooled.*;
public abstract class LLLocalReactiveRocksIterator<T> {
@@ -46,11 +51,18 @@ public abstract class LLLocalReactiveRocksIterator<T> {
}
return getRocksIterator(readOptions, range.retain(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {
var rocksIterator = tuple.getT1();
if (rocksIterator.isValid()) {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
ByteBuf value = readValues ? LLUtils.readDirectNioBuffer(alloc, rocksIterator::value) : EMPTY_BUFFER;
ByteBuf value;
if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
} else {
value = DatabaseMapDictionaryDeep.EMPTY_BYTES;
}
try {
rocksIterator.next();
sink.next(getEntry(key.retain(), value.retain()));
@@ -64,6 +76,9 @@ public abstract class LLLocalReactiveRocksIterator<T> {
sink.complete();
}
return tuple;
} finally {
range.release();
}
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
@@ -73,4 +88,8 @@ public abstract class LLLocalReactiveRocksIterator<T> {
}
public abstract T getEntry(ByteBuf key, ByteBuf value);
public void release() {
range.release();
}
}
View File
@@ -72,7 +72,8 @@ public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
try {
ByteBufUtil.writeUtf8(buf, deserialized);
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to serialize an element with "
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with "
+ buf.readableBytes() + " bytes instead");
}
return buf.retain();
View File
@@ -287,6 +287,6 @@ public class LuceneUtils {
DatabaseMapDictionaryDeep<T, Map<U, V>, DatabaseMapDictionary<U, V>> dictionaryDeep) {
return entry -> dictionaryDeep
.at(snapshot, entry.getKey())
.flatMap(sub -> sub.getValue(snapshot, entry.getValue()));
.flatMap(sub -> sub.getValue(snapshot, entry.getValue()).doFinally(s -> sub.release()));
}
}
View File
@@ -17,6 +17,10 @@ import org.warp.commonutils.concurrency.atomicity.NotAtomic;
@NotAtomic
public class CappedWriteBatch extends WriteBatch {
/**
* Default: true, Use false to debug problems with direct buffers
*/
private static final boolean USE_FAST_DIRECT_BUFFERS = false;
private final RocksDB db;
private final int cap;
private final WriteOptions writeOptions;
@@ -41,18 +45,21 @@ public class CappedWriteBatch extends WriteBatch {
private synchronized void flushIfNeeded(boolean force) throws RocksDBException {
if (this.count() >= (force ? 1 : cap)) {
db.write(writeOptions, this);
db.write(writeOptions, this.getWriteBatch());
this.clear();
releaseAllBuffers();
}
}
private synchronized void releaseAllBuffers() {
if (!buffersToRelease.isEmpty()) {
for (ByteBuf byteBuffer : buffersToRelease) {
assert byteBuffer.refCnt() > 0;
byteBuffer.release();
}
buffersToRelease.clear();
}
}
@Override
public synchronized int count() {
@@ -84,33 +91,24 @@ public class CappedWriteBatch extends WriteBatch {
}
public synchronized void put(ColumnFamilyHandle columnFamilyHandle, ByteBuf key, ByteBuf value) throws RocksDBException {
if (USE_FAST_DIRECT_BUFFERS) {
buffersToRelease.add(key);
buffersToRelease.add(value);
ByteBuf keyDirectBuf = key.retain();
ByteBuffer keyNioBuffer = LLUtils.toDirectFast(keyDirectBuf.retain());
if (keyNioBuffer == null) {
keyDirectBuf.release();
keyDirectBuf = LLUtils.toDirectCopy(key.retain());
keyNioBuffer = keyDirectBuf.nioBuffer();
}
try {
ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
ByteBuf valueDirectBuf = value.retain();
ByteBuffer valueNioBuffer = LLUtils.toDirectFast(valueDirectBuf.retain());
if (valueNioBuffer == null) {
valueDirectBuf.release();
valueDirectBuf = LLUtils.toDirectCopy(value.retain());
valueNioBuffer = valueDirectBuf.nioBuffer();
}
try {
ByteBuffer valueNioBuffer = LLUtils.toDirect(value);
assert valueNioBuffer.isDirect();
super.put(columnFamilyHandle, keyNioBuffer, valueNioBuffer);
} else {
try {
byte[] keyArray = LLUtils.toArray(key);
byte[] valueArray = LLUtils.toArray(value);
super.put(columnFamilyHandle, keyArray, valueArray);
} finally {
buffersToRelease.add(valueDirectBuf);
key.release();
value.release();
}
} finally {
buffersToRelease.add(keyDirectBuf);
}
flushIfNeeded(false);
}
@@ -154,15 +152,9 @@ public class CappedWriteBatch extends WriteBatch {
}
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuf key) throws RocksDBException {
if (USE_FAST_DIRECT_BUFFERS) {
buffersToRelease.add(key);
ByteBuf keyDirectBuf = key.retain();
ByteBuffer keyNioBuffer = LLUtils.toDirectFast(keyDirectBuf.retain());
if (keyNioBuffer == null) {
keyDirectBuf.release();
keyDirectBuf = LLUtils.toDirectCopy(key.retain());
keyNioBuffer = keyDirectBuf.nioBuffer();
}
try {
ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
removeDirect(nativeHandle_,
keyNioBuffer,
@@ -171,8 +163,12 @@ public class CappedWriteBatch extends WriteBatch {
columnFamilyHandle.nativeHandle_
);
keyNioBuffer.position(keyNioBuffer.limit());
} else {
try {
super.delete(columnFamilyHandle, LLUtils.toArray(key));
} finally {
buffersToRelease.add(keyDirectBuf);
key.release();
}
}
flushIfNeeded(false);
}
@@ -248,11 +244,24 @@ public class CappedWriteBatch extends WriteBatch {
@Override
public synchronized WriteBatch getWriteBatch() {
return this;
return super.getWriteBatch();
}
public synchronized void writeToDbAndClose() throws RocksDBException {
try {
flushIfNeeded(true);
super.close();
} finally {
releaseAllBuffers();
}
}
public void flush() throws RocksDBException {
try {
flushIfNeeded(true);
} finally {
releaseAllBuffers();
}
}
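
For reference, the dictionary code above drives CappedWriteBatch roughly as follows; a trimmed sketch, with entries standing in for whatever window of key/value pairs is being written:

// put() flushes automatically once the cap is reached; writeToDbAndClose() writes the
// remainder and releases every buffer the batch retained.
try (var batch = new CappedWriteBatch(db,
		CAPPED_WRITE_BATCH_CAP,
		RESERVED_WRITE_BATCH_SIZE,
		MAX_WRITE_BATCH_SIZE,
		BATCH_WRITE_OPTIONS)) {
	for (Entry<ByteBuf, ByteBuf> entry : entries) {
		batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
	}
	batch.writeToDbAndClose();
}
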
@Override
View File
@@ -1,4 +1,4 @@
package it.cavallium.dbengine.client;
package it.cavallium.dbengine;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
@@ -67,6 +67,7 @@ public class DbTestUtils {
}).subscribeOn(Schedulers.boundedElastic()))
);
}
public static Mono<? extends LLDictionary> tempDictionary(LLKeyValueDatabase database, UpdateMode updateMode) {
return tempDictionary(database, "testmap", updateMode);
}
View File
@@ -1,9 +1,11 @@
package it.cavallium.dbengine.client;
package it.cavallium.dbengine;
import static it.cavallium.dbengine.client.CompositeDatabasePartLocation.CompositeDatabasePartType.KV_DATABASE;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import it.cavallium.dbengine.client.CompositeDatabasePartLocation;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.UpdateMode;
@@ -14,7 +16,6 @@ import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -24,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
View File
@@ -0,0 +1,31 @@
package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.UpdateMode;
import java.util.Arrays;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.test.StepVerifier;
public class TestDictionary {
private static Stream<Arguments> provideArgumentsCreate() {
return Arrays.stream(UpdateMode.values()).map(Arguments::of);
}
@ParameterizedTest
@MethodSource("provideArgumentsCreate")
public void testCreate(UpdateMode updateMode) {
StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.flatMap(LLDictionary::clear)
.then()
))
.verifyComplete();
}
}
View File
@@ -1,18 +1,14 @@
package it.cavallium.dbengine.client;
package it.cavallium.dbengine;
import static it.cavallium.dbengine.client.DbTestUtils.*;
import static it.cavallium.dbengine.DbTestUtils.*;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.UpdateMode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
@@ -21,38 +17,37 @@ import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.StepVerifier.FirstStep;
import reactor.test.StepVerifier.Step;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuple4;
import reactor.util.function.Tuples;
public class TestDictionaryMap {
private static Stream<Arguments> provideArgumentsCreate() {
return Arrays.stream(UpdateMode.values()).map(Arguments::of);
private static boolean isTestBadKeysEnabled() {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
}
@ParameterizedTest
@MethodSource("provideArgumentsCreate")
public void testCreate(UpdateMode updateMode) {
StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.flatMap(LLDictionary::clear)
.then()
))
.verifyComplete();
}
private static final String BIG_STRING
= "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789";
private static Stream<Arguments> provideArgumentsPut() {
var goodKeys = Set.of("12345", "zebra");
var badKeys = Set.of("", "a", "aaaa", "aaaaaa");
Set<String> badKeys;
if (isTestBadKeysEnabled()) {
badKeys = Set.of("", "a", "aaaa", "aaaaaa");
} else {
badKeys = Set.of();
}
Set<Tuple2<String, Boolean>> keys = Stream.concat(
goodKeys.stream().map(s -> Tuples.of(s, false)),
badKeys.stream().map(s -> Tuples.of(s, true))
).collect(Collectors.toSet());
var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", "bzzazazqzeztzgzzhz!");
var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", BIG_STRING);
return keys
.stream()
@@ -180,6 +175,9 @@ public class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testUpdate(UpdateMode updateMode, String key, String value, boolean shouldFail) {
if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) {
return;
}
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, 5))
@@ -219,6 +217,9 @@ public class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testUpdateGet(UpdateMode updateMode, String key, String value, boolean shouldFail) {
if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) {
return;
}
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, 5))
@@ -257,17 +258,17 @@ public class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testPutAndGetStatus(UpdateMode updateMode, String key, String value, boolean shouldFail) {
public void testPutAndGetChanged(UpdateMode updateMode, String key, String value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, 5))
.flatMapMany(map -> Flux
.concat(
map.putValueAndGetStatus(key, "error?").single(),
map.putValueAndGetStatus(key, value).single(),
map.putValueAndGetStatus(key, value).single(),
map.putValueAndGetChanged(key, "error?").single(),
map.putValueAndGetChanged(key, value).single(),
map.putValueAndGetChanged(key, value).single(),
map.remove(key),
map.putValueAndGetStatus(key, "error?").single()
map.putValueAndGetChanged(key, "error?").single()
)
.doFinally(s -> map.release())
)
@@ -275,18 +276,23 @@ public class TestDictionaryMap {
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(false, true, true, false).verifyComplete();
stpVer.expectNext(true, true, false, true).verifyComplete();
}
}
private static Stream<Arguments> provideArgumentsPutMulti() {
var goodKeys = Set.of(Set.of("12345", "67890"), Set.of("zebra"), Set.<String>of());
var badKeys = Set.of(Set.of("", "12345"), Set.of("12345", "a"), Set.of("45678", "aaaa"), Set.of("aaaaaa", "capra"));
Set<Set<String>> badKeys;
if (isTestBadKeysEnabled()) {
badKeys = Set.of(Set.of("", "12345"), Set.of("12345", "a"), Set.of("45678", "aaaa"), Set.of("aaaaaa", "capra"));
} else {
badKeys = Set.of();
}
Set<Tuple2<Set<String>, Boolean>> keys = Stream.concat(
goodKeys.stream().map(s -> Tuples.of(s, false)),
badKeys.stream().map(s -> Tuples.of(s, true))
).collect(Collectors.toSet());
var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", "bzzazazqzeztzgzzhz!");
var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", BIG_STRING);
return keys
.stream()
@@ -406,7 +412,7 @@ public class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPutMulti")
public void testSetAndGetStatus(UpdateMode updateMode, Map<String, String> entries, boolean shouldFail) {
public void testSetAndGetChanged(UpdateMode updateMode, Map<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true);
Step<Boolean> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
View File
@@ -0,0 +1,993 @@
package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMap;
import static it.cavallium.dbengine.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
import it.cavallium.dbengine.database.UpdateMode;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.StepVerifier.FirstStep;
import reactor.test.StepVerifier.Step;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuple4;
import reactor.util.function.Tuples;
public class TestDictionaryMapDeep {
private static boolean isTestBadKeysEnabled() {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
}
private static final String BIG_STRING
= "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789";
private static Stream<Arguments> provideArgumentsSet() {
var goodKeys = Set.of("12345", "zebra");
Set<String> badKeys;
if (isTestBadKeysEnabled()) {
badKeys = Set.of("", "a", "aaaa", "aaaaaa");
} else {
badKeys = Set.of();
}
Set<Tuple2<String, Boolean>> keys = Stream.concat(
goodKeys.stream().map(s -> Tuples.of(s, false)),
badKeys.stream().map(s -> Tuples.of(s, true))
).collect(Collectors.toSet());
var values = Set.of(
Map.of("123456", "a", "234567", ""),
Map.of("123456", "", "234567", "bb"),
Map.of("123456", "\0", "234567", "\0\0", "345678", BIG_STRING)
);
return keys
.stream()
.flatMap(keyTuple -> {
Stream<Map<String, String>> strm;
if (keyTuple.getT2()) {
strm = values.stream().limit(1);
} else {
strm = values.stream();
}
return strm.map(val -> Tuples.of(keyTuple.getT1(), val, keyTuple.getT2()));
})
.flatMap(entryTuple -> Arrays.stream(UpdateMode.values()).map(updateMode -> Tuples.of(updateMode,
entryTuple.getT1(),
entryTuple.getT2(),
entryTuple.getT3()
)))
.map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4()));
}
private static Stream<Arguments> provideArgumentsPut() {
var goodKeys1 = Set.of("12345", "zebra");
Set<String> badKeys1;
if (isTestBadKeysEnabled()) {
badKeys1 = Set.of("", "a", "aaaa", "aaaaaa");
} else {
badKeys1 = Set.of();
}
var goodKeys2 = Set.of("123456", "anatra");
Set<String> badKeys2;
if (isTestBadKeysEnabled()) {
badKeys2 = Set.of("", "a", "aaaaa", "aaaaaaa");
} else {
badKeys2 = Set.of();
}
var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", BIG_STRING);
Flux<Tuple4<String, String, String, Boolean>> failOnKeys1 = Flux
.fromIterable(badKeys1)
.map(badKey1 -> Tuples.of(
badKey1,
goodKeys2.stream().findAny().orElseThrow(),
values.stream().findAny().orElseThrow(),
true
));
Flux<Tuple4<String, String, String, Boolean>> failOnKeys2 = Flux
.fromIterable(badKeys2)
.map(badKey2 -> Tuples.of(
goodKeys1.stream().findAny().orElseThrow(),
badKey2,
values.stream().findAny().orElseThrow(),
true
));
Flux<Tuple4<String, String, String, Boolean>> goodKeys1And2 = Flux
.fromIterable(values)
.map(value -> Tuples.of(
goodKeys1.stream().findAny().orElseThrow(),
goodKeys2.stream().findAny().orElseThrow(),
value,
false
));
Flux<Tuple4<String, String, String, Boolean>> keys1And2 = Flux
.concat(
goodKeys1And2,
failOnKeys1,
failOnKeys2
);
return keys1And2
.flatMap(entryTuple -> Flux
.fromArray(UpdateMode.values())
.map(updateMode -> Tuples.of(updateMode,
entryTuple.getT1(),
entryTuple.getT2(),
entryTuple.getT3(),
entryTuple.getT4()
))
)
.map(fullTuple -> Arguments.of(fullTuple.getT1(),
fullTuple.getT2(),
fullTuple.getT3(),
fullTuple.getT4(),
fullTuple.getT5()
))
.toStream();
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testSetValueGetValue(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMap(map -> map
.putValue(key, value)
.then(map.getValue(null, key))
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testSetValueGetAllValues(UpdateMode updateMode,
String key,
Map<String, String> value,
boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> map
.putValue(key, value)
.thenMany(map.getAllValues(null))
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(Map.entry(key, value)).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testAtSetGetAllStagesGetAllValues(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Tuple3<String, String, String>, Boolean>().keySet(true);
Step<Tuple3<String, String, String>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> map
.at(null, key)
.flatMap(v -> v
.set(value)
.doFinally(s -> v.release())
)
.then(map
.at(null, "capra")
.flatMap(v -> v
.set(Map.of("normal", "123", "ormaln", "456"))
.doFinally(s -> v.release())
)
)
.thenMany(map
.getAllStages(null)
.flatMap(v -> v.getValue()
.getAllValues(null)
.map(result -> Tuples.of(v.getKey(), result.getKey(), result.getValue()))
.doFinally(s -> v.getValue().release())
)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
value.forEach((k, v) -> remainingEntries.add(Tuples.of(key, k, v)));
remainingEntries.add(Tuples.of("capra", "normal", "123"));
remainingEntries.add(Tuples.of("capra", "ormaln", "456"));
for (Tuple3<String, String, String> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
assert remainingEntries.isEmpty();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testAtPutValueAtGetValue(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMap(map -> map
.at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.release()))
.then(map.at(null, key1).flatMap(v -> v.getValue(null, key2).doFinally(s -> v.release())))
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testSetAndGetPrevious(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map
.putValueAndGetPrevious(key, Map.of("error?", "error."))
.defaultIfEmpty(Map.of("nothing", "nothing")),
map.putValueAndGetPrevious(key, value),
map.putValueAndGetPrevious(key, value)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(Map.of("nothing", "nothing"), Map.of("error?", "error.")).expectNext(value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testAtPutValueAndGetPrevious(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map
.at(null, key1)
.flatMap(v -> v
.putValueAndGetPrevious(key2, "error?")
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.putValueAndGetPrevious(key2, value)
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.putValueAndGetPrevious(key2, value)
.doFinally(s -> v.release())
)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext("error?", value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testSetValueRemoveAndGetPrevious(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.removeAndGetPrevious(key),
map.putValue(key, value).then(map.removeAndGetPrevious(key)),
map.removeAndGetPrevious(key)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testAtPutValueRemoveAndGetPrevious(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map
.at(null, key1)
.flatMap(v -> v
.putValue(key2, "error?")
.then(v.removeAndGetPrevious(key2))
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.putValue(key2, value)
.then(v.removeAndGetPrevious(key2))
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v.removeAndGetPrevious(key2)
.doFinally(s -> v.release())
)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext("error?", value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testSetValueRemoveAndGetStatus(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.removeAndGetStatus(key),
map.putValue(key, value).then(map.removeAndGetStatus(key)),
map.removeAndGetStatus(key)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(false, true, false).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testAtPutValueRemoveAndGetStatus(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map
.at(null, key1)
.flatMap(v -> v
.putValue(key2, "error?")
.then(v.removeAndGetStatus(key2))
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.putValue(key2, value)
.then(v.removeAndGetStatus(key2))
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v.removeAndGetStatus(key2)
.doFinally(s -> v.release())
)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(true, true, false).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testUpdate(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) {
return;
}
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.updateValue(key, old -> {
assert old == null;
return Map.of("error?", "error.");
}),
map.updateValue(key, false, old -> {
assert Objects.equals(old, Map.of("error?", "error."));
return Map.of("error?", "error.");
}),
map.updateValue(key, true, old -> {
assert Objects.equals(old, Map.of("error?", "error."));
return Map.of("error?", "error.");
}),
map.updateValue(key, true, old -> {
assert Objects.equals(old, Map.of("error?", "error."));
return value;
}),
map.updateValue(key, true, old -> {
assert Objects.equals(old, value);
return value;
})
)
.doFinally(s -> map.release())
)
));
if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(true, false, false, true, false).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testAtUpdate(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) {
return;
}
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> prev)
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> value)
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> value)
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> null)
.doFinally(s -> v.release())
)
)
.doFinally(s -> map.release())
)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(false, true, false, true).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testUpdateGet(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) {
return;
}
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.updateValue(key, old -> {
assert old == null;
return Map.of("error?", "error.");
}).then(map.getValue(null, key)),
map.updateValue(key, false, old -> {
assert Objects.equals(old, Map.of("error?", "error."));
return Map.of("error?", "error.");
}).then(map.getValue(null, key)),
map.updateValue(key, true, old -> {
assert Objects.equals(old, Map.of("error?", "error."));
return Map.of("error?", "error.");
}).then(map.getValue(null, key)),
map.updateValue(key, true, old -> {
assert Objects.equals(old, Map.of("error?", "error."));
return value;
}).then(map.getValue(null, key)),
map.updateValue(key, true, old -> {
assert Objects.equals(old, value);
return value;
}).then(map.getValue(null, key))
)
.doFinally(s -> map.release())
)
));
if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(Map.of("error?", "error."), Map.of("error?", "error."), Map.of("error?", "error."), value, value).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testAtUpdateGetValue(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) {
return;
}
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> prev)
.then(v.getValue(null, key2))
.defaultIfEmpty("empty")
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> value)
.then(v.getValue(null, key2))
.defaultIfEmpty("empty")
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> value)
.then(v.getValue(null, key2))
.defaultIfEmpty("empty")
.doFinally(s -> v.release())
),
map
.at(null, key1)
.flatMap(v -> v
.updateValue(key2, prev -> null)
.then(v.getValue(null, key2))
.defaultIfEmpty("empty")
.doFinally(s -> v.release())
)
)
.doFinally(s -> map.release())
)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext("empty", value, value, "empty").verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSet")
public void testSetAndGetChanged(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.putValueAndGetChanged(key, Map.of("error?", "error.")).single(),
map.putValueAndGetChanged(key, value).single(),
map.putValueAndGetChanged(key, value).single(),
map.remove(key),
map.putValueAndGetChanged(key, Map.of("error?", "error.")).single()
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(true, true, false, true).verifyComplete();
}
}
private static Stream<Arguments> provideArgumentsSetMulti() {
var goodKeys = Set.of(Set.of("12345", "67890"), Set.of("zebra"), Set.<String>of());
Set<Set<String>> badKeys;
if (isTestBadKeysEnabled()) {
badKeys = Set.of(Set.of("", "12345"), Set.of("12345", "a"), Set.of("45678", "aaaa"), Set.of("aaaaaa", "capra"));
} else {
badKeys = Set.of();
}
Set<Tuple2<Set<String>, Boolean>> keys = Stream.concat(
goodKeys.stream().map(s -> Tuples.of(s, false)),
badKeys.stream().map(s -> Tuples.of(s, true))
).collect(Collectors.toSet());
var values = Set.of(
Map.of("123456", "a", "234567", ""),
Map.of("123456", "", "234567", "bb"),
Map.of("123456", "\0", "234567", "\0\0", "345678", BIG_STRING)
);
return keys
.stream()
.map(keyTuple -> keyTuple.mapT1(ks -> Flux
.zip(Flux.fromIterable(ks), Flux.fromIterable(values))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.block()
))
.flatMap(entryTuple -> Arrays.stream(UpdateMode.values()).map(updateMode -> Tuples.of(updateMode,
entryTuple.getT1(),
entryTuple.getT2()
)))
.map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3()));
}
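
	// getMulti may emit entries in any order, so verification collects the expected entries into a
	// concurrent set and checks that every emitted entry removes exactly one remaining expectation.
	// The same order-independent pattern is reused by the other multi-entry tests below.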
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetMultiGetMulti(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.getMulti(null, Flux.fromIterable(entries.keySet()))
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetAllValuesGetMulti(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> map
.setAllValues(Flux.fromIterable(entries.entrySet()))
.thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet())))
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetAllValuesAndGetPrevious(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())),
map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet()))
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetGetMulti(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.set(entries).then(Mono.empty()),
map.getMulti(null, Flux.fromIterable(entries.keySet()))
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
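
	// setAndGetChanged: setting the same entries twice only reports a change the first time; after
	// removing one key (when the entry set is non-empty) the next set reports a change again.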
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetAndGetStatus(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
Step<Boolean> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> {
Mono<Void> removalMono;
if (entries.isEmpty()) {
removalMono = Mono.empty();
} else {
removalMono = map.remove(entries.keySet().stream().findAny().orElseThrow());
}
return Flux
.concat(
map.setAndGetChanged(entries).single(),
map.setAndGetChanged(entries).single(),
removalMono.then(Mono.empty()),
map.setAndGetChanged(entries).single()
)
.doFinally(s -> map.release());
})
));
if (shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetAndGetPrevious(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.setAndGetPrevious(entries),
map.setAndGetPrevious(entries)
)
.map(Map::entrySet)
.flatMap(Flux::fromIterable)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
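
	// After set(entries), clearAndGetPrevious should emit exactly the previous content and the
	// subsequent get should yield no entries, so the verifier expects a single copy of the entries.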
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetClearAndGetPreviousGet(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null))
.map(Map::entrySet)
.flatMap(Flux::fromIterable)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetMultiGetAllValues(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.getAllValues(null)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetMultiGet(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.get(null)
.map(Map::entrySet)
.flatMapMany(Flux::fromIterable)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
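
	// Reads the entries back through getAllStages: each stage's value is fetched with get(null) and
	// the stage is released afterwards, so the reconstructed entries should match what was inserted.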
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetMultiGetAllStagesGet(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, Map<String, String>>, Boolean>().keySet(true);
Step<Entry<String, Map<String, String>>> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map
.getAllStages(null)
.flatMap(stage -> stage
.getValue()
.get(null)
.map(val -> Map.entry(stage.getKey(), val))
.doFinally(s -> stage.getValue().release())
)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, Map<String, String>> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
}
}
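
	// isEmpty should be true before any put and reflect whether the inserted entry set was empty
	// afterwards; testSetMultiClear additionally checks that clear() makes the map empty again.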
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetMultiIsEmpty(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
Step<Boolean> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.isEmpty(null),
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.isEmpty(null)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.expectNext(true).verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty()).verifyComplete();
}
}
@ParameterizedTest
@MethodSource("provideArgumentsSetMulti")
public void testSetMultiClear(UpdateMode updateMode, Map<String, Map<String, String>> entries, boolean shouldFail) {
Step<Boolean> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> Flux
.concat(
map.isEmpty(null),
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.isEmpty(null),
map.clear().then(Mono.empty()),
map.isEmpty(null)
)
.doFinally(s -> map.release())
)
));
if (shouldFail) {
stpVer.expectNext(true).verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete();
}
}
}

View File

@ -1,6 +1,6 @@
package it.cavallium.dbengine.client;
package it.cavallium.dbengine;
import static it.cavallium.dbengine.client.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDb;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.collections.DatabaseInt;
@ -9,8 +9,6 @@ import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Mono;