diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index d5bff52..95a7eb1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -86,6 +86,8 @@ public class LLUtils { private static final MethodHandle IS_IN_NON_BLOCKING_THREAD_MH; private static final Consumer NULL_CONSUMER = ignored -> {}; + private static final Buf BUF_TRUE = Buf.wrap(new byte[] {(byte) 1}); + private static final Buf BUF_FALSE = Buf.wrap(new byte[] {(byte) 0}); static { for (int i1 = 0; i1 < 256; i1++) { @@ -124,6 +126,8 @@ public class LLUtils { } public static boolean responseToBoolean(Buf response) { + if (response == BUF_FALSE) return false; + if (response == BUF_TRUE) return true; assert response.size() == 1; return response.getBoolean(0); } @@ -133,7 +137,7 @@ public class LLUtils { } public static Buf booleanToResponseByteBuffer(boolean bool) { - return Buf.wrap(new byte[] {bool ? (byte) 1 : 0}); + return bool ? BUF_TRUE : BUF_FALSE; } @Nullable diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 6f7ca99..61c28d8 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -194,7 +194,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep implements DatabaseStageMap get(@Nullable CompositeSnapshot snapshot) { var v = subDictionary.get(snapshot); - return v != null ? deserializeMap(v) : null; + var result = v != null ? deserializeMap(v) : null; + return result != null && result.isEmpty() ? null : result; } @Override @@ -203,7 +204,8 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap setAndGetPrevious(Object2ObjectSortedMap value) { var v = subDictionary.setAndGetPrevious(this.serializeMap(value)); - return v != null ? deserializeMap(v) : null; + var result = v != null ? deserializeMap(v) : null; + return result != null && result.isEmpty() ? null : result; } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 364fdd6..4df4976 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; @@ -131,15 +132,20 @@ public interface DatabaseStageMap> extends Dat Function, @NotNull Entry> entriesReplacer, boolean smallRange) { if (canKeysChange) { - this.setAllValues(this.getAllValues(null, smallRange).map(entriesReplacer)); + try (var values = this.getAllValues(null, smallRange)) { + this.setAllValues(values.map(entriesReplacer)); + } } else { - this.getAllValues(null, smallRange).map(entriesReplacer) - .forEach(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue())); + try (var values = this.getAllValues(null, smallRange).map(entriesReplacer)) { + values.forEach(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue())); + } } } default void replaceAll(Consumer> entriesReplacer) { - this.getAllStages(null, false).forEach(entriesReplacer); + try (var stream = this.getAllStages(null, false)) { + stream.forEach(entriesReplacer); + } } @Override @@ -148,11 +154,15 @@ public interface DatabaseStageMap> extends Dat if (value == null) { map = this.clearAndGetPrevious(); } else { - map = this - .setAllValuesAndGetPrevious(value.entrySet().stream()) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); + try (var stream = this.setAllValuesAndGetPrevious(value.entrySet().stream())) { + map = stream.collect(Collectors.toMap(Entry::getKey, + Entry::getValue, + (a, b) -> a, + Object2ObjectLinkedOpenHashMap::new + )); + } } - return map; + return map != null && map.isEmpty() ? null : map; } @Override @@ -173,9 +183,12 @@ public interface DatabaseStageMap> extends Dat SerializationFunction<@Nullable Object2ObjectSortedMap, @Nullable Object2ObjectSortedMap> updater) { var updateMode = this.getUpdateMode(); if (updateMode == UpdateMode.ALLOW_UNSAFE) { - Object2ObjectSortedMap v = this - .getAllValues(null, true) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); + Object2ObjectSortedMap v; + + try (var stream = this.getAllValues(null, true)) { + v = stream + .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); + } if (v.isEmpty()) { v = null; @@ -212,7 +225,7 @@ public interface DatabaseStageMap> extends Dat @Override default long leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { - return this.getAllStages(snapshot, false).count(); + return StreamUtils.countClose(this.getAllStages(snapshot, false)); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 65a0d92..ac2ae10 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -77,12 +77,14 @@ import org.rocksdb.InfoLogLevel; import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; +import org.rocksdb.PlainTableConfig; import org.rocksdb.PrepopulateBlobCache; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.Statistics; import org.rocksdb.StatsLevel; +import org.rocksdb.TableFormatConfig; import org.rocksdb.TickerType; import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDBOptions; @@ -303,7 +305,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa columnFamilyOptions.setCompressionPerLevel(compressionTypes); } - final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); + final TableFormatConfig tableOptions = inMemory ? new PlainTableConfig() : new BlockBasedTableConfig(); if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { if (!databaseOptions.lowMemory()) { // tableOptions.setOptimizeFiltersForMemory(true); @@ -313,7 +315,9 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa if (columnOptions.writeBufferSize().isPresent()) { columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get()); } - tableOptions.setVerifyCompression(false); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setVerifyCompression(false); + } if (columnOptions.filter().isPresent()) { var filterOptions = columnOptions.filter().get(); @@ -322,9 +326,13 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa // If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey()); refs.track(bloomFilter); - tableOptions.setFilterPolicy(bloomFilter); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setFilterPolicy(bloomFilter); + } } else if (filterOptions instanceof NoFilter) { - tableOptions.setFilterPolicy(null); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setFilterPolicy(null); + } } } boolean cacheIndexAndFilterBlocks = columnOptions.cacheIndexAndFilterBlocks() @@ -340,39 +348,44 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa columnFamilyOptions.setMaxWriteBufferNumber(4); } } - tableOptions - // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html - .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) - // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html - .setDataBlockHashTableUtilRatio(0.75) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setPinTopLevelIndexAndFilter(true) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setPinL0FilterAndIndexBlocksInCache(true) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setCacheIndexAndFilterBlocksWithHighPriority(true) - .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - // Enabling partition filters increase the reads by 2x - .setPartitionFilters(columnOptions.partitionFilters().orElse(false)) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) - .setChecksumType(ChecksumType.kXXH3) - // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - .setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024)) - .setBlockCacheCompressed(optionsWithCache.compressedCache()) - .setBlockCache(optionsWithCache.standardCache()) - .setPersistentCache(resolvePersistentCache(persistentCaches, - rocksdbOptions, - databaseOptions.persistentCaches(), - columnOptions.persistentCacheId(), - refs, - rocksLogger - )); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig + // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html + .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) + // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html + .setDataBlockHashTableUtilRatio(0.75) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPinTopLevelIndexAndFilter(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPinL0FilterAndIndexBlocksInCache(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setCacheIndexAndFilterBlocksWithHighPriority(true) + .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + // Enabling partition filters increase the reads by 2x + .setPartitionFilters(columnOptions.partitionFilters().orElse(false)) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) + .setChecksumType(ChecksumType.kXXH3) + // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + .setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024)) + .setBlockCacheCompressed(optionsWithCache.compressedCache()) + .setBlockCache(optionsWithCache.standardCache()) + .setPersistentCache(resolvePersistentCache(persistentCaches, + rocksdbOptions, + databaseOptions.persistentCaches(), + columnOptions.persistentCacheId(), + refs, + rocksLogger + )); + } columnFamilyOptions.setTableFormatConfig(tableOptions); + if (inMemory) { + columnFamilyOptions.useFixedLengthPrefixExtractor(3); + } columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); if (columnOptions.filter().isPresent()) { var filterOptions = columnOptions.filter().get(); diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index 8516931..e9177b4 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.utils; import com.google.common.collect.Iterators; import com.google.common.collect.Streams; +import it.cavallium.dbengine.database.SubStageEntry; +import it.cavallium.dbengine.database.collections.DatabaseStage; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -88,6 +90,12 @@ public class StreamUtils { } } + public static long countClose(Stream stream) { + try (stream) { + return stream.count(); + } + } + private record BatchSpliterator(Spliterator base, int batchSize) implements Spliterator> { @Override diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java index 54a88c5..780f165 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java @@ -1,18 +1,34 @@ package it.cavallium.dbengine.tests; import static it.cavallium.dbengine.tests.DbTestUtils.*; +import static it.cavallium.dbengine.utils.StreamUtils.toListClose; +import com.google.common.collect.Streams; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.utils.StreamUtils; +import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -147,50 +163,39 @@ public abstract class TestDictionaryMap { } } - /* @ParameterizedTest @MethodSource("provideArgumentsPut") public void testPutValueRemoveAndGetPrevious(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.removeAndGetPrevious(key), - map.putValue(key, value).then(map.removeAndGetPrevious(key)), - map.removeAndGetPrevious(key) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var x1 = map.removeAndGetPrevious(key); + map.putValue(key, value); + var x2 = map.removeAndGetPrevious(key); + var x3 = map.removeAndGetPrevious(key); + return Arrays.asList(x1, x2, x3); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(value).verifyComplete(); + Assertions.assertEquals(Arrays.asList(null, value, null), stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsPut") public void testPutValueRemoveAndGetStatus(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.removeAndGetStatus(key), - map.putValue(key, value).then(map.removeAndGetStatus(key)), - map.removeAndGetStatus(key) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var x1 = map.removeAndGetStatus(key); + map.putValue(key, value); + var x2 = map.removeAndGetStatus(key); + var x3 = map.removeAndGetStatus(key); + return Stream.of(x1, x2, x3).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(false, true, false).verifyComplete(); + Assertions.assertEquals(Arrays.asList(false, true, false), stpVer); } } @@ -200,39 +205,27 @@ public abstract class TestDictionaryMap { if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.updateValue(key, old -> { - Assertions.assertNull(old); - return "error?"; - }), - map.updateValue(key, old -> { - Assertions.assertEquals("error?", old); - return "error?"; - }), - map.updateValue(key, old -> { - Assertions.assertEquals("error?", old); - return "error?"; - }), - map.updateValue(key, old -> { - Assertions.assertEquals("error?", old); - return value; - }), - map.updateValue(key, old -> { - Assertions.assertEquals(value, old); - return value; - }) - ) - .doFinally(s -> map.close()) - ) - )); - if (updateMode == UpdateMode.DISALLOW || shouldFail) { - stpVer.verifyError(); - } else { - stpVer.expectNext(true, false, false, true, false).verifyComplete(); + var stpVer = run(updateMode == UpdateMode.DISALLOW || shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + return Arrays.asList(map.updateValue(key, old -> { + Assertions.assertNull(old); + return "error?"; + }), map.updateValue(key, old -> { + Assertions.assertEquals("error?", old); + return "error?"; + }), map.updateValue(key, old -> { + Assertions.assertEquals("error?", old); + return "error?"; + }), map.updateValue(key, old -> { + Assertions.assertEquals("error?", old); + return value; + }), map.updateValue(key, old -> { + Assertions.assertEquals(value, old); + return value; + })); + })); + if (updateMode != UpdateMode.DISALLOW && !shouldFail) { + Assertions.assertEquals(Arrays.asList(true, false, false, true, false), stpVer); } } @@ -242,103 +235,70 @@ public abstract class TestDictionaryMap { if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - Mono - .fromRunnable(() -> log.debug("1. Updating value: {}", key)) - .then(map.updateValue(key, old -> { - assert old == null; - return "error?"; - })) - .doOnSuccess(s -> log.debug("1. Getting value: {}", key)) - .then(map.getValue(null, key)), + var stpVer = run(updateMode == UpdateMode.DISALLOW || shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + log.debug("1. Updating value: {}", key); + map.updateValue(key, old -> { + assert old == null; + return "error?"; + }); + log.debug("1. Getting value: {}", key); + var x2 = map.getValue(null, key); - Mono - .fromRunnable(() -> log.debug("2. Updating value: {}", key)) - .then(map.updateValue(key, old -> { - assert Objects.equals(old, "error?"); - return "error?"; - })) - .doOnSuccess(s -> log.debug("2. Getting value: {}", key)) - .then(map.getValue(null, key)), + log.debug("2. Updating value: {}", key); + map.updateValue(key, old -> { + assert Objects.equals(old, "error?"); + return "error?"; + }); + log.debug("2. Getting value: {}", key); + var x3 = map.getValue(null, key); - Mono - .fromRunnable(() -> log.debug("3. Updating value: {}", key)) - .then(map.updateValue(key, old -> { - assert Objects.equals(old, "error?"); - return "error?"; - })) - .doOnSuccess(s -> log.debug("3. Getting value: {}", key)) - .then(map.getValue(null, key)), + log.debug("3. Updating value: {}", key); + map.updateValue(key, old -> { + assert Objects.equals(old, "error?"); + return "error?"; + }); + log.debug("3. Getting value: {}", key); + var x4 = map.getValue(null, key); - Mono - .fromRunnable(() -> log.debug("4. Updating value: {}", key)) - .then(map.updateValue(key, old -> { - assert Objects.equals(old, "error?"); - return value; - })) - .doOnSuccess(s -> log.debug("4. Getting value: {}", key)) - .then(map.getValue(null, key)), + log.debug("4. Updating value: {}", key); + map.updateValue(key, old -> { + assert Objects.equals(old, "error?"); + return value; + }); + log.debug("4. Getting value: {}", key); + var x5 = map.getValue(null, key); - Mono - .fromRunnable(() -> log.debug("5. Updating value: {}", key)) - .then(map.updateValue(key, old -> { - assert Objects.equals(old, value); - return value; - })) - .doOnSuccess(s -> log.debug("5. Getting value: {}", key)) - .then(map.getValue(null, key)) - ) - .doFinally(s -> map.close()) - ) - )); - if (updateMode == UpdateMode.DISALLOW || shouldFail) { - stpVer.verifyError(); - } else { - stpVer.expectNext("error?", "error?", "error?", value, value).verifyComplete(); + log.debug("5. Updating value: {}", key); + map.updateValue(key, old -> { + assert Objects.equals(old, value); + return value; + }); + log.debug("5. Getting value: {}", key); + var x6 = map.getValue(null, key); + return Arrays.asList(x2, x3, x4, x5, x6); + })); + if (updateMode != UpdateMode.DISALLOW && !shouldFail) { + Assertions.assertEquals(Arrays.asList("error?", "error?", "error?", value, value), stpVer); } } - @Test - public void testUpdateGetWithCancel() { - tempDb(getTempDbGenerator(), allocator, db -> { - var mapMono = tempDictionary(db, UpdateMode.ALLOW) - .map(dict -> tempDatabaseMapDictionaryMap(dict, MapType.MAP, 5)); - - var keys = Flux.range(10, 89).map(n -> "key" + n).repeat(100); - - return Mono.usingWhen(mapMono, - map -> keys.flatMap(key -> map.updateValue(key, prevValue -> key + "-val")).then(), - LLUtils::finalizeResource - ); - }).take(50).blockLast(); - } - @ParameterizedTest @MethodSource("provideArgumentsPut") public void testPutAndGetChanged(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.putValueAndGetChanged(key, "error?").single(), - map.putValueAndGetChanged(key, value).single(), - map.putValueAndGetChanged(key, value).single(), - map.remove(key), - map.putValueAndGetChanged(key, "error?").single() - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var x1 = map.putValueAndGetChanged(key, "error?"); + var x2 = map.putValueAndGetChanged(key, value); + var x3 = map.putValueAndGetChanged(key, value); + map.remove(key); + var x4 = map.putValueAndGetChanged(key, "error?"); + return Arrays.asList(x1, x2, x3, x4); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(true, true, false, true).verifyComplete(); + Assertions.assertEquals(Arrays.asList(true, true, false, true), stpVer); } } @@ -357,22 +317,21 @@ public abstract class TestDictionaryMap { return keys .stream() - .map(keyTuple -> keyTuple.mapT1(ks -> Flux - .zip(Flux.fromIterable(ks), Flux.fromIterable(values)) - .collectMap(Tuple2::getT1, Tuple2::getT2, Object2ObjectLinkedOpenHashMap::new) - .block() - )) - .flatMap(entryTuple -> Arrays.stream(UpdateMode.values()).map(updateMode -> new Tuple2<>(updateMode, + .map(keyTuple -> new Tuple2<>(Streams + .zip(keyTuple.getT1().stream(), values.stream(), Tuple2::new) + .collect(Collectors.toMap(Tuple2::getT1, + Tuple2::getT2, + (a, b) -> a, + it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap::new + )), keyTuple.getT2())) + .flatMap(entryTuple -> Arrays + .stream(UpdateMode.values()) + .map(updateMode -> new Tuple3<>(updateMode, entryTuple.getT1(), entryTuple.getT2()))) + .flatMap(entryTuple -> Stream.of(new Tuple4<>(MapType.MAP, entryTuple.getT1(), - entryTuple.getT2() - ))) - .flatMap(entryTuple -> Stream.of(new Tuple2<>(MapType.MAP, entryTuple.getT1(), entryTuple.getT2(), entryTuple.getT3() - ), new Tuple2<>(MapType.HASH_MAP, entryTuple.getT1(), - entryTuple.getT2(), - false - ))) + ), new Tuple4<>(MapType.HASH_MAP, entryTuple.getT1(), entryTuple.getT2(), false))) .filter(tuple -> !(tuple.getT1() == MapType.HASH_MAP && tuple.getT2() != UpdateMode.ALLOW)) .map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4())); } @@ -380,88 +339,77 @@ public abstract class TestDictionaryMap { @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testPutMultiGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> { - var entriesFlux = Flux.fromIterable(entries.entrySet()); - var keysFlux = entriesFlux.map(Entry::getKey); - var resultsFlux = Flux - .concat( - map.putMulti(entriesFlux).then(Mono.empty()), - map.getMulti(null, keysFlux) - ); - return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); - }) + var remainingEntries = new ArrayList>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); - .filter(entry -> entry.getValue().isPresent()) - .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) - )); + var entriesFlux = entries.entrySet(); + var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); + map.putMulti(entriesFlux.stream()); + List> results; + try (var resultsFlux = map.getMulti(null, keysFlux.stream())) { + results = resultsFlux.toList(); + } + return Streams + .zip(keysFlux.stream(), results.stream(), Map::entry) + .filter(entry -> entry.getValue().isPresent()) + .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testSetAllValuesGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> { - var mapMono = tempDictionary(db, updateMode).map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)); - return Flux.usingWhen(mapMono, map -> { - Flux> entriesFlux = Flux.fromIterable(entries.entrySet()); - Flux keysFlux = entriesFlux.map(Entry::getKey); - Flux> resultsFlux = map.setAllValues(entriesFlux).thenMany(map.getMulti(null, keysFlux)); - return Flux.zip(keysFlux, resultsFlux, Map::entry); - }, LLUtils::finalizeResource) - .filter(k -> k.getValue().isPresent()).map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())); - } - )); + var remainingEntries = new ArrayList>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + + var entriesFlux = entries.entrySet(); + var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); + map.setAllValues(entriesFlux.stream()); + List> resultsFlux; + try (var stream = map.getMulti(null, keysFlux.stream())) { + resultsFlux = stream.toList(); + } + return Streams + .zip(keysFlux.stream(), resultsFlux.stream(), Map::entry) + .filter(k -> k.getValue().isPresent()) + .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") - public void testSetAllValuesAndGetPrevious(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), - map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) - ) - .doFinally(s -> map.close()) - ) - )); + public void testSetAllValuesAndGetPrevious(MapType mapType, + UpdateMode updateMode, + Object2ObjectSortedMap entries, + boolean shouldFail) { + var remainingEntries = new ArrayList>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + return Arrays.asList(toListClose(map.setAllValuesAndGetPrevious(entries.entrySet().stream())), + toListClose(map.setAllValuesAndGetPrevious(entries.entrySet().stream())) + ); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(Arrays.asList(List.of(), remainingEntries), stpVer); } } @@ -471,32 +419,24 @@ public abstract class TestDictionaryMap { UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> { - var entriesFlux = Flux.fromIterable(entries.entrySet()); - var keysFlux = entriesFlux.map(Entry::getKey); - var resultsFlux = Flux - .concat( - map.set(entries).then(Mono.empty()), - map.getMulti(null, Flux.fromIterable(entries.keySet())) - ); - return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); - }) - .filter(k -> k.getValue().isPresent()) - .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) - )); + var remainingEntries = new ArrayList>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var entriesFlux = entries.entrySet(); + var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); + map.set(entries); + var resultsFlux = toListClose(map.getMulti(null, entries.keySet().stream())); + return Streams + .zip(keysFlux.stream(), resultsFlux.stream(), Map::entry) + .filter(k -> k.getValue().isPresent()) + .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @@ -506,31 +446,20 @@ public abstract class TestDictionaryMap { UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - Step stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> { - Mono removalMono; - if (entries.isEmpty()) { - removalMono = Mono.empty(); - } else { - removalMono = map.remove(entries.keySet().stream().findFirst().orElseThrow()); - } - return Flux - .concat( - map.setAndGetChanged(entries).single(), - map.setAndGetChanged(entries).single(), - removalMono.then(Mono.empty()), - map.setAndGetChanged(entries).single() - ) - .doFinally(s -> map.close()); - }) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var x1 = map.setAndGetChanged(entries); + var x2 = map.setAndGetChanged(entries); + if (!entries.isEmpty()) { + map.remove(entries.keySet().stream().findFirst().orElseThrow()); + } + var x3 = map.setAndGetChanged(entries); + return Arrays.asList(x1, x2, x3); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete(); + Assertions.assertEquals(Arrays.asList(!entries.isEmpty(), false, !entries.isEmpty()), stpVer); } } @@ -540,26 +469,14 @@ public abstract class TestDictionaryMap { UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries)) - .map(Map::entrySet) - .concatMapIterable(list -> list) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + return Arrays.asList(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries)); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(Arrays.asList(null, entries.isEmpty() ? null : entries), stpVer); } } @@ -569,178 +486,116 @@ public abstract class TestDictionaryMap { UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) - .map(Map::entrySet) - .concatMapIterable(list -> list) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + map.set(entries); + return Arrays.asList(map.clearAndGetPrevious(), map.get(null)); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(Arrays.asList(entries.isEmpty() ? null : entries, null), stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testPutMultiGetAllValues(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.getAllValues(null, false) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + map.putMulti(entries.entrySet().stream()); + return toListClose(map.getAllValues(null, false)); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(entries.entrySet(), Set.copyOf(stpVer)); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testPutMultiGet(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.get(null) - .map(Map::entrySet) - .flatMapIterable(list -> list) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + map.putMulti(entries.entrySet().stream()); + return map.get(null); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(entries.isEmpty() ? null : entries, stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testPutMultiGetAllStagesGet(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map - .getAllStages(null, false) - .flatMap(stage -> stage - .getValue() - .get(null) - .map(val -> Map.entry(stage.getKey(), val)) - .doFinally(s -> stage.getValue().close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + map.putMulti(entries.entrySet().stream()); + return toListClose(map.getAllStages(null, false).map(stage -> { + var v = stage.getValue().get(null); + if (v == null) { + return null; + } + return Map.entry(stage.getKey(), v); + })); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(new HashSet<>(remainingEntries), Set.copyOf(stpVer)); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testPutMultiIsEmpty(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.isEmpty(null), - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.isEmpty(null) - ) - .doFinally(s -> map.close()) - ) - .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var x1 = map.isEmpty(null); + map.putMulti(entries.entrySet().stream()); + var x2 = map.isEmpty(null); + return Arrays.asList(x1, x2); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(true, entries.isEmpty()).verifyComplete(); + Assertions.assertEquals(Arrays.asList(true, entries.isEmpty()), stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsPutMulti") - public void testPutMultiClear(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { + public void testPutMultiClear(MapType mapType, + UpdateMode updateMode, + Object2ObjectSortedMap entries, + boolean shouldFail) { List result; try { - result = SyncUtils.run(DbTestUtils.tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.isEmpty(null), - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.isEmpty(null), - map.clear().then(Mono.empty()), - map.isEmpty(null) - ) - .doFinally(s -> map.close()) - ) - .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) - .collectList() - ).singleOrEmpty()); + result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); + var x1 = map.isEmpty(null); + map.putMulti(entries.entrySet().stream()); + var x2 = map.isEmpty(null); + map.clear(); + var x3 = map.isEmpty(null); + return List.of(x1, x2, x3); + })); + Assertions.assertEquals(true, result.get(0)); + + Assertions.assertEquals(entries.isEmpty(), result.get(1)); + + Assertions.assertEquals(true, result.get(2)); } catch (Exception ex) { if (shouldFail) { this.checkLeaks = false; } else { throw ex; } - return; } - - Assertions.assertEquals(true, result.get(0)); - - Assertions.assertEquals(entries.isEmpty(), result.get(1)); - - Assertions.assertEquals(true, result.get(2)); } - */ } diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java index 7bc9ed9..2591bdd 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java @@ -6,6 +6,7 @@ import static it.cavallium.dbengine.tests.DbTestUtils.isCIMode; import static it.cavallium.dbengine.tests.DbTestUtils.run; import static it.cavallium.dbengine.tests.DbTestUtils.runVoid; import static it.cavallium.dbengine.tests.DbTestUtils.tempDatabaseMapDictionaryDeepMap; +import static it.cavallium.dbengine.tests.DbTestUtils.tempDb; import static it.cavallium.dbengine.tests.DbTestUtils.tempDictionary; import com.google.common.collect.Streams; @@ -13,10 +14,16 @@ import it.cavallium.dbengine.database.UpdateMode; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -204,25 +211,22 @@ public abstract class TestDictionaryMapDeep { gen.closeTempDb(db); } -/* + @ParameterizedTest @MethodSource("provideArgumentsSet") public void testSetValueGetAllValues(UpdateMode updateMode, String key, Object2ObjectSortedMap value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> map - .putValue(key, value) - .thenMany(map.getAllValues(null, false)) - .doFinally(s -> map.close()) - ) - )); + var result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + map.putValue(key, value); + try (var stream = map.getAllValues(null, false)) { + return stream.toList(); + } + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(Map.entry(key, value)).verifyComplete(); + Assertions.assertEquals(List.of(Map.entry(key, value)), result); } } @@ -233,49 +237,29 @@ public abstract class TestDictionaryMapDeep { Object2ObjectSortedMap value, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); - Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map_ -> Flux.using( - () -> map_, - map -> map - .at(null, key) - .flatMap(v_ -> Mono.using( - () -> v_, - v -> v.set(value), - SimpleResource::close - )) - .then(map - .at(null, "capra") - .flatMap(v_ -> Mono.using( - () -> v_, - v -> v.set(new Object2ObjectLinkedOpenHashMap<>(Map.of("normal", "123", "ormaln", "456"))), - SimpleResource::close - )) - ) - .thenMany(map - .getAllStages(null, false) - .flatMap(v -> v.getValue() - .getAllValues(null, false) - .map(result -> new Tuple2<>(v.getKey(), result.getKey(), result.getValue())) - .doFinally(s -> v.getValue().close()) - ) - ), - SimpleResource::close - )) - )); + List> result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var dict = tempDictionary(db, updateMode); + var map = tempDatabaseMapDictionaryDeepMap(dict, 5, 6); + var v = map.at(null, key); + v.set(value); + var v2 = map.at(null, "capra"); + v2.set(new Object2ObjectLinkedOpenHashMap<>(Map.of("normal", "123", "ormaln", "456"))); + try (var stages = map.getAllStages(null, false)) { + return stages + .flatMap(stage -> stage + .getValue() + .getAllValues(null, false) + .map(r -> new Tuple3<>(stage.getKey(), r.getKey(), r.getValue()))) + .toList(); + } + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - value.forEach((k, v) -> remainingEntries.add(new Tuple2<>(key, k, v))); - remainingEntries.add(new Tuple2<>("capra", "normal", "123")); - remainingEntries.add(new Tuple2<>("capra", "ormaln", "456")); - for (Tuple3 ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); - assert remainingEntries.isEmpty(); + value.forEach((k, v) -> remainingEntries.add(new Tuple3<>(key, k, v))); + remainingEntries.add(new Tuple3<>("capra", "normal", "123")); + remainingEntries.add(new Tuple3<>("capra", "ormaln", "456")); + Assertions.assertEquals(remainingEntries, new HashSet<>(result)); } } @@ -283,30 +267,17 @@ public abstract class TestDictionaryMapDeep { @MethodSource({"provideArgumentsPut"}) public void testAtPutValueAtGetValue(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMap(map -> map - .at(null, key1) - .flatMap(v -> v - .putValue(key2, value) - .doFinally(s -> v.close()) - ) - .then(map - .at(null, key1) - .flatMap(v -> v - .getValue(null, key2) - .doFinally(s -> v.close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); + var result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var v = map.at(null, key1); + v.putValue(key2, value); + var v2 = map.at(null, key1); + return v2.getValue(null, key2); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(value).verifyComplete(); + Assertions.assertEquals(result, value); } } @@ -316,69 +287,40 @@ public abstract class TestDictionaryMapDeep { String key, Object2ObjectSortedMap value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat - (map - .putValueAndGetPrevious(key, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))) - .defaultIfEmpty(new Object2ObjectLinkedOpenHashMap<>(Map.of("nothing", "nothing"))), - map.putValueAndGetPrevious(key, value), - map.putValueAndGetPrevious(key, value) - ) - .doFinally(s -> map.close()) - ) - )); + var result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var dict = tempDictionary(db, updateMode); + var map = tempDatabaseMapDictionaryDeepMap(dict, 5, 6); + var x1 = Objects.requireNonNullElse( + map.putValueAndGetPrevious(key, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))), + new Object2ObjectLinkedOpenHashMap<>(Map.of("nothing", "nothing")) + ); + var x2 = Objects.requireNonNull(map.putValueAndGetPrevious(key, value)); + var x3 = Objects.requireNonNull(map.putValueAndGetPrevious(key, value)); + return List.of(x1, x2, x3); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer - .expectNext(new Object2ObjectLinkedOpenHashMap<>(Map.of("nothing", "nothing")), - new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")) - ) - .expectNext(value) - .verifyComplete(); + Assertions.assertEquals(List.of(new Object2ObjectLinkedOpenHashMap<>(Map.of("nothing", "nothing")), + new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), value), result); } } @ParameterizedTest @MethodSource("provideArgumentsPut") public void testAtPutValueAndGetPrevious(UpdateMode updateMode, String key1, String key2, String value, - boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map - .at(null, key1) - .flatMap(v -> v - .putValueAndGetPrevious(key2, "error?") - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .putValueAndGetPrevious(key2, value) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .putValueAndGetPrevious(key2, value) - .doFinally(s -> v.close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); + boolean shouldFail) throws IOException { + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.at(null, key1).putValueAndGetPrevious(key2, "error?"); + var x2 = map.at(null, key1).putValueAndGetPrevious(key2, value); + var x3 = map.at(null, key1).putValueAndGetPrevious(key2, value); + return Stream.of(x1, x2, x3).filter(Objects::nonNull).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext("error?", value).verifyComplete(); + Assertions.assertEquals(List.of("error?", value), stpVer); } } @@ -388,23 +330,18 @@ public abstract class TestDictionaryMapDeep { String key, Object2ObjectSortedMap value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.removeAndGetPrevious(key), - map.putValue(key, value).then(map.removeAndGetPrevious(key)), - map.removeAndGetPrevious(key) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.removeAndGetPrevious(key); + map.putValue(key, value); + var x2 = map.removeAndGetPrevious(key); + var x3 = map.removeAndGetPrevious(key); + return Stream.of(x1, x2, x3).filter(Objects::nonNull).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(value).verifyComplete(); + Assertions.assertEquals(List.of(value), stpVer); } } @@ -412,39 +349,22 @@ public abstract class TestDictionaryMapDeep { @MethodSource("provideArgumentsPut") public void testAtPutValueRemoveAndGetPrevious(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map - .at(null, key1) - .flatMap(v -> v - .putValue(key2, "error?") - .then(v.removeAndGetPrevious(key2)) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .putValue(key2, value) - .then(v.removeAndGetPrevious(key2)) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v.removeAndGetPrevious(key2) - .doFinally(s -> v.close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var stage1 = map.at(null, key1); + stage1.putValue(key2, "error?"); + var x1 = stage1.removeAndGetPrevious(key2); + var stage2 = map.at(null, key1); + stage2.putValue(key2, value); + var x2 = stage2.removeAndGetPrevious(key2); + var stage3 = map.at(null, key1); + var x3 = stage3.removeAndGetPrevious(key2); + return Stream.of(x1, x2, x3).filter(Objects::nonNull).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext("error?", value).verifyComplete(); + Assertions.assertEquals(List.of("error?", value), stpVer); } } @@ -454,23 +374,18 @@ public abstract class TestDictionaryMapDeep { String key, Object2ObjectSortedMap value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.removeAndGetStatus(key), - map.putValue(key, value).then(map.removeAndGetStatus(key)), - map.removeAndGetStatus(key) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.removeAndGetStatus(key); + map.putValue(key, value); + var x2 = map.removeAndGetStatus(key); + var x3 = map.removeAndGetStatus(key); + return Stream.of(x1, x2, x3).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(false, true, false).verifyComplete(); + Assertions.assertEquals(List.of(false, true, false), stpVer); } } @@ -478,39 +393,22 @@ public abstract class TestDictionaryMapDeep { @MethodSource("provideArgumentsPut") public void testAtPutValueRemoveAndGetStatus(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map - .at(null, key1) - .flatMap(v -> v - .putValue(key2, "error?") - .then(v.removeAndGetStatus(key2)) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .putValue(key2, value) - .then(v.removeAndGetStatus(key2)) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v.removeAndGetStatus(key2) - .doFinally(s -> v.close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var step1 = map.at(null, key1); + step1.putValue(key2, "error?"); + var x1 = step1.removeAndGetStatus(key2); + var step2 = map.at(null, key1); + step2.putValue(key2, value); + var x2 = step2.removeAndGetStatus(key2); + var step3 = map.at(null, key1); + var x3 = step3.removeAndGetStatus(key2); + return Stream.of(x1, x2, x3).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(true, true, false).verifyComplete(); + Assertions.assertEquals(List.of(true, true, false), stpVer); } } @@ -523,39 +421,27 @@ public abstract class TestDictionaryMapDeep { if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.updateValue(key, old -> { - assert old == null; - return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); - }), - map.updateValue(key, old -> { - assert Objects.equals(old, Map.of("error?", "error.")); - return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); - }), - map.updateValue(key, old -> { - assert Objects.equals(old, Map.of("error?", "error.")); - return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); - }), - map.updateValue(key, old -> { - assert Objects.equals(old, Map.of("error?", "error.")); - return value; - }), - map.updateValue(key, old -> { - assert Objects.equals(old, value); - return value; - }) - ) - .doFinally(s -> map.close()) - ) - )); - if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { - stpVer.verifyError(); - } else { - stpVer.expectNext(true, false, false, true, false).verifyComplete(); + var stpVer = run(shouldFail || updateMode != UpdateMode.ALLOW_UNSAFE, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + return Stream.of(map.updateValue(key, old -> { + assert old == null; + return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); + }), map.updateValue(key, old -> { + assert Objects.equals(old, Map.of("error?", "error.")); + return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); + }), map.updateValue(key, old -> { + assert Objects.equals(old, Map.of("error?", "error.")); + return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); + }), map.updateValue(key, old -> { + assert Objects.equals(old, Map.of("error?", "error.")); + return value; + }), map.updateValue(key, old -> { + assert Objects.equals(old, value); + return value; + })).toList(); + })); + if (updateMode == UpdateMode.ALLOW_UNSAFE && !shouldFail) { + Assertions.assertEquals(List.of(true, false, false, true, false), stpVer); } } @@ -565,43 +451,16 @@ public abstract class TestDictionaryMapDeep { if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> prev) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> value) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> value) - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> null) - .doFinally(s -> v.close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); - if (updateMode == UpdateMode.DISALLOW || shouldFail) { - stpVer.verifyError(); - } else { - stpVer.expectNext(false, true, false, true).verifyComplete(); + var stpVer = run(shouldFail || updateMode == UpdateMode.DISALLOW, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.at(null, key1).updateValue(key2, prev -> prev); + var x2 = map.at(null, key1).updateValue(key2, prev -> value); + var x3 = map.at(null, key1).updateValue(key2, prev -> value); + var x4 = map.at(null, key1).updateValue(key2, prev -> null); + return Stream.of(x1, x2, x3, x4).toList(); + })); + if (updateMode != UpdateMode.DISALLOW && !shouldFail) { + Assertions.assertEquals(List.of(false, true, false, true), stpVer); } } @@ -614,42 +473,43 @@ public abstract class TestDictionaryMapDeep { if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux.concat( - map.updateValue(key, old -> { - assert old == null; - return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); - }).then(map.getValue(null, key)), - map.updateValue(key, old -> { - assert Objects.equals(old, Map.of("error?", "error.")); - return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); - }).then(map.getValue(null, key)), - map.updateValue(key, old -> { - assert Objects.equals(old, Map.of("error?", "error.")); - return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); - }).then(map.getValue(null, key)), - map.updateValue(key, old -> { - assert Objects.equals(old, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))); - return value; - }).then(map.getValue(null, key)), - map.updateValue(key, old -> { - assert Objects.equals(old, value); - return value; - }).then(map.getValue(null, key)) - ).doFinally(s -> map.close())) - )); - if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { - stpVer.verifyError(); - } else { - stpVer - .expectNext(new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), - new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), - new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), - value, - value - ) - .verifyComplete(); + var stpVer = run(shouldFail || updateMode != UpdateMode.ALLOW_UNSAFE, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + map.updateValue(key, old -> { + assert old == null; + return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); + }); + var x1 = map.getValue(null, key); + map.updateValue(key, old -> { + assert Objects.equals(old, Map.of("error?", "error.")); + return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); + }); + var x2 = map.getValue(null, key); + + map.updateValue(key, old -> { + assert Objects.equals(old, Map.of("error?", "error.")); + return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")); + }); + var x3 = map.getValue(null, key); + map.updateValue(key, old -> { + assert Objects.equals(old, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))); + return value; + }); + var x4 = map.getValue(null, key); + map.updateValue(key, old -> { + assert Objects.equals(old, value); + return value; + }); + var x5 = map.getValue(null, key); + return Stream.of(x1, x2, x3, x4, x5).toList(); + })); + if (updateMode == UpdateMode.ALLOW_UNSAFE && !shouldFail) { + Assertions.assertEquals(List.of(new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), + new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), + new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")), + value, + value + ), stpVer); } } @@ -659,51 +519,24 @@ public abstract class TestDictionaryMapDeep { if (updateMode == UpdateMode.DISALLOW && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> prev) - .then(v.getValue(null, key2)) - .defaultIfEmpty("empty") - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> value) - .then(v.getValue(null, key2)) - .defaultIfEmpty("empty") - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> value) - .then(v.getValue(null, key2)) - .defaultIfEmpty("empty") - .doFinally(s -> v.close()) - ), - map - .at(null, key1) - .flatMap(v -> v - .updateValue(key2, prev -> null) - .then(v.getValue(null, key2)) - .defaultIfEmpty("empty") - .doFinally(s -> v.close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); - if (updateMode == UpdateMode.DISALLOW || shouldFail) { - stpVer.verifyError(); - } else { - stpVer.expectNext("empty", value, value, "empty").verifyComplete(); + var stpVer = run(shouldFail || updateMode == UpdateMode.DISALLOW, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var stage1 = map.at(null, key1); + stage1.updateValue(key2, prev -> prev); + var x1 = Objects.requireNonNullElse(stage1.getValue(null, key2), "empty"); + var stage2 = map.at(null, key1); + stage2.updateValue(key2, prev -> value); + var x2 = Objects.requireNonNullElse(stage2.getValue(null, key2), "empty"); + var stage3 = map.at(null, key1); + stage3.updateValue(key2, prev -> value); + var x3 = Objects.requireNonNullElse(stage3.getValue(null, key2), "empty"); + var stage4 = map.at(null, key1); + stage4.updateValue(key2, prev -> null); + var x4 = Objects.requireNonNullElse(stage4.getValue(null, key2), "empty"); + return Stream.of(x1, x2, x3, x4).toList(); + })); + if (updateMode != UpdateMode.DISALLOW && !shouldFail) { + Assertions.assertEquals(List.of("empty", value, value, "empty"), stpVer); } } @@ -713,25 +546,19 @@ public abstract class TestDictionaryMapDeep { String key, Object2ObjectSortedMap value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.putValueAndGetChanged(key, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))).single(), - map.putValueAndGetChanged(key, value).single(), - map.putValueAndGetChanged(key, value).single(), - map.remove(key), - map.putValueAndGetChanged(key, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))).single() - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.putValueAndGetChanged(key, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))); + var x2 = map.putValueAndGetChanged(key, value); + var x3 = map.putValueAndGetChanged(key, value); + map.remove(key); + var x4 = map.putValueAndGetChanged(key, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."))); + return List.of(x1, x2, x3, x4); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(true, true, false, true).verifyComplete(); + Assertions.assertEquals(List.of(true, true, false, true), stpVer); } } @@ -753,15 +580,14 @@ public abstract class TestDictionaryMapDeep { return keys .stream() - .map(keyTuple -> keyTuple.mapT1(ks -> Flux - .zip(Flux.fromIterable(ks), Flux.fromIterable(values)) - .collectMap(Tuple2::getT1, Tuple2::getT2, Object2ObjectLinkedOpenHashMap::new) - .block() + .map(keyTuple -> new Tuple2<>(Streams + .zip(keyTuple.getT1().stream(), values.stream(), Tuple2::new) + .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)), + keyTuple.getT2() )) - .flatMap(entryTuple -> Arrays.stream(UpdateMode.values()).map(updateMode -> new Tuple2<>(updateMode, - entryTuple.getT1(), - entryTuple.getT2() - ))) + .flatMap(entryTuple -> Arrays + .stream(UpdateMode.values()) + .map(updateMode -> new Tuple3<>(updateMode, entryTuple.getT1(), entryTuple.getT2()))) .map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3())); } @@ -770,27 +596,22 @@ public abstract class TestDictionaryMapDeep { public void testSetMultiGetMulti(UpdateMode updateMode, Map> entries, boolean shouldFail) { - var flux = tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> { - var entriesFlux = Flux.fromIterable(entries.entrySet()); - var keysFlux = entriesFlux.map(Entry::getKey); - var resultsFlux = Flux - .concat( - map.putMulti(entriesFlux).then(Mono.empty()), - map.getMulti(null, keysFlux) - ); - return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); - }) - .filter(k -> k.getValue().isPresent()) - .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) - ); + var flux = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var entriesFlux = entries.entrySet(); + var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); + map.putMulti(entriesFlux.stream()); + var resultsFlux = map.getMulti(null, keysFlux.stream()); + return Streams + .zip(keysFlux.stream(), resultsFlux, Map::entry) + .filter(k -> k.getValue().isPresent()) + .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .toList(); + })); if (shouldFail) { this.checkLeaks = false; - StepVerifier.create(flux).verifyError(); } else { - var elements = flux.collect(Collectors.toList()).block(); - assertThat(elements).containsExactlyInAnyOrderElementsOf(entries.entrySet()); + Assertions.assertEquals(entries.entrySet().stream().toList(), flux); } } @@ -799,122 +620,103 @@ public abstract class TestDictionaryMapDeep { public void testSetAllValuesGetMulti(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> { - var entriesFlux = Flux.fromIterable(entries.entrySet()); - var keysFlux = entriesFlux.map(Entry::getKey); - var resultsFlux = map - .setAllValues(Flux.fromIterable(entries.entrySet())) - .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))); - return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); - }) + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var entriesFlux = entries.entrySet(); + var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); + map.setAllValues(entries.entrySet().stream()); + try (var resultsFlux = map.getMulti(null, entries.keySet().stream())) { + return Streams + .zip(keysFlux.stream(), resultsFlux, Map::entry) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) - )); + .toList(); + } + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsSetMulti") - public void testSetAllValuesAndGetPrevious(UpdateMode updateMode, Object2ObjectSortedMap> entries, + public void testSetAllValuesAndGetPrevious(UpdateMode updateMode, + Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), - map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) - ) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + List>> a1; + try (var stream1 = map.setAllValuesAndGetPrevious(entries.entrySet().stream())) { + a1 = stream1.toList(); + } + List>> a2; + try (var stream2 = map.setAllValuesAndGetPrevious(entries.entrySet().stream())) { + a2 = stream2.toList(); + } + return List.of(a1, a2); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals( + List.of(List.of(), remainingEntries), + stpVer + ); } } @ParameterizedTest @MethodSource("provideArgumentsSetMulti") public void testSetGetMulti(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> { - var entriesFlux = Flux.fromIterable(entries.entrySet()); - var keysFlux = entriesFlux.map(Entry::getKey); - var resultsFlux = Flux - .concat( - map.set(entries).then(Mono.empty()), - map.getMulti(null, Flux.fromIterable(entries.keySet())) - ); - return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); - }) - .filter(k -> k.getValue().isPresent()) - .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var entriesFlux = entries.entrySet(); + var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); + map.set(entries); + List>> results; + try (var stream = map.getMulti(null, entries.keySet().stream())) { + results = stream.toList(); + } + return Streams + .zip(keysFlux.stream(), results.stream(), Map::entry) + .filter(k -> k.getValue().isPresent()) + .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsSetMulti") - public void testSetAndGetStatus(UpdateMode updateMode, Object2ObjectSortedMap> entries, + public void testSetAndGetStatus(UpdateMode updateMode, + Object2ObjectSortedMap> entries, boolean shouldFail) { - Step stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> { - Mono removalMono; - if (entries.isEmpty()) { - removalMono = Mono.empty(); - } else { - removalMono = map.remove(entries.keySet().stream().findAny().orElseThrow()); - } - return Flux - .concat( - map.setAndGetChanged(entries).single(), - map.setAndGetChanged(entries).single(), - removalMono.then(Mono.empty()), - map.setAndGetChanged(entries).single() - ) - .doFinally(s -> map.close()); - }) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.setAndGetChanged(entries); + var x2 = map.setAndGetChanged(entries); + if (!entries.isEmpty()) { + map.remove(entries.keySet().stream().findAny().orElseThrow()); + } + var x3 = map.setAndGetChanged(entries); + return List.of(x1, x2, x3); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete(); + Assertions.assertEquals(List.of(!entries.isEmpty(), false, !entries.isEmpty()), stpVer); } } @@ -922,29 +724,18 @@ public abstract class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetAndGetPrevious(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.setAndGetPrevious(entries), - map.setAndGetPrevious(entries) - ) - .map(Map::entrySet) - .concatMapIterable(list -> list) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.setAndGetPrevious(entries); + var x2 = map.setAndGetPrevious(entries); + return Stream.of(x1, x2).map(x -> x != null ? x.entrySet().stream().toList() : null).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(Arrays.asList(null, remainingEntries.isEmpty() ? null : remainingEntries), stpVer); } } @@ -952,83 +743,57 @@ public abstract class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetClearAndGetPreviousGet(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) - .map(Map::entrySet) - .concatMapIterable(list -> list) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + map.set(entries); + var prev = map.clearAndGetPrevious(); + var curr = map.get(null); + return Stream.of(prev, curr).map(x -> x != null ? x.entrySet().stream().toList() : null).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(Arrays.asList(remainingEntries.isEmpty() ? null : remainingEntries, null), stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsSetMulti") - public void testSetMultiGetAllValues(UpdateMode updateMode, Object2ObjectSortedMap> entries, + public void testSetMultiGetAllValues(UpdateMode updateMode, + Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.getAllValues(null, false) - ) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + map.putMulti(entries.entrySet().stream()); + try (var values = map.getAllValues(null, false)) { + return values.toList(); + } + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsSetMulti") public void testSetMultiGet(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.get(null) - .map(Map::entrySet) - .flatMapIterable(list -> list) - ) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + map.putMulti(entries.entrySet().stream()); + return map.get(null); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries.isEmpty() ? null : remainingEntries, stpVer == null ? null : stpVer.entrySet().stream().toList()); } } @@ -1036,34 +801,21 @@ public abstract class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetMultiGetAllStagesGet(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); - Step>> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map - .getAllStages(null, false) - .flatMap(stage -> stage - .getValue() - .get(null) - .map(val -> Map.entry(stage.getKey(), val)) - .doFinally(s -> stage.getValue().close()) - ) - ) - .doFinally(s -> map.close()) - ) - )); + var remainingEntries = new ArrayList>>(); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + map.putMulti(entries.entrySet().stream()); + return map.getAllStages(null, false).map(stage -> { + var v = stage.getValue().get(null); + if (v == null) return null; + return Map.entry(stage.getKey(), v); + }).filter(Objects::nonNull).toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); - for (Entry> ignored : remainingEntries) { - stpVer = stpVer.expectNextMatches(remainingEntries::remove); - } - stpVer.verifyComplete(); + Assertions.assertEquals(remainingEntries, stpVer); } } @@ -1071,50 +823,37 @@ public abstract class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetMultiIsEmpty(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - Step stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.isEmpty(null), - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.isEmpty(null) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + var x1 = map.isEmpty(null); + map.putMulti(entries.entrySet().stream()); + var x2 = map.isEmpty(null); + return List.of(x1, x2); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.expectNext(true).verifyError(); } else { - stpVer.expectNext(true, entries.isEmpty()).verifyComplete(); + Assertions.assertEquals(List.of(true, entries.isEmpty()), stpVer); } } @ParameterizedTest @MethodSource("provideArgumentsSetMulti") public void testSetMultiClear(UpdateMode updateMode, Object2ObjectSortedMap> entries, boolean shouldFail) { - Step stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.isEmpty(null), - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.isEmpty(null), - map.clear().then(Mono.empty()), - map.isEmpty(null) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); + + var x1 = map.isEmpty(null); + map.putMulti(entries.entrySet().stream()); + var x2 = map.isEmpty(null); + map.clear(); + var x3 = map.isEmpty(null); + return List.of(x1, x2, x3); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.expectNext(true).verifyError(); } else { - stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete(); + Assertions.assertEquals(List.of(true, entries.isEmpty(), true), stpVer); } } - - */ } diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java index 26106b3..e699f16 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java @@ -3,15 +3,22 @@ package it.cavallium.dbengine.tests; import static it.cavallium.dbengine.tests.DbTestUtils.BIG_STRING; import static it.cavallium.dbengine.tests.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.tests.DbTestUtils.isCIMode; +import static it.cavallium.dbengine.tests.DbTestUtils.run; +import static it.cavallium.dbengine.tests.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap; +import static it.cavallium.dbengine.tests.DbTestUtils.tempDb; import static it.cavallium.dbengine.tests.DbTestUtils.tempDictionary; import com.google.common.collect.Streams; import it.cavallium.dbengine.database.UpdateMode; import java.util.List; +import java.util.Map.Entry; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public abstract class TestDictionaryMapDeepHashMap { private boolean checkLeaks = true; @@ -90,32 +97,24 @@ public abstract class TestDictionaryMapDeepHashMap { } } - /* @ParameterizedTest @MethodSource("provideArgumentsPut") public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMapHashMap(dict, 5)) - .flatMapMany(map -> map - .at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.close())) - .thenMany(map - .getAllValues(null, false) - .map(Entry::getValue) - .flatMap(maps -> Flux.fromIterable(maps.entrySet())) - .map(Entry::getValue) - ) - .doFinally(s -> map.close()) - ) - )); + var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { + var map = tempDatabaseMapDictionaryDeepMapHashMap(tempDictionary(db, updateMode), 5); + map.at(null, key1).putValue(key2, value); + return map + .getAllValues(null, false) + .map(Entry::getValue) + .flatMap(maps -> maps.entrySet().stream()) + .map(Entry::getValue) + .toList(); + })); if (shouldFail) { this.checkLeaks = false; - stpVer.verifyError(); } else { - stpVer.expectNext(value).verifyComplete(); + Assertions.assertEquals(List.of(value), stpVer); } } - */ - }