diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 28c36a4..67e3204 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -49,11 +49,11 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { return update(key, updater, returnMode, false); } - Mono updateAndGetDelta(Mono> key, + Mono> updateAndGetDelta(Mono> key, SerializationFunction<@Nullable Send, @Nullable Send> updater, boolean existsAlmostCertainly); - default Mono updateAndGetDelta(Mono> key, + default Mono> updateAndGetDelta(Mono> key, SerializationFunction<@Nullable Send, @Nullable Send> updater) { return updateAndGetDelta(key, updater, false); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 210a685..936148b 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -8,6 +8,7 @@ import io.netty5.buffer.api.CompositeBuffer; import io.netty5.buffer.api.Send; import io.netty5.util.IllegalReferenceCountException; import io.netty5.util.internal.PlatformDependent; +import it.cavallium.dbengine.database.collections.DatabaseStage; import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -494,9 +495,9 @@ public class LLUtils { }); } - public static Mono> resolveLLDelta(Mono prev, UpdateReturnMode updateReturnMode) { - return prev.handle((delta, sink) -> { - try (delta) { + public static Mono> resolveLLDelta(Mono> prev, UpdateReturnMode updateReturnMode) { + return prev.handle((deltaToReceive, sink) -> { + try (var delta = deltaToReceive.receive()) { switch (updateReturnMode) { case GET_NEW_VALUE -> { var current = delta.current(); @@ -546,10 +547,10 @@ public class LLUtils { }); } - public static Mono> mapLLDelta(Mono mono, + public static Mono> mapLLDelta(Mono> mono, SerializationFunction<@NotNull Send, @Nullable U> mapper) { - return mono.handle((delta, sink) -> { - try { + return mono.handle((deltaToReceive, sink) -> { + try (var delta = deltaToReceive.receive()) { try (Send prev = delta.previous()) { try (Send curr = delta.current()) { U newPrev; @@ -609,12 +610,16 @@ public class LLUtils { discardLLEntry(o); } else if (obj instanceof LLRange o) { discardLLRange(o); + } else if (obj instanceof LLDelta o) { + discardLLDelta(o); } else if (obj instanceof Delta o) { discardDelta(o); } else if (obj instanceof Send o) { discardSend(o); } else if (obj instanceof Map o) { discardMap(o); + } else if (obj instanceof DatabaseStage o) { + discardStage(o); } }); // todo: check if the single object discard hook is more performant @@ -627,8 +632,10 @@ public class LLUtils { .doOnDiscard(LLEntry.class, LLUtils::discardLLEntry) .doOnDiscard(LLRange.class, LLUtils::discardLLRange) .doOnDiscard(Delta.class, LLUtils::discardDelta) + .doOnDiscard(LLDelta.class, LLUtils::discardLLDelta) .doOnDiscard(Send.class, LLUtils::discardSend) - .doOnDiscard(Map.class, LLUtils::discardMap); + .doOnDiscard(Map.class, LLUtils::discardMap) + .doOnDiscard(DatabaseStage.class, LLUtils::discardStage); */ } @@ -651,10 +658,14 @@ public class LLUtils { discardLLRange(o); } else if (obj instanceof Delta o) { discardDelta(o); + } else if (obj instanceof LLDelta o) { + discardLLDelta(o); } else if (obj instanceof Send o) { discardSend(o); } else if (obj instanceof Map o) { discardMap(o); + } else if (obj instanceof DatabaseStage o) { + discardStage(o); } }); // todo: check if the single object discard hook is more performant @@ -667,8 +678,10 @@ public class LLUtils { .doOnDiscard(LLEntry.class, LLUtils::discardLLEntry) .doOnDiscard(LLRange.class, LLUtils::discardLLRange) .doOnDiscard(Delta.class, LLUtils::discardDelta) + .doOnDiscard(LLDelta.class, LLUtils::discardLLDelta) .doOnDiscard(Send.class, LLUtils::discardSend) - .doOnDiscard(Map.class, LLUtils::discardMap); + .doOnDiscard(Map.class, LLUtils::discardMap) + .doOnDiscard(DatabaseStage.class, LLUtils::discardStage); */ } @@ -683,6 +696,11 @@ public class LLUtils { range.close(); } + private static void discardLLDelta(LLDelta delta) { + logger.trace("Releasing discarded LLDelta"); + delta.close(); + } + private static void discardEntry(Map.Entry e) { if (e.getKey() instanceof Buffer bb) { bb.close(); @@ -776,6 +794,10 @@ public class LLUtils { } } + private static void discardStage(DatabaseStage stage) { + stage.release(); + } + public static boolean isDirect(Buffer key) { var readableComponents = key.countReadableComponents(); if (readableComponents == 0) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 4f048d3..e2a1e94 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -19,6 +19,7 @@ import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -287,6 +288,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep { + for (Object o : list) { + if (o instanceof Send send) { + send.close(); + } else if (o instanceof Buffer buf) { + buf.close(); + } + } + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 05cf858..e47b37c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -45,7 +45,7 @@ public class DatabaseSingle implements DatabaseStageEntry { } private void deserializeValue(Send value, SynchronousSink sink) { - try { + try (value) { sink.next(serializer.deserialize(value).deserializedData()); } catch (SerializationException ex) { sink.error(ex); @@ -72,12 +72,14 @@ public class DatabaseSingle implements DatabaseStageEntry { boolean existsAlmostCertainly) { return dictionary .update(keyMono, (oldValueSer) -> { - var result = updater.apply( - oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData()); - if (result == null) { - return null; - } else { - return serializer.serialize(result); + try (oldValueSer) { + var result = updater.apply(oldValueSer == null ? null + : serializer.deserialize(oldValueSer).deserializedData()); + if (result == null) { + return null; + } else { + return serializer.serialize(result); + } } }, updateReturnMode, existsAlmostCertainly) .handle(this::deserializeValue); @@ -88,12 +90,14 @@ public class DatabaseSingle implements DatabaseStageEntry { boolean existsAlmostCertainly) { return dictionary .updateAndGetDelta(keyMono, (oldValueSer) -> { - var result = updater.apply( - oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData()); - if (result == null) { - return null; - } else { - return serializer.serialize(result); + try (oldValueSer) { + var result = updater.apply(oldValueSer == null ? null + : serializer.deserialize(oldValueSer).deserializedData()); + if (result == null) { + return null; + } else { + return serializer.serialize(result); + } } }, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> serializer.deserialize(serialized).deserializedData() diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 6e3bed9..1c6cadb 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.netty5.buffer.api.Buffer; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.ExtraKeyOperationResult; @@ -148,7 +149,14 @@ public interface DatabaseStageMap> extends Dat .getValue(snapshot, key, existsAlmostCertainly) .map(value -> Map.entry(key, Optional.of(value))) .switchIfEmpty(Mono.fromSupplier(() -> Map.entry(key, Optional.empty()))) - ); + ) + .doOnDiscard(Entry.class, unknownEntry -> { + if (unknownEntry.getValue() instanceof Optional optionalBuffer + && optionalBuffer.isPresent() + && optionalBuffer.get() instanceof Buffer buffer) { + buffer.close(); + } + }); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 2cfdc61..2174b35 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -774,7 +774,7 @@ public class LLLocalDictionary implements LLDictionary { // Remember to change also update() if you are modifying this function @SuppressWarnings("DuplicatedCode") @Override - public Mono updateAndGetDelta(Mono> keyMono, + public Mono> updateAndGetDelta(Mono> keyMono, SerializationFunction<@Nullable Send, @Nullable Send> updater, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, @@ -878,7 +878,7 @@ public class LLLocalDictionary implements LLDictionary { return LLDelta.of( prevData != null ? prevData.send() : null, newData != null ? newData.send() : null - ); + ).send(); } finally { if (newData != null) { newData.close(); @@ -1536,8 +1536,8 @@ public class LLLocalDictionary implements LLDictionary { true, "getRangeKeysGrouped" ), - it -> it.flux(), - it -> it.release() + LLLocalKeyPrefixReactiveRocksIterator::flux, + LLLocalKeyPrefixReactiveRocksIterator::release ) .subscribeOn(dbScheduler), rangeSend -> Mono.fromRunnable(rangeSend::close) diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index fef921f..5fdaf2f 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -94,7 +94,9 @@ public class LLMemoryDictionary implements LLDictionary { } private ByteList k(Send buf) { - return new BinaryLexicographicList(LLUtils.toArray(buf.receive())); + try (var b = buf.receive()) { + return new BinaryLexicographicList(LLUtils.toArray(b)); + } } private Send kk(ByteList bytesList) { @@ -168,20 +170,13 @@ public class LLMemoryDictionary implements LLDictionary { @Override public Mono> put(Mono> keyMono, Mono> valueMono, LLDictionaryResultType resultType) { - return Mono.usingWhen(keyMono, - key -> Mono.usingWhen(valueMono, - value -> Mono - .fromCallable(() -> { - var k = k(key); - var v = k(value); - return mainDb.put(k, v); - }) - .transform(result -> this.transformResult(result, resultType)) - .onErrorMap(cause -> new IOException("Failed to read", cause)), - value -> Mono.fromRunnable(value::close) - ), - key -> Mono.fromRunnable(key::close) - ); + var kMono = keyMono.map(this::k); + var vMono = valueMono.map(this::k); + return Mono + .zip(kMono, vMono) + .mapNotNull(tuple -> mainDb.put(tuple.getT1(), tuple.getT2())) + .transform(result -> this.transformResult(result, resultType)) + .onErrorMap(cause -> new IOException("Failed to read", cause)); } @Override @@ -190,38 +185,41 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Mono updateAndGetDelta(Mono> keyMono, + public Mono> updateAndGetDelta(Mono> keyMono, SerializationFunction<@Nullable Send, @Nullable Send> updater, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> Mono.fromCallable(() -> { - if (updateMode == UpdateMode.DISALLOW) { - throw new UnsupportedOperationException("update() is disallowed"); + try (key) { + if (updateMode == UpdateMode.DISALLOW) { + throw new UnsupportedOperationException("update() is disallowed"); + } + AtomicReference> oldRef = new AtomicReference<>(null); + var newValue = mainDb.compute(k(key), (_unused, old) -> { + if (old != null) { + oldRef.set(kk(old)); + } + Buffer v; + try (var oldToSend = old != null ? kk(old) : null) { + var vToReceive = updater.apply(oldToSend); + v = vToReceive != null ? vToReceive.receive() : null; + } catch (SerializationException e) { + throw new IllegalStateException(e); + } + try { + if (v != null) { + return k(v.send()); + } else { + return null; + } + } finally { + if (v != null) { + v.close(); + } + } + }); + return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null).send(); } - AtomicReference> oldRef = new AtomicReference<>(null); - var newValue = mainDb.compute(k(key), (_unused, old) -> { - if (old != null) { - oldRef.set(kk(old)); - } - Send v; - try { - v = updater.apply(old != null ? kk(old) : null); - } catch (SerializationException e) { - throw new IllegalStateException(e); - } - try { - if (v != null) { - return k(v); - } else { - return null; - } - } finally { - if (v != null) { - v.close(); - } - } - }); - return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null); }), key -> Mono.fromRunnable(key::close) ); diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index a2bd491..203678b 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -37,11 +37,7 @@ public class DbTestUtils { public static final String BIG_STRING = generateBigString(); private static String generateBigString() { - var sb = new StringBuilder(); - for (int i = 0; i < 1024; i++) { - sb.append("0123456789"); - } - return sb.toString(); + return "0123456789".repeat(1024); } public static record TestAllocator(PooledBufferAllocator allocator) {} @@ -78,7 +74,9 @@ public class DbTestUtils { Function> action) { return Flux.usingWhen( temporaryDbGenerator.openTempDb(alloc), - tempDb -> action.apply(tempDb.db()), + tempDb -> Flux.from(action.apply(tempDb.db())).doOnDiscard(Object.class, o -> { + System.out.println("Discarded: " + o.getClass().getName() + ", " + o); + }), temporaryDbGenerator::closeTempDb ); } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 5a56c86..324802c 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -29,6 +29,7 @@ import reactor.util.function.Tuples; public abstract class TestDictionaryMap { private TestAllocator allocator; + private boolean checkLeaks = true; private static boolean isTestBadKeysEnabled() { return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); @@ -87,7 +88,9 @@ public abstract class TestDictionaryMap { @AfterEach public void afterEach() { - ensureNoLeaks(allocator.allocator(), true, false); + if (checkLeaks) { + ensureNoLeaks(allocator.allocator(), true, false); + } destroyAllocator(allocator); } @@ -104,6 +107,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); @@ -123,6 +127,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); @@ -145,6 +150,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext("error?").expectNext(value).verifyComplete(); @@ -167,6 +173,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); @@ -189,6 +196,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(false, true, false).verifyComplete(); @@ -229,6 +237,7 @@ public abstract class TestDictionaryMap { ) .doAfterTerminate(map::release) ) + .transform(LLUtils::handleDiscard) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { stpVer.verifyError(); @@ -271,6 +280,7 @@ public abstract class TestDictionaryMap { ) .doAfterTerminate(map::release) ) + .transform(LLUtils::handleDiscard) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { stpVer.verifyError(); @@ -297,6 +307,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(true, true, false, true).verifyComplete(); @@ -358,6 +369,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -382,8 +394,10 @@ public abstract class TestDictionaryMap { ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -411,6 +425,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -437,8 +452,10 @@ public abstract class TestDictionaryMap { ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -476,6 +493,7 @@ public abstract class TestDictionaryMap { }) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete(); @@ -497,6 +515,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -522,6 +541,7 @@ public abstract class TestDictionaryMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -549,6 +569,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -578,6 +599,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -612,6 +634,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -641,6 +664,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(true, entries.isEmpty()).verifyComplete(); @@ -667,6 +691,7 @@ public abstract class TestDictionaryMap { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete(); diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index 2f73ec7..21db819 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -42,6 +42,7 @@ import reactor.util.function.Tuples; public abstract class TestDictionaryMapDeep { private TestAllocator allocator; + private boolean checkLeaks = true; private static boolean isTestBadKeysEnabled() { return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); @@ -164,7 +165,9 @@ public abstract class TestDictionaryMapDeep { @AfterEach public void afterEach() { - ensureNoLeaks(allocator.allocator(), true, false); + if (checkLeaks) { + ensureNoLeaks(allocator.allocator(), true, false); + } destroyAllocator(allocator); } @@ -181,6 +184,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); @@ -203,6 +207,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(Map.entry(key, value)).verifyComplete(); @@ -245,6 +250,7 @@ public abstract class TestDictionaryMapDeep { )) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { value.forEach((k, v) -> remainingEntries.add(Tuples.of(key, k, v))); @@ -271,6 +277,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); @@ -295,6 +302,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(Map.of("nothing", "nothing"), Map.of("error?", "error.")).expectNext(value).verifyComplete(); @@ -332,6 +340,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext("error?", value).verifyComplete(); @@ -354,6 +363,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); @@ -392,6 +402,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext("error?", value).verifyComplete(); @@ -414,6 +425,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(false, true, false).verifyComplete(); @@ -452,6 +464,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(true, true, false).verifyComplete(); @@ -537,6 +550,7 @@ public abstract class TestDictionaryMapDeep { ) ) .doAfterTerminate(map::release) + .transform(LLUtils::handleDiscard) ) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { @@ -629,6 +643,7 @@ public abstract class TestDictionaryMapDeep { ) ) .doAfterTerminate(map::release) + .transform(LLUtils::handleDiscard) ) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { @@ -656,6 +671,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(true, true, false, true).verifyComplete(); @@ -709,8 +725,10 @@ public abstract class TestDictionaryMapDeep { ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -737,6 +755,7 @@ public abstract class TestDictionaryMapDeep { .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -760,9 +779,11 @@ public abstract class TestDictionaryMapDeep { map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) ) .doAfterTerminate(map::release) + .transform(LLUtils::handleDiscard) ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -789,8 +810,10 @@ public abstract class TestDictionaryMapDeep { ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -826,6 +849,7 @@ public abstract class TestDictionaryMapDeep { .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete(); @@ -850,6 +874,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -873,8 +898,10 @@ public abstract class TestDictionaryMapDeep { .concatMapIterable(list -> list) .doAfterTerminate(map::release) ) + .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -901,6 +928,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -929,6 +957,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -962,6 +991,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v))); @@ -986,8 +1016,10 @@ public abstract class TestDictionaryMapDeep { ) .doAfterTerminate(map::release) ) + .transform(LLUtils::handleDiscard) )); if (shouldFail) { + this.checkLeaks = false; stpVer.expectNext(true).verifyError(); } else { stpVer.expectNext(true, entries.isEmpty()).verifyComplete(); @@ -1012,6 +1044,7 @@ public abstract class TestDictionaryMapDeep { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.expectNext(true).verifyError(); } else { stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete(); diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java index 1e53bac..ff6c3ef 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java @@ -35,6 +35,7 @@ import reactor.util.function.Tuples; public abstract class TestDictionaryMapDeepHashMap { private TestAllocator allocator; + private boolean checkLeaks = true; private static boolean isTestBadKeysEnabled() { return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); @@ -105,7 +106,9 @@ public abstract class TestDictionaryMapDeepHashMap { @AfterEach public void afterEach() { - ensureNoLeaks(allocator.allocator(), true, false); + if (checkLeaks) { + ensureNoLeaks(allocator.allocator(), true, false); + } destroyAllocator(allocator); } @@ -127,6 +130,7 @@ public abstract class TestDictionaryMapDeepHashMap { ) )); if (shouldFail) { + this.checkLeaks = false; stpVer.verifyError(); } else { stpVer.expectNext(value).verifyComplete(); diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java index ac3d00b..49541e8 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java @@ -175,7 +175,7 @@ public abstract class TestLLDictionaryLeaks { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test-key")); var value = Mono.fromCallable(() -> fromString("test-value")); - runVoid(dict.put(key, value, resultType).then()); + runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close)); } @ParameterizedTest @@ -229,6 +229,6 @@ public abstract class TestLLDictionaryLeaks { public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test-key")); - runVoid(dict.remove(key, resultType).then()); + runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close)); } }