Fix tests

This commit is contained in:
Andrea Cavalli 2023-02-24 00:18:02 +01:00
parent 3f88ff8f83
commit 1b83c95856
9 changed files with 717 additions and 1084 deletions

View File

@ -86,6 +86,8 @@ public class LLUtils {
private static final MethodHandle IS_IN_NON_BLOCKING_THREAD_MH; private static final MethodHandle IS_IN_NON_BLOCKING_THREAD_MH;
private static final Consumer<Object> NULL_CONSUMER = ignored -> {}; private static final Consumer<Object> NULL_CONSUMER = ignored -> {};
private static final Buf BUF_TRUE = Buf.wrap(new byte[] {(byte) 1});
private static final Buf BUF_FALSE = Buf.wrap(new byte[] {(byte) 0});
static { static {
for (int i1 = 0; i1 < 256; i1++) { for (int i1 = 0; i1 < 256; i1++) {
@ -124,6 +126,8 @@ public class LLUtils {
} }
public static boolean responseToBoolean(Buf response) { public static boolean responseToBoolean(Buf response) {
if (response == BUF_FALSE) return false;
if (response == BUF_TRUE) return true;
assert response.size() == 1; assert response.size() == 1;
return response.getBoolean(0); return response.getBoolean(0);
} }
@ -133,7 +137,7 @@ public class LLUtils {
} }
public static Buf booleanToResponseByteBuffer(boolean bool) { public static Buf booleanToResponseByteBuffer(boolean bool) {
return Buf.wrap(new byte[] {bool ? (byte) 1 : 0}); return bool ? BUF_TRUE : BUF_FALSE;
} }
@Nullable @Nullable

View File

@ -194,7 +194,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} else { } else {
dictionary.setRange(range, value.entrySet().stream().map(this::serializeEntry), true); dictionary.setRange(range, value.entrySet().stream().map(this::serializeEntry), true);
} }
return prev; return prev != null && prev.isEmpty() ? null : prev;
} }
@Override @Override
@ -227,7 +227,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public U getValue(@Nullable CompositeSnapshot snapshot, T keySuffix) { public U getValue(@Nullable CompositeSnapshot snapshot, T keySuffix) {
var keySuffixBuf = serializeKeySuffixToKey(keySuffix); var keySuffixBuf = serializeKeySuffixToKey(keySuffix);
Buf value = dictionary.get(resolveSnapshot(snapshot), keySuffixBuf); Buf value = dictionary.get(resolveSnapshot(snapshot), keySuffixBuf);
return deserializeValue(keySuffix, BufDataInput.create(value)); return value != null ? deserializeValue(keySuffix, BufDataInput.create(value)) : null;
} }
@Override @Override
@ -307,7 +307,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = serializeKeySuffixToKey(keySuffix); var keyMono = serializeKeySuffixToKey(keySuffix);
var valueMono = serializeValue(value); var valueMono = serializeValue(value);
var oldValueBuf = dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE); var oldValueBuf = dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE);
var oldValue = deserializeValue(keySuffix, BufDataInput.create(oldValueBuf)); var oldValue = oldValueBuf != null ? deserializeValue(keySuffix, BufDataInput.create(oldValueBuf)) : null;
if (oldValue == null) { if (oldValue == null) {
return value != null; return value != null;
} else { } else {
@ -325,7 +325,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public U removeAndGetPrevious(T keySuffix) { public U removeAndGetPrevious(T keySuffix) {
var keyMono = serializeKeySuffixToKey(keySuffix); var keyMono = serializeKeySuffixToKey(keySuffix);
var valueBuf = dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE); var valueBuf = dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE);
return deserializeValue(keySuffix, BufDataInput.create(valueBuf)); return valueBuf != null ? deserializeValue(keySuffix, BufDataInput.create(valueBuf)) : null;
} }
@Override @Override

View File

@ -110,7 +110,8 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
@Override @Override
public Object2ObjectSortedMap<T, U> get(@Nullable CompositeSnapshot snapshot) { public Object2ObjectSortedMap<T, U> get(@Nullable CompositeSnapshot snapshot) {
var v = subDictionary.get(snapshot); var v = subDictionary.get(snapshot);
return v != null ? deserializeMap(v) : null; var result = v != null ? deserializeMap(v) : null;
return result != null && result.isEmpty() ? null : result;
} }
@Override @Override
@ -203,7 +204,8 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
@Override @Override
public Object2ObjectSortedMap<T, U> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) { public Object2ObjectSortedMap<T, U> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) {
var v = subDictionary.setAndGetPrevious(this.serializeMap(value)); var v = subDictionary.setAndGetPrevious(this.serializeMap(value));
return v != null ? deserializeMap(v) : null; var result = v != null ? deserializeMap(v) : null;
return result != null && result.isEmpty() ? null : result;
} }
@Override @Override

View File

@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
@ -131,15 +132,20 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
Function<Entry<T, U>, @NotNull Entry<T, U>> entriesReplacer, Function<Entry<T, U>, @NotNull Entry<T, U>> entriesReplacer,
boolean smallRange) { boolean smallRange) {
if (canKeysChange) { if (canKeysChange) {
this.setAllValues(this.getAllValues(null, smallRange).map(entriesReplacer)); try (var values = this.getAllValues(null, smallRange)) {
this.setAllValues(values.map(entriesReplacer));
}
} else { } else {
this.getAllValues(null, smallRange).map(entriesReplacer) try (var values = this.getAllValues(null, smallRange).map(entriesReplacer)) {
.forEach(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue())); values.forEach(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue()));
}
} }
} }
default void replaceAll(Consumer<Entry<T, US>> entriesReplacer) { default void replaceAll(Consumer<Entry<T, US>> entriesReplacer) {
this.getAllStages(null, false).forEach(entriesReplacer); try (var stream = this.getAllStages(null, false)) {
stream.forEach(entriesReplacer);
}
} }
@Override @Override
@ -148,11 +154,15 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
if (value == null) { if (value == null) {
map = this.clearAndGetPrevious(); map = this.clearAndGetPrevious();
} else { } else {
map = this try (var stream = this.setAllValuesAndGetPrevious(value.entrySet().stream())) {
.setAllValuesAndGetPrevious(value.entrySet().stream()) map = stream.collect(Collectors.toMap(Entry::getKey,
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); Entry::getValue,
(a, b) -> a,
Object2ObjectLinkedOpenHashMap::new
));
}
} }
return map; return map != null && map.isEmpty() ? null : map;
} }
@Override @Override
@ -173,9 +183,12 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
SerializationFunction<@Nullable Object2ObjectSortedMap<T, U>, @Nullable Object2ObjectSortedMap<T, U>> updater) { SerializationFunction<@Nullable Object2ObjectSortedMap<T, U>, @Nullable Object2ObjectSortedMap<T, U>> updater) {
var updateMode = this.getUpdateMode(); var updateMode = this.getUpdateMode();
if (updateMode == UpdateMode.ALLOW_UNSAFE) { if (updateMode == UpdateMode.ALLOW_UNSAFE) {
Object2ObjectSortedMap<T, U> v = this Object2ObjectSortedMap<T, U> v;
.getAllValues(null, true)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); try (var stream = this.getAllValues(null, true)) {
v = stream
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new));
}
if (v.isEmpty()) { if (v.isEmpty()) {
v = null; v = null;
@ -212,7 +225,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
@Override @Override
default long leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { default long leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return this.getAllStages(snapshot, false).count(); return StreamUtils.countClose(this.getAllStages(snapshot, false));
} }
/** /**

View File

@ -77,12 +77,14 @@ import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache; import org.rocksdb.PersistentCache;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.PrepopulateBlobCache; import org.rocksdb.PrepopulateBlobCache;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import org.rocksdb.Statistics; import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel; import org.rocksdb.StatsLevel;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.TickerType; import org.rocksdb.TickerType;
import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions; import org.rocksdb.TransactionDBOptions;
@ -303,7 +305,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
columnFamilyOptions.setCompressionPerLevel(compressionTypes); columnFamilyOptions.setCompressionPerLevel(compressionTypes);
} }
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); final TableFormatConfig tableOptions = inMemory ? new PlainTableConfig() : new BlockBasedTableConfig();
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
if (!databaseOptions.lowMemory()) { if (!databaseOptions.lowMemory()) {
// tableOptions.setOptimizeFiltersForMemory(true); // tableOptions.setOptimizeFiltersForMemory(true);
@ -313,7 +315,9 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
if (columnOptions.writeBufferSize().isPresent()) { if (columnOptions.writeBufferSize().isPresent()) {
columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get()); columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get());
} }
tableOptions.setVerifyCompression(false); if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setVerifyCompression(false);
}
if (columnOptions.filter().isPresent()) { if (columnOptions.filter().isPresent()) {
var filterOptions = columnOptions.filter().get(); var filterOptions = columnOptions.filter().get();
@ -322,9 +326,13 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys // If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey()); final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey());
refs.track(bloomFilter); refs.track(bloomFilter);
tableOptions.setFilterPolicy(bloomFilter); if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(bloomFilter);
}
} else if (filterOptions instanceof NoFilter) { } else if (filterOptions instanceof NoFilter) {
tableOptions.setFilterPolicy(null); if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(null);
}
} }
} }
boolean cacheIndexAndFilterBlocks = columnOptions.cacheIndexAndFilterBlocks() boolean cacheIndexAndFilterBlocks = columnOptions.cacheIndexAndFilterBlocks()
@ -340,39 +348,44 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
columnFamilyOptions.setMaxWriteBufferNumber(4); columnFamilyOptions.setMaxWriteBufferNumber(4);
} }
} }
tableOptions if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html blockBasedTableConfig
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
.setDataBlockHashTableUtilRatio(0.75) // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setDataBlockHashTableUtilRatio(0.75)
.setPinTopLevelIndexAndFilter(true) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setPinTopLevelIndexAndFilter(true)
.setPinL0FilterAndIndexBlocksInCache(true) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setPinL0FilterAndIndexBlocksInCache(true)
.setCacheIndexAndFilterBlocksWithHighPriority(true) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) .setCacheIndexAndFilterBlocksWithHighPriority(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
// Enabling partition filters increase the reads by 2x // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPartitionFilters(columnOptions.partitionFilters().orElse(false)) // Enabling partition filters increase the reads by 2x
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setPartitionFilters(columnOptions.partitionFilters().orElse(false))
.setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setChecksumType(ChecksumType.kXXH3) .setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB .setChecksumType(ChecksumType.kXXH3)
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
.setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024)) // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
.setBlockCacheCompressed(optionsWithCache.compressedCache()) .setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024))
.setBlockCache(optionsWithCache.standardCache()) .setBlockCacheCompressed(optionsWithCache.compressedCache())
.setPersistentCache(resolvePersistentCache(persistentCaches, .setBlockCache(optionsWithCache.standardCache())
rocksdbOptions, .setPersistentCache(resolvePersistentCache(persistentCaches,
databaseOptions.persistentCaches(), rocksdbOptions,
columnOptions.persistentCacheId(), databaseOptions.persistentCaches(),
refs, columnOptions.persistentCacheId(),
rocksLogger refs,
)); rocksLogger
));
}
columnFamilyOptions.setTableFormatConfig(tableOptions); columnFamilyOptions.setTableFormatConfig(tableOptions);
if (inMemory) {
columnFamilyOptions.useFixedLengthPrefixExtractor(3);
}
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
if (columnOptions.filter().isPresent()) { if (columnOptions.filter().isPresent()) {
var filterOptions = columnOptions.filter().get(); var filterOptions = columnOptions.filter().get();

View File

@ -2,6 +2,8 @@ package it.cavallium.dbengine.utils;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStage;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
@ -88,6 +90,12 @@ public class StreamUtils {
} }
} }
public static <X> long countClose(Stream<X> stream) {
try (stream) {
return stream.count();
}
}
private record BatchSpliterator<E>(Spliterator<E> base, int batchSize) implements Spliterator<List<E>> { private record BatchSpliterator<E>(Spliterator<E> base, int batchSize) implements Spliterator<List<E>> {
@Override @Override

View File

@ -1,18 +1,34 @@
package it.cavallium.dbengine.tests; package it.cavallium.dbengine.tests;
import static it.cavallium.dbengine.tests.DbTestUtils.*; import static it.cavallium.dbengine.tests.DbTestUtils.*;
import static it.cavallium.dbengine.utils.StreamUtils.toListClose;
import com.google.common.collect.Streams;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -147,50 +163,39 @@ public abstract class TestDictionaryMap {
} }
} }
/*
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPut") @MethodSource("provideArgumentsPut")
public void testPutValueRemoveAndGetPrevious(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { public void testPutValueRemoveAndGetPrevious(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
var stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var x1 = map.removeAndGetPrevious(key);
.flatMapMany(map -> Flux map.putValue(key, value);
.concat( var x2 = map.removeAndGetPrevious(key);
map.removeAndGetPrevious(key), var x3 = map.removeAndGetPrevious(key);
map.putValue(key, value).then(map.removeAndGetPrevious(key)), return Arrays.asList(x1, x2, x3);
map.removeAndGetPrevious(key) }));
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
stpVer.expectNext(value).verifyComplete(); Assertions.assertEquals(Arrays.asList(null, value, null), stpVer);
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPut") @MethodSource("provideArgumentsPut")
public void testPutValueRemoveAndGetStatus(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { public void testPutValueRemoveAndGetStatus(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
var stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var x1 = map.removeAndGetStatus(key);
.flatMapMany(map -> Flux map.putValue(key, value);
.concat( var x2 = map.removeAndGetStatus(key);
map.removeAndGetStatus(key), var x3 = map.removeAndGetStatus(key);
map.putValue(key, value).then(map.removeAndGetStatus(key)), return Stream.of(x1, x2, x3).toList();
map.removeAndGetStatus(key) }));
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
stpVer.expectNext(false, true, false).verifyComplete(); Assertions.assertEquals(Arrays.asList(false, true, false), stpVer);
} }
} }
@ -200,39 +205,27 @@ public abstract class TestDictionaryMap {
if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) { if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) {
return; return;
} }
var stpVer = StepVerifier var stpVer = run(updateMode == UpdateMode.DISALLOW || shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) return Arrays.asList(map.updateValue(key, old -> {
.flatMapMany(map -> Flux Assertions.assertNull(old);
.concat( return "error?";
map.updateValue(key, old -> { }), map.updateValue(key, old -> {
Assertions.assertNull(old); Assertions.assertEquals("error?", old);
return "error?"; return "error?";
}), }), map.updateValue(key, old -> {
map.updateValue(key, old -> { Assertions.assertEquals("error?", old);
Assertions.assertEquals("error?", old); return "error?";
return "error?"; }), map.updateValue(key, old -> {
}), Assertions.assertEquals("error?", old);
map.updateValue(key, old -> { return value;
Assertions.assertEquals("error?", old); }), map.updateValue(key, old -> {
return "error?"; Assertions.assertEquals(value, old);
}), return value;
map.updateValue(key, old -> { }));
Assertions.assertEquals("error?", old); }));
return value; if (updateMode != UpdateMode.DISALLOW && !shouldFail) {
}), Assertions.assertEquals(Arrays.asList(true, false, false, true, false), stpVer);
map.updateValue(key, old -> {
Assertions.assertEquals(value, old);
return value;
})
)
.doFinally(s -> map.close())
)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext(true, false, false, true, false).verifyComplete();
} }
} }
@ -242,103 +235,70 @@ public abstract class TestDictionaryMap {
if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) { if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) {
return; return;
} }
var stpVer = StepVerifier var stpVer = run(updateMode == UpdateMode.DISALLOW || shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) log.debug("1. Updating value: {}", key);
.flatMapMany(map -> Flux map.updateValue(key, old -> {
.concat( assert old == null;
Mono return "error?";
.fromRunnable(() -> log.debug("1. Updating value: {}", key)) });
.then(map.updateValue(key, old -> { log.debug("1. Getting value: {}", key);
assert old == null; var x2 = map.getValue(null, key);
return "error?";
}))
.doOnSuccess(s -> log.debug("1. Getting value: {}", key))
.then(map.getValue(null, key)),
Mono log.debug("2. Updating value: {}", key);
.fromRunnable(() -> log.debug("2. Updating value: {}", key)) map.updateValue(key, old -> {
.then(map.updateValue(key, old -> { assert Objects.equals(old, "error?");
assert Objects.equals(old, "error?"); return "error?";
return "error?"; });
})) log.debug("2. Getting value: {}", key);
.doOnSuccess(s -> log.debug("2. Getting value: {}", key)) var x3 = map.getValue(null, key);
.then(map.getValue(null, key)),
Mono log.debug("3. Updating value: {}", key);
.fromRunnable(() -> log.debug("3. Updating value: {}", key)) map.updateValue(key, old -> {
.then(map.updateValue(key, old -> { assert Objects.equals(old, "error?");
assert Objects.equals(old, "error?"); return "error?";
return "error?"; });
})) log.debug("3. Getting value: {}", key);
.doOnSuccess(s -> log.debug("3. Getting value: {}", key)) var x4 = map.getValue(null, key);
.then(map.getValue(null, key)),
Mono log.debug("4. Updating value: {}", key);
.fromRunnable(() -> log.debug("4. Updating value: {}", key)) map.updateValue(key, old -> {
.then(map.updateValue(key, old -> { assert Objects.equals(old, "error?");
assert Objects.equals(old, "error?"); return value;
return value; });
})) log.debug("4. Getting value: {}", key);
.doOnSuccess(s -> log.debug("4. Getting value: {}", key)) var x5 = map.getValue(null, key);
.then(map.getValue(null, key)),
Mono log.debug("5. Updating value: {}", key);
.fromRunnable(() -> log.debug("5. Updating value: {}", key)) map.updateValue(key, old -> {
.then(map.updateValue(key, old -> { assert Objects.equals(old, value);
assert Objects.equals(old, value); return value;
return value; });
})) log.debug("5. Getting value: {}", key);
.doOnSuccess(s -> log.debug("5. Getting value: {}", key)) var x6 = map.getValue(null, key);
.then(map.getValue(null, key)) return Arrays.asList(x2, x3, x4, x5, x6);
) }));
.doFinally(s -> map.close()) if (updateMode != UpdateMode.DISALLOW && !shouldFail) {
) Assertions.assertEquals(Arrays.asList("error?", "error?", "error?", value, value), stpVer);
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
stpVer.verifyError();
} else {
stpVer.expectNext("error?", "error?", "error?", value, value).verifyComplete();
} }
} }
@Test
public void testUpdateGetWithCancel() {
tempDb(getTempDbGenerator(), allocator, db -> {
var mapMono = tempDictionary(db, UpdateMode.ALLOW)
.map(dict -> tempDatabaseMapDictionaryMap(dict, MapType.MAP, 5));
var keys = Flux.range(10, 89).map(n -> "key" + n).repeat(100);
return Mono.usingWhen(mapMono,
map -> keys.flatMap(key -> map.updateValue(key, prevValue -> key + "-val")).then(),
LLUtils::finalizeResource
);
}).take(50).blockLast();
}
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPut") @MethodSource("provideArgumentsPut")
public void testPutAndGetChanged(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { public void testPutAndGetChanged(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
var stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var x1 = map.putValueAndGetChanged(key, "error?");
.flatMapMany(map -> Flux var x2 = map.putValueAndGetChanged(key, value);
.concat( var x3 = map.putValueAndGetChanged(key, value);
map.putValueAndGetChanged(key, "error?").single(), map.remove(key);
map.putValueAndGetChanged(key, value).single(), var x4 = map.putValueAndGetChanged(key, "error?");
map.putValueAndGetChanged(key, value).single(), return Arrays.asList(x1, x2, x3, x4);
map.remove(key), }));
map.putValueAndGetChanged(key, "error?").single()
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
stpVer.expectNext(true, true, false, true).verifyComplete(); Assertions.assertEquals(Arrays.asList(true, true, false, true), stpVer);
} }
} }
@ -357,22 +317,21 @@ public abstract class TestDictionaryMap {
return keys return keys
.stream() .stream()
.map(keyTuple -> keyTuple.mapT1(ks -> Flux .map(keyTuple -> new Tuple2<>(Streams
.zip(Flux.fromIterable(ks), Flux.fromIterable(values)) .zip(keyTuple.getT1().stream(), values.stream(), Tuple2::new)
.collectMap(Tuple2::getT1, Tuple2::getT2, Object2ObjectLinkedOpenHashMap::new) .collect(Collectors.toMap(Tuple2::getT1,
.block() Tuple2::getT2,
)) (a, b) -> a,
.flatMap(entryTuple -> Arrays.stream(UpdateMode.values()).map(updateMode -> new Tuple2<>(updateMode, it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap::new
)), keyTuple.getT2()))
.flatMap(entryTuple -> Arrays
.stream(UpdateMode.values())
.map(updateMode -> new Tuple3<>(updateMode, entryTuple.getT1(), entryTuple.getT2())))
.flatMap(entryTuple -> Stream.of(new Tuple4<>(MapType.MAP,
entryTuple.getT1(), entryTuple.getT1(),
entryTuple.getT2()
)))
.flatMap(entryTuple -> Stream.of(new Tuple2<>(MapType.MAP, entryTuple.getT1(),
entryTuple.getT2(), entryTuple.getT2(),
entryTuple.getT3() entryTuple.getT3()
), new Tuple2<>(MapType.HASH_MAP, entryTuple.getT1(), ), new Tuple4<>(MapType.HASH_MAP, entryTuple.getT1(), entryTuple.getT2(), false)))
entryTuple.getT2(),
false
)))
.filter(tuple -> !(tuple.getT1() == MapType.HASH_MAP && tuple.getT2() != UpdateMode.ALLOW)) .filter(tuple -> !(tuple.getT1() == MapType.HASH_MAP && tuple.getT2() != UpdateMode.ALLOW))
.map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4())); .map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4()));
} }
@ -380,88 +339,77 @@ public abstract class TestDictionaryMap {
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testPutMultiGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var remainingEntries = new ArrayList<Entry<String, String>>();
Step<Entry<String, String>> stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMapMany(map -> {
var entriesFlux = Flux.fromIterable(entries.entrySet());
var keysFlux = entriesFlux.map(Entry::getKey);
var resultsFlux = Flux
.concat(
map.putMulti(entriesFlux).then(Mono.empty()),
map.getMulti(null, keysFlux)
);
return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close());
})
.filter(entry -> entry.getValue().isPresent()) var entriesFlux = entries.entrySet();
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) var keysFlux = entriesFlux.stream().map(Entry::getKey).toList();
)); map.putMulti(entriesFlux.stream());
List<Optional<String>> results;
try (var resultsFlux = map.getMulti(null, keysFlux.stream())) {
results = resultsFlux.toList();
}
return Streams
.zip(keysFlux.stream(), results.stream(), Map::entry)
.filter(entry -> entry.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.toList();
}));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, String> ignored : remainingEntries) { Assertions.assertEquals(remainingEntries, stpVer);
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testSetAllValuesGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testSetAllValuesGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var remainingEntries = new ArrayList<Entry<String, String>>();
Step<Entry<String, String>> stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> { var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
var mapMono = tempDictionary(db, updateMode).map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5));
return Flux.usingWhen(mapMono, map -> { var entriesFlux = entries.entrySet();
Flux<Entry<String, String>> entriesFlux = Flux.fromIterable(entries.entrySet()); var keysFlux = entriesFlux.stream().map(Entry::getKey).toList();
Flux<String> keysFlux = entriesFlux.map(Entry::getKey); map.setAllValues(entriesFlux.stream());
Flux<Optional<String>> resultsFlux = map.setAllValues(entriesFlux).thenMany(map.getMulti(null, keysFlux)); List<Optional<String>> resultsFlux;
return Flux.zip(keysFlux, resultsFlux, Map::entry); try (var stream = map.getMulti(null, keysFlux.stream())) {
}, LLUtils::finalizeResource) resultsFlux = stream.toList();
.filter(k -> k.getValue().isPresent()).map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())); }
} return Streams
)); .zip(keysFlux.stream(), resultsFlux.stream(), Map::entry)
.filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.toList();
}));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, String> ignored : remainingEntries) { Assertions.assertEquals(remainingEntries, stpVer);
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testSetAllValuesAndGetPrevious(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testSetAllValuesAndGetPrevious(MapType mapType,
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); UpdateMode updateMode,
Step<Entry<String, String>> stpVer = StepVerifier Object2ObjectSortedMap<String, String> entries,
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) boolean shouldFail) {
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var remainingEntries = new ArrayList<Entry<String, String>>();
.flatMapMany(map -> Flux var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.concat( var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), return Arrays.asList(toListClose(map.setAllValuesAndGetPrevious(entries.entrySet().stream())),
map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) toListClose(map.setAllValuesAndGetPrevious(entries.entrySet().stream()))
) );
.doFinally(s -> map.close()) }));
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, String> ignored : remainingEntries) { Assertions.assertEquals(Arrays.asList(List.of(), remainingEntries), stpVer);
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ -471,32 +419,24 @@ public abstract class TestDictionaryMap {
UpdateMode updateMode, UpdateMode updateMode,
Object2ObjectSortedMap<String, String> entries, Object2ObjectSortedMap<String, String> entries,
boolean shouldFail) { boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var remainingEntries = new ArrayList<Entry<String, String>>();
Step<Entry<String, String>> stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var entriesFlux = entries.entrySet();
.flatMapMany(map -> { var keysFlux = entriesFlux.stream().map(Entry::getKey).toList();
var entriesFlux = Flux.fromIterable(entries.entrySet()); map.set(entries);
var keysFlux = entriesFlux.map(Entry::getKey); var resultsFlux = toListClose(map.getMulti(null, entries.keySet().stream()));
var resultsFlux = Flux return Streams
.concat( .zip(keysFlux.stream(), resultsFlux.stream(), Map::entry)
map.set(entries).then(Mono.empty()), .filter(k -> k.getValue().isPresent())
map.getMulti(null, Flux.fromIterable(entries.keySet())) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
); .toList();
return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); }));
})
.filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, String> ignored : remainingEntries) { Assertions.assertEquals(remainingEntries, stpVer);
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ -506,31 +446,20 @@ public abstract class TestDictionaryMap {
UpdateMode updateMode, UpdateMode updateMode,
Object2ObjectSortedMap<String, String> entries, Object2ObjectSortedMap<String, String> entries,
boolean shouldFail) { boolean shouldFail) {
Step<Boolean> stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var x1 = map.setAndGetChanged(entries);
.flatMapMany(map -> { var x2 = map.setAndGetChanged(entries);
Mono<Void> removalMono; if (!entries.isEmpty()) {
if (entries.isEmpty()) { map.remove(entries.keySet().stream().findFirst().orElseThrow());
removalMono = Mono.empty(); }
} else { var x3 = map.setAndGetChanged(entries);
removalMono = map.remove(entries.keySet().stream().findFirst().orElseThrow()); return Arrays.asList(x1, x2, x3);
} }));
return Flux
.concat(
map.setAndGetChanged(entries).single(),
map.setAndGetChanged(entries).single(),
removalMono.then(Mono.empty()),
map.setAndGetChanged(entries).single()
)
.doFinally(s -> map.close());
})
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete(); Assertions.assertEquals(Arrays.asList(!entries.isEmpty(), false, !entries.isEmpty()), stpVer);
} }
} }
@ -540,26 +469,14 @@ public abstract class TestDictionaryMap {
UpdateMode updateMode, UpdateMode updateMode,
Object2ObjectSortedMap<String, String> entries, Object2ObjectSortedMap<String, String> entries,
boolean shouldFail) { boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
Step<Entry<String, String>> stpVer = StepVerifier var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) return Arrays.asList(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries));
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) }));
.flatMapMany(map -> Flux
.concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries))
.map(Map::entrySet)
.concatMapIterable(list -> list)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); Assertions.assertEquals(Arrays.asList(null, entries.isEmpty() ? null : entries), stpVer);
for (Entry<String, String> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ -569,178 +486,116 @@ public abstract class TestDictionaryMap {
UpdateMode updateMode, UpdateMode updateMode,
Object2ObjectSortedMap<String, String> entries, Object2ObjectSortedMap<String, String> entries,
boolean shouldFail) { boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
Step<Entry<String, String>> stpVer = StepVerifier var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) map.set(entries);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) return Arrays.asList(map.clearAndGetPrevious(), map.get(null));
.flatMapMany(map -> Flux }));
.concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null))
.map(Map::entrySet)
.concatMapIterable(list -> list)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); Assertions.assertEquals(Arrays.asList(entries.isEmpty() ? null : entries, null), stpVer);
for (Entry<String, String> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiGetAllValues(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testPutMultiGetAllValues(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
Step<Entry<String, String>> stpVer = StepVerifier var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) map.putMulti(entries.entrySet().stream());
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) return toListClose(map.getAllValues(null, false));
.flatMapMany(map -> Flux }));
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.getAllValues(null, false)
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); Assertions.assertEquals(entries.entrySet(), Set.copyOf(stpVer));
for (Entry<String, String> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiGet(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testPutMultiGet(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
Step<Entry<String, String>> stpVer = StepVerifier var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) map.putMulti(entries.entrySet().stream());
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) return map.get(null);
.flatMapMany(map -> Flux }));
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.get(null)
.map(Map::entrySet)
.flatMapIterable(list -> list)
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); Assertions.assertEquals(entries.isEmpty() ? null : entries, stpVer);
for (Entry<String, String> ignored : remainingEntries) {
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiGetAllStagesGet(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testPutMultiGetAllStagesGet(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var remainingEntries = new ArrayList<Entry<String, String>>();
Step<Entry<String, String>> stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) map.putMulti(entries.entrySet().stream());
.flatMapMany(map -> Flux return toListClose(map.getAllStages(null, false).map(stage -> {
.concat( var v = stage.getValue().get(null);
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), if (v == null) {
map return null;
.getAllStages(null, false) }
.flatMap(stage -> stage return Map.entry(stage.getKey(), v);
.getValue() }));
.get(null) }));
.map(val -> Map.entry(stage.getKey(), val))
.doFinally(s -> stage.getValue().close())
)
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
for (Entry<String, String> ignored : remainingEntries) { Assertions.assertEquals(new HashSet<>(remainingEntries), Set.copyOf(stpVer));
stpVer = stpVer.expectNextMatches(remainingEntries::remove);
}
stpVer.verifyComplete();
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiIsEmpty(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testPutMultiIsEmpty(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true); var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
Step<Boolean> stpVer = StepVerifier var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var x1 = map.isEmpty(null);
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) map.putMulti(entries.entrySet().stream());
.flatMapMany(map -> Flux var x2 = map.isEmpty(null);
.concat( return Arrays.asList(x1, x2);
map.isEmpty(null), }));
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.isEmpty(null)
)
.doFinally(s -> map.close())
)
.flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val))
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
stpVer.expectNext(true, entries.isEmpty()).verifyComplete(); Assertions.assertEquals(Arrays.asList(true, entries.isEmpty()), stpVer);
} }
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiClear(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) { public void testPutMultiClear(MapType mapType,
UpdateMode updateMode,
Object2ObjectSortedMap<String, String> entries,
boolean shouldFail) {
List<Boolean> result; List<Boolean> result;
try { try {
result = SyncUtils.run(DbTestUtils.tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
.flatMapMany(map -> Flux var x1 = map.isEmpty(null);
.concat( map.putMulti(entries.entrySet().stream());
map.isEmpty(null), var x2 = map.isEmpty(null);
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.clear();
map.isEmpty(null), var x3 = map.isEmpty(null);
map.clear().then(Mono.empty()), return List.of(x1, x2, x3);
map.isEmpty(null) }));
) Assertions.assertEquals(true, result.get(0));
.doFinally(s -> map.close())
) Assertions.assertEquals(entries.isEmpty(), result.get(1));
.flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val))
.collectList() Assertions.assertEquals(true, result.get(2));
).singleOrEmpty());
} catch (Exception ex) { } catch (Exception ex) {
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
} else { } else {
throw ex; throw ex;
} }
return;
} }
Assertions.assertEquals(true, result.get(0));
Assertions.assertEquals(entries.isEmpty(), result.get(1));
Assertions.assertEquals(true, result.get(2));
} }
*/
} }

View File

@ -3,15 +3,22 @@ package it.cavallium.dbengine.tests;
import static it.cavallium.dbengine.tests.DbTestUtils.BIG_STRING; import static it.cavallium.dbengine.tests.DbTestUtils.BIG_STRING;
import static it.cavallium.dbengine.tests.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.tests.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.tests.DbTestUtils.isCIMode; import static it.cavallium.dbengine.tests.DbTestUtils.isCIMode;
import static it.cavallium.dbengine.tests.DbTestUtils.run;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDb;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDictionary; import static it.cavallium.dbengine.tests.DbTestUtils.tempDictionary;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import java.util.List; import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
public abstract class TestDictionaryMapDeepHashMap { public abstract class TestDictionaryMapDeepHashMap {
private boolean checkLeaks = true; private boolean checkLeaks = true;
@ -90,32 +97,24 @@ public abstract class TestDictionaryMapDeepHashMap {
} }
} }
/*
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPut") @MethodSource("provideArgumentsPut")
public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {
var stpVer = StepVerifier var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) var map = tempDatabaseMapDictionaryDeepMapHashMap(tempDictionary(db, updateMode), 5);
.map(dict -> tempDatabaseMapDictionaryDeepMapHashMap(dict, 5)) map.at(null, key1).putValue(key2, value);
.flatMapMany(map -> map return map
.at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.close())) .getAllValues(null, false)
.thenMany(map .map(Entry::getValue)
.getAllValues(null, false) .flatMap(maps -> maps.entrySet().stream())
.map(Entry::getValue) .map(Entry::getValue)
.flatMap(maps -> Flux.fromIterable(maps.entrySet())) .toList();
.map(Entry::getValue) }));
)
.doFinally(s -> map.close())
)
));
if (shouldFail) { if (shouldFail) {
this.checkLeaks = false; this.checkLeaks = false;
stpVer.verifyError();
} else { } else {
stpVer.expectNext(value).verifyComplete(); Assertions.assertEquals(List.of(value), stpVer);
} }
} }
*/
} }