diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 9405757..fd10eff 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -492,10 +492,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAllValuesAndGetPrevious(Flux> entries) { - return Flux.usingWhen( - Mono.just(true), - b -> getAllValues(null), - b -> dictionary.setRange(rangeMono, entries.handle((entry, sink) -> { + return Flux.concat( + this.getAllValues(null), + dictionary.setRange(rangeMono, entries.handle((entry, sink) -> { try { ByteBuf serializedKey = toKey(serializeSuffix(entry.getKey())); try { @@ -511,7 +510,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep keysWindowFlux .collectList() .flatMap(entriesList -> Mono diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 5934e51..bf29675 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.PoolArenaMetric; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; @@ -40,28 +41,19 @@ import reactor.core.scheduler.Schedulers; public class DbTestUtils { - private volatile static ByteBufAllocator POOLED_ALLOCATOR = null; + public static record TestAllocator(ByteBufAllocator allocator) {} - public static synchronized ByteBufAllocator getUncachedAllocator() { - try { - ensureNoLeaks(POOLED_ALLOCATOR); - } catch (Throwable ex) { - POOLED_ALLOCATOR = null; - } - if (POOLED_ALLOCATOR == null) { - POOLED_ALLOCATOR = new PooledByteBufAllocator(false, 1, 0, 8192, 11, 0, 0, true); - } - return POOLED_ALLOCATOR; + public static TestAllocator newAllocator() { + return new TestAllocator(new PooledByteBufAllocator(false, 1, 0, 4096, 11, 0, 0, true)); } - public static synchronized ByteBufAllocator getUncachedAllocatorUnsafe() { - return POOLED_ALLOCATOR; + public static void destroyAllocator(TestAllocator testAllocator) { } public static final AtomicInteger dbId = new AtomicInteger(0); @SuppressWarnings("SameParameterValue") - private static int getActiveBuffers(ByteBufAllocator allocator) { + private static int getActiveBuffers(ByteBufAllocator allocator, boolean printStats) { int directActive = 0, directAlloc = 0, directDealloc = 0; if (allocator instanceof PooledByteBufAllocator alloc) { for (PoolArenaMetric arena : alloc.directArenas()) { @@ -74,12 +66,14 @@ public class DbTestUtils { } else { throw new UnsupportedOperationException(); } - System.out.println("directActive " + directActive + " directAlloc " + directAlloc + " directDealloc " + directDealloc); + if (printStats) { + System.out.println("directActive " + directActive + " directAlloc " + directAlloc + " directDealloc " + directDealloc); + } return directActive; } @SuppressWarnings("SameParameterValue") - private static int getActiveHeapBuffers(ByteBufAllocator allocator) { + private static int getActiveHeapBuffers(ByteBufAllocator allocator, boolean printStats) { int heapActive = 0, heapAlloc = 0, heapDealloc = 0; if (allocator instanceof PooledByteBufAllocator alloc) { for (PoolArenaMetric arena : alloc.heapArenas()) { @@ -92,24 +86,25 @@ public class DbTestUtils { } else { throw new UnsupportedOperationException(); } - System.out.println("heapActive " + heapActive + " heapAlloc " + heapAlloc + " heapDealloc " + heapDealloc); + if (printStats) { + System.out.println("heapActive " + heapActive + " heapAlloc " + heapAlloc + " heapDealloc " + heapDealloc); + } return heapActive; } - public static Flux tempDb(Function> action) { - return Flux.usingWhen(openTempDb(), + public static Flux tempDb(TestAllocator alloc, Function> action) { + return Flux.usingWhen(openTempDb(alloc), tempDb -> action.apply(tempDb.db()), DbTestUtils::closeTempDb ); } - public static record TempDb(ByteBufAllocator allocator, LLDatabaseConnection connection, LLKeyValueDatabase db, + public static record TempDb(TestAllocator allocator, LLDatabaseConnection connection, LLKeyValueDatabase db, Path path) {} - public static Mono openTempDb() { + public static Mono openTempDb(TestAllocator alloc) { return Mono.defer(() -> { var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + dbId.incrementAndGet() + "/"); - var alloc = getUncachedAllocator(); return Mono .fromCallable(() -> { if (Files.exists(wrkspcPath)) { @@ -125,7 +120,7 @@ public class DbTestUtils { return null; }) .subscribeOn(Schedulers.boundedElastic()) - .then(new LLLocalDatabaseConnection(alloc, wrkspcPath).connect()) + .then(new LLLocalDatabaseConnection(alloc.allocator(), wrkspcPath).connect()) .flatMap(conn -> conn .getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), @@ -138,8 +133,8 @@ public class DbTestUtils { public static Mono closeTempDb(TempDb tempDb) { return tempDb.db().close().then(tempDb.connection().disconnect()).then(Mono.fromCallable(() -> { - ensureNoLeaks(tempDb.allocator()); - if (tempDb.allocator() instanceof PooledByteBufAllocator pooledByteBufAllocator) { + ensureNoLeaks(tempDb.allocator().allocator(), false); + if (tempDb.allocator().allocator() instanceof PooledByteBufAllocator pooledByteBufAllocator) { pooledByteBufAllocator.trimCurrentThreadCache(); pooledByteBufAllocator.freeThreadLocalCache(); } @@ -156,10 +151,10 @@ public class DbTestUtils { }).subscribeOn(Schedulers.boundedElastic())).then(); } - public static void ensureNoLeaks(ByteBufAllocator allocator) { + public static void ensureNoLeaks(ByteBufAllocator allocator, boolean printStats) { if (allocator != null) { - assertEquals(0, getActiveBuffers(allocator)); - assertEquals(0, getActiveHeapBuffers(allocator)); + assertEquals(0, getActiveBuffers(allocator, printStats)); + assertEquals(0, getActiveHeapBuffers(allocator, printStats)); } } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionary.java b/src/test/java/it/cavallium/dbengine/TestDictionary.java index 99f1002..fcaa056 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionary.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionary.java @@ -1,11 +1,12 @@ package it.cavallium.dbengine; +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDictionary; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.UpdateMode; import java.util.Arrays; @@ -19,25 +20,29 @@ import reactor.test.StepVerifier; public class TestDictionary { + private TestAllocator allocator; + private static Stream provideArgumentsCreate() { return Arrays.stream(UpdateMode.values()).map(Arguments::of); } @BeforeEach public void beforeEach() { - ensureNoLeaks(getUncachedAllocator()); + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false); } @AfterEach public void afterEach() { - ensureNoLeaks(getUncachedAllocatorUnsafe()); + ensureNoLeaks(allocator.allocator(), true); + destroyAllocator(allocator); } @ParameterizedTest @MethodSource("provideArgumentsCreate") public void testCreate(UpdateMode updateMode) { StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .flatMap(LLDictionary::clear) .then() )) diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 2b71580..e8f28c2 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -28,6 +28,8 @@ import reactor.util.function.Tuples; public class TestDictionaryMap { + private TestAllocator allocator; + private static boolean isTestBadKeysEnabled() { return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); } @@ -79,19 +81,21 @@ public class TestDictionaryMap { @BeforeEach public void beforeEach() { - ensureNoLeaks(getUncachedAllocator()); + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false); } @AfterEach public void afterEach() { - ensureNoLeaks(getUncachedAllocatorUnsafe()); + ensureNoLeaks(allocator.allocator(), true); + destroyAllocator(allocator); } @ParameterizedTest @MethodSource("provideArgumentsPut") public void testPut(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMap(map -> map .putValue(key, value) @@ -110,7 +114,7 @@ public class TestDictionaryMap { @MethodSource("provideArgumentsPut") public void testAtSetAtGet(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMap(map -> map .at(null, key).flatMap(v -> v.set(value).doAfterTerminate(v::release)) @@ -129,7 +133,7 @@ public class TestDictionaryMap { @MethodSource("provideArgumentsPut") public void testPutAndGetPrevious(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -151,7 +155,7 @@ public class TestDictionaryMap { @MethodSource("provideArgumentsPut") public void testPutValueRemoveAndGetPrevious(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -173,7 +177,7 @@ public class TestDictionaryMap { @MethodSource("provideArgumentsPut") public void testPutValueRemoveAndGetStatus(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -198,7 +202,7 @@ public class TestDictionaryMap { return; } var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -240,7 +244,7 @@ public class TestDictionaryMap { return; } var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -279,7 +283,7 @@ public class TestDictionaryMap { @MethodSource("provideArgumentsPut") public void testPutAndGetChanged(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -340,7 +344,7 @@ public class TestDictionaryMap { public void testPutMultiGetMulti(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -369,7 +373,7 @@ public class TestDictionaryMap { public void testSetAllValuesGetMulti(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> map .setAllValues(Flux.fromIterable(entries.entrySet())) @@ -395,7 +399,7 @@ public class TestDictionaryMap { public void testSetAllValuesAndGetPrevious(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -422,7 +426,7 @@ public class TestDictionaryMap { public void testSetGetMulti(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -452,7 +456,7 @@ public class TestDictionaryMap { Map entries, boolean shouldFail) { Step stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> { Mono removalMono; @@ -483,7 +487,7 @@ public class TestDictionaryMap { public void testSetAndGetPrevious(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries)) @@ -508,7 +512,7 @@ public class TestDictionaryMap { public void testSetClearAndGetPreviousGet(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) @@ -533,7 +537,7 @@ public class TestDictionaryMap { public void testPutMultiGetAllValues(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -560,7 +564,7 @@ public class TestDictionaryMap { public void testPutMultiGet(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -589,7 +593,7 @@ public class TestDictionaryMap { public void testPutMultiGetAllStagesGet(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -623,7 +627,7 @@ public class TestDictionaryMap { public void testPutMultiIsEmpty(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( @@ -647,7 +651,7 @@ public class TestDictionaryMap { @MethodSource("provideArgumentsPutMulti") public void testPutMultiClear(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { Step stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMapMany(map -> Flux .concat( diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index 725c496..b6216dc 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -1,15 +1,17 @@ package it.cavallium.dbengine; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMap; import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDictionary; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -19,6 +21,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -32,8 +36,11 @@ import reactor.util.function.Tuple3; import reactor.util.function.Tuple4; import reactor.util.function.Tuples; +@TestMethodOrder(MethodOrderer.MethodName.class) public class TestDictionaryMapDeep { + private TestAllocator allocator; + private static boolean isTestBadKeysEnabled() { return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); } @@ -77,45 +84,45 @@ public class TestDictionaryMapDeep { } private static Stream provideArgumentsPut() { - var goodKeys1 = Set.of("12345", "zebra"); - Set badKeys1; + var goodKeys1 = List.of("12345", "zebra"); + List badKeys1; if (isTestBadKeysEnabled()) { - badKeys1 = Set.of("", "a", "aaaa", "aaaaaa"); + badKeys1 = List.of("", "a", "aaaa", "aaaaaa"); } else { - badKeys1 = Set.of(); + badKeys1 = List.of(); } - var goodKeys2 = Set.of("123456", "anatra"); - Set badKeys2; + var goodKeys2 = List.of("123456", "anatra"); + List badKeys2; if (isTestBadKeysEnabled()) { - badKeys2 = Set.of("", "a", "aaaaa", "aaaaaaa"); + badKeys2 = List.of("", "a", "aaaaa", "aaaaaaa"); } else { - badKeys2 = Set.of(); + badKeys2 = List.of(); } - var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", BIG_STRING); + var values = List.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", BIG_STRING); Flux> failOnKeys1 = Flux .fromIterable(badKeys1) .map(badKey1 -> Tuples.of( badKey1, - goodKeys2.stream().findAny().orElseThrow(), - values.stream().findAny().orElseThrow(), + goodKeys2.stream().findFirst().orElseThrow(), + values.stream().findFirst().orElseThrow(), true )); Flux> failOnKeys2 = Flux .fromIterable(badKeys2) .map(badKey2 -> Tuples.of( - goodKeys1.stream().findAny().orElseThrow(), + goodKeys1.stream().findFirst().orElseThrow(), badKey2, - values.stream().findAny().orElseThrow(), + values.stream().findFirst().orElseThrow(), true )); Flux> goodKeys1And2 = Flux .fromIterable(values) .map(value -> Tuples.of( - goodKeys1.stream().findAny().orElseThrow(), - goodKeys2.stream().findAny().orElseThrow(), + goodKeys1.stream().findFirst().orElseThrow(), + goodKeys2.stream().findFirst().orElseThrow(), value, false )); @@ -128,7 +135,7 @@ public class TestDictionaryMapDeep { ); return keys1And2 - .flatMap(entryTuple -> Flux + .concatMap(entryTuple -> Flux .fromArray(UpdateMode.values()) .map(updateMode -> Tuples.of(updateMode, entryTuple.getT1(), @@ -143,24 +150,27 @@ public class TestDictionaryMapDeep { fullTuple.getT4(), fullTuple.getT5() )) - .toStream(); + .toStream() + .sequential(); } @BeforeEach public void beforeEach() { - ensureNoLeaks(getUncachedAllocator()); + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false); } @AfterEach public void afterEach() { - ensureNoLeaks(getUncachedAllocatorUnsafe()); + ensureNoLeaks(allocator.allocator(), true); + destroyAllocator(allocator); } @ParameterizedTest @MethodSource("provideArgumentsSet") public void testSetValueGetValue(UpdateMode updateMode, String key, Map value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMap(map -> map .putValue(key, value) @@ -182,7 +192,7 @@ public class TestDictionaryMapDeep { Map value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> map .putValue(key, value) @@ -202,7 +212,7 @@ public class TestDictionaryMapDeep { public void testAtSetGetAllStagesGetAllValues(UpdateMode updateMode, String key, Map value, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> map .at(null, key) @@ -243,10 +253,10 @@ public class TestDictionaryMapDeep { } @ParameterizedTest - @MethodSource("provideArgumentsPut") + @MethodSource({"provideArgumentsPut"}) public void testAtPutValueAtGetValue(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMap(map -> map .at(null, key1).flatMap(v -> v.putValue(key2, value).doAfterTerminate(v::release)) @@ -265,7 +275,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSet") public void testSetAndGetPrevious(UpdateMode updateMode, String key, Map value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -289,7 +299,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsPut") public void testAtPutValueAndGetPrevious(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -326,7 +336,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSet") public void testSetValueRemoveAndGetPrevious(UpdateMode updateMode, String key, Map value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -348,7 +358,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsPut") public void testAtPutValueRemoveAndGetPrevious(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -386,7 +396,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSet") public void testSetValueRemoveAndGetStatus(UpdateMode updateMode, String key, Map value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -408,7 +418,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsPut") public void testAtPutValueRemoveAndGetStatus(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -449,7 +459,7 @@ public class TestDictionaryMapDeep { return; } var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -491,7 +501,7 @@ public class TestDictionaryMapDeep { return; } var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -536,7 +546,7 @@ public class TestDictionaryMapDeep { if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) { return; } - var stpVer = StepVerifier.create(tempDb(db -> tempDictionary(db, updateMode) + var stpVer = StepVerifier.create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux.concat( map.updateValue(key, old -> { @@ -575,7 +585,7 @@ public class TestDictionaryMapDeep { return; } var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -626,7 +636,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSet") public void testSetAndGetChanged(UpdateMode updateMode, String key, Map value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -647,18 +657,18 @@ public class TestDictionaryMapDeep { } private static Stream provideArgumentsSetMulti() { - var goodKeys = Set.of(Set.of("12345", "67890"), Set.of()); - Set> badKeys; + var goodKeys = List.of(List.of("12345", "67890"), List.of()); + List> badKeys; if (isTestBadKeysEnabled()) { - badKeys = Set.of(Set.of("", "12345"), Set.of("45678", "aaaa"), Set.of("aaaaaa", "capra")); + badKeys = List.of(List.of("", "12345"), List.of("45678", "aaaa"), List.of("aaaaaa", "capra")); } else { - badKeys = Set.of(); + badKeys = List.of(); } - Set, Boolean>> keys = Stream.concat( + List, Boolean>> keys = Stream.concat( goodKeys.stream().map(s -> Tuples.of(s, false)), badKeys.stream().map(s -> Tuples.of(s, true)) - ).collect(Collectors.toSet()); - var values = Set.of( + ).collect(Collectors.toList()); + var values = List.of( Map.of("123456", "a", "234567", ""), Map.of("123456", "\0", "234567", "\0\0", "345678", BIG_STRING) ); @@ -682,7 +692,7 @@ public class TestDictionaryMapDeep { public void testSetMultiGetMulti(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -710,7 +720,7 @@ public class TestDictionaryMapDeep { public void testSetAllValuesGetMulti(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> map .setAllValues(Flux.fromIterable(entries.entrySet())) @@ -736,7 +746,7 @@ public class TestDictionaryMapDeep { public void testSetAllValuesAndGetPrevious(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -762,7 +772,7 @@ public class TestDictionaryMapDeep { public void testSetGetMulti(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -789,7 +799,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetAndGetStatus(UpdateMode updateMode, Map> entries, boolean shouldFail) { Step stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> { Mono removalMono; @@ -821,7 +831,7 @@ public class TestDictionaryMapDeep { public void testSetAndGetPrevious(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -849,7 +859,7 @@ public class TestDictionaryMapDeep { public void testSetClearAndGetPreviousGet(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) @@ -874,7 +884,7 @@ public class TestDictionaryMapDeep { public void testSetMultiGetAllValues(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -900,7 +910,7 @@ public class TestDictionaryMapDeep { public void testSetMultiGet(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -928,7 +938,7 @@ public class TestDictionaryMapDeep { public void testSetMultiGetAllStagesGet(UpdateMode updateMode, Map> entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap>, Boolean>().keySet(true); Step>> stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -960,7 +970,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetMultiIsEmpty(UpdateMode updateMode, Map> entries, boolean shouldFail) { Step stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( @@ -982,7 +992,7 @@ public class TestDictionaryMapDeep { @MethodSource("provideArgumentsSetMulti") public void testSetMultiClear(UpdateMode updateMode, Map> entries, boolean shouldFail) { Step stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux .concat( diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java index 4717705..385f5da 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java @@ -1,12 +1,13 @@ package it.cavallium.dbengine; +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap; import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDictionary; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.UpdateMode; import java.util.Arrays; import java.util.Map; @@ -32,6 +33,8 @@ import reactor.util.function.Tuples; public class TestDictionaryMapDeepHashMap { + private TestAllocator allocator; + private static boolean isTestBadKeysEnabled() { return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); } @@ -101,19 +104,21 @@ public class TestDictionaryMapDeepHashMap { @BeforeEach public void beforeEach() { - ensureNoLeaks(getUncachedAllocator()); + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false); } @AfterEach public void afterEach() { - ensureNoLeaks(getUncachedAllocatorUnsafe()); + ensureNoLeaks(allocator.allocator(), true); + destroyAllocator(allocator); } @ParameterizedTest @MethodSource("provideArgumentsPut") public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { var stpVer = StepVerifier - .create(tempDb(db -> tempDictionary(db, updateMode) + .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMapHashMap(dict, 5)) .flatMapMany(map -> map .at(null, key1).flatMap(v -> v.putValue(key2, value).doAfterTerminate(v::release)) diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java index b1e656d..9cc1f60 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java @@ -1,10 +1,14 @@ package it.cavallium.dbengine; +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; +import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import io.netty.buffer.ByteBuf; import it.cavallium.dbengine.DbTestUtils.TempDb; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLKeyValueDatabase; @@ -31,15 +35,25 @@ import reactor.core.scheduler.Schedulers; public class TestLLDictionaryLeaks { + private TestAllocator allocator; private TempDb tempDb; private LLKeyValueDatabase db; @BeforeEach public void beforeEach() { - tempDb = Objects.requireNonNull(DbTestUtils.openTempDb().block(), "TempDB"); + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false); + tempDb = Objects.requireNonNull(DbTestUtils.openTempDb(allocator).block(), "TempDB"); db = tempDb.db(); } + @AfterEach + public void afterEach() { + DbTestUtils.closeTempDb(tempDb).block(); + ensureNoLeaks(allocator.allocator(), true); + destroyAllocator(allocator); + } + public static Stream provideArguments() { return Arrays.stream(UpdateMode.values()).map(Arguments::of); } @@ -209,9 +223,4 @@ public class TestLLDictionaryLeaks { var key = Mono.fromCallable(() -> fromString("test-key")); runVoid(dict.remove(key, resultType).then()); } - - @AfterEach - public void afterEach() { - DbTestUtils.closeTempDb(tempDb).block(); - } } diff --git a/src/test/java/it/cavallium/dbengine/TestSingletons.java b/src/test/java/it/cavallium/dbengine/TestSingletons.java index fcd1647..cbc5653 100644 --- a/src/test/java/it/cavallium/dbengine/TestSingletons.java +++ b/src/test/java/it/cavallium/dbengine/TestSingletons.java @@ -1,10 +1,11 @@ package it.cavallium.dbengine; +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator; -import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.tempDb; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseLong; @@ -21,6 +22,8 @@ import reactor.test.StepVerifier; public class TestSingletons { + private TestAllocator allocator; + private static Stream provideNumberWithRepeats() { return Stream.of( Arguments.of(Integer.MIN_VALUE, 2), @@ -38,21 +41,23 @@ public class TestSingletons { Arguments.of(102L, 5) ); } - + @BeforeEach public void beforeEach() { - ensureNoLeaks(getUncachedAllocator()); + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false); } @AfterEach public void afterEach() { - ensureNoLeaks(getUncachedAllocatorUnsafe()); + ensureNoLeaks(allocator.allocator(), true); + destroyAllocator(allocator); } @Test public void testCreateInteger() { StepVerifier - .create(tempDb(db -> tempInt(db, "test", 0) + .create(tempDb(allocator, db -> tempInt(db, "test", 0) .flatMap(dbInt -> dbInt.get(null)) .then() )) @@ -62,7 +67,7 @@ public class TestSingletons { @Test public void testCreateLong() { StepVerifier - .create(tempDb(db -> tempLong(db, "test", 0) + .create(tempDb(allocator, db -> tempLong(db, "test", 0) .flatMap(dbLong -> dbLong.get(null)) .then() )) @@ -73,7 +78,7 @@ public class TestSingletons { @ValueSource(ints = {Integer.MIN_VALUE, -192, -2, -1, 0, 1, 2, 1292, Integer.MAX_VALUE}) public void testDefaultValueInteger(int i) { StepVerifier - .create(tempDb(db -> tempInt(db, "test", i) + .create(tempDb(allocator, db -> tempInt(db, "test", i) .flatMap(dbInt -> dbInt.get(null)) )) .expectNext(i) @@ -84,7 +89,7 @@ public class TestSingletons { @ValueSource(longs = {Long.MIN_VALUE, -192, -2, -1, 0, 1, 2, 1292, Long.MAX_VALUE}) public void testDefaultValueLong(long i) { StepVerifier - .create(tempDb(db -> tempLong(db, "test", i) + .create(tempDb(allocator, db -> tempLong(db, "test", i) .flatMap(dbLong -> dbLong.get(null)) )) .expectNext(i) @@ -95,7 +100,7 @@ public class TestSingletons { @MethodSource("provideNumberWithRepeats") public void testSetInteger(Integer i, Integer repeats) { StepVerifier - .create(tempDb(db -> tempInt(db, "test", 0) + .create(tempDb(allocator, db -> tempInt(db, "test", 0) .flatMap(dbInt -> Mono .defer(() -> dbInt.set((int) System.currentTimeMillis())) .repeat(repeats) @@ -110,7 +115,7 @@ public class TestSingletons { @MethodSource("provideLongNumberWithRepeats") public void testSetLong(Long i, Integer repeats) { StepVerifier - .create(tempDb(db -> tempLong(db, "test", 0) + .create(tempDb(allocator, db -> tempLong(db, "test", 0) .flatMap(dbLong -> Mono .defer(() -> dbLong.set(System.currentTimeMillis())) .repeat(repeats)