Fix last leaks

Andrea Cavalli 2021-09-08 00:22:39 +02:00
parent 46787fa353
commit 047a471bf7
12 changed files with 190 additions and 83 deletions

View File

@ -49,11 +49,11 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
return update(key, updater, returnMode, false);
}
Mono<LLDelta> updateAndGetDelta(Mono<Send<Buffer>> key,
Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly);
default Mono<LLDelta> updateAndGetDelta(Mono<Send<Buffer>> key,
default Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater) {
return updateAndGetDelta(key, updater, false);
}

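The hunk above changes updateAndGetDelta to return Mono<Send<LLDelta>> instead of Mono<LLDelta>, so the delta crosses the reactive boundary as an owned handle rather than a live resource. A hedged sketch of how a caller might consume the new signature, receiving the delta in try-with-resources so it is closed on every path (the helper name is illustrative; LLDelta's previous()/current() accessors are used as shown in the LLUtils hunks below):

// Illustration only, not part of this commit.
static Mono<Boolean> updateChangedSomething(LLDictionary dictionary,
        Mono<Send<Buffer>> keyMono,
        SerializationFunction<Send<Buffer>, Send<Buffer>> updater) {
    return dictionary
            .updateAndGetDelta(keyMono, updater)
            .map(deltaToReceive -> {
                try (var delta = deltaToReceive.receive()) {                        // take ownership of the LLDelta
                    try (var prev = delta.previous(); var curr = delta.current()) { // Send<Buffer> handles, closed even if unused
                        return prev != null || curr != null;
                    }
                }
            });
}
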
View File

@ -8,6 +8,7 @@ import io.netty5.buffer.api.CompositeBuffer;
import io.netty5.buffer.api.Send;
import io.netty5.util.IllegalReferenceCountException;
import io.netty5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.collections.DatabaseStage;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -494,9 +495,9 @@ public class LLUtils {
});
}
public static Mono<Send<Buffer>> resolveLLDelta(Mono<LLDelta> prev, UpdateReturnMode updateReturnMode) {
return prev.handle((delta, sink) -> {
try (delta) {
public static Mono<Send<Buffer>> resolveLLDelta(Mono<Send<LLDelta>> prev, UpdateReturnMode updateReturnMode) {
return prev.handle((deltaToReceive, sink) -> {
try (var delta = deltaToReceive.receive()) {
switch (updateReturnMode) {
case GET_NEW_VALUE -> {
var current = delta.current();
@ -546,10 +547,10 @@ public class LLUtils {
});
}
public static <U> Mono<Delta<U>> mapLLDelta(Mono<LLDelta> mono,
public static <U> Mono<Delta<U>> mapLLDelta(Mono<Send<LLDelta>> mono,
SerializationFunction<@NotNull Send<Buffer>, @Nullable U> mapper) {
return mono.handle((delta, sink) -> {
try {
return mono.handle((deltaToReceive, sink) -> {
try (var delta = deltaToReceive.receive()) {
try (Send<Buffer> prev = delta.previous()) {
try (Send<Buffer> curr = delta.current()) {
U newPrev;
@ -609,12 +610,16 @@ public class LLUtils {
discardLLEntry(o);
} else if (obj instanceof LLRange o) {
discardLLRange(o);
} else if (obj instanceof LLDelta o) {
discardLLDelta(o);
} else if (obj instanceof Delta o) {
discardDelta(o);
} else if (obj instanceof Send o) {
discardSend(o);
} else if (obj instanceof Map o) {
discardMap(o);
} else if (obj instanceof DatabaseStage o) {
discardStage(o);
}
});
// todo: check if the single object discard hook is more performant
@ -627,8 +632,10 @@ public class LLUtils {
.doOnDiscard(LLEntry.class, LLUtils::discardLLEntry)
.doOnDiscard(LLRange.class, LLUtils::discardLLRange)
.doOnDiscard(Delta.class, LLUtils::discardDelta)
.doOnDiscard(LLDelta.class, LLUtils::discardLLDelta)
.doOnDiscard(Send.class, LLUtils::discardSend)
.doOnDiscard(Map.class, LLUtils::discardMap);
.doOnDiscard(Map.class, LLUtils::discardMap)
.doOnDiscard(DatabaseStage.class, LLUtils::discardStage);
*/
}
@ -651,10 +658,14 @@ public class LLUtils {
discardLLRange(o);
} else if (obj instanceof Delta o) {
discardDelta(o);
} else if (obj instanceof LLDelta o) {
discardLLDelta(o);
} else if (obj instanceof Send o) {
discardSend(o);
} else if (obj instanceof Map o) {
discardMap(o);
} else if (obj instanceof DatabaseStage o) {
discardStage(o);
}
});
// todo: check if the single object discard hook is more performant
@ -667,8 +678,10 @@ public class LLUtils {
.doOnDiscard(LLEntry.class, LLUtils::discardLLEntry)
.doOnDiscard(LLRange.class, LLUtils::discardLLRange)
.doOnDiscard(Delta.class, LLUtils::discardDelta)
.doOnDiscard(LLDelta.class, LLUtils::discardLLDelta)
.doOnDiscard(Send.class, LLUtils::discardSend)
.doOnDiscard(Map.class, LLUtils::discardMap);
.doOnDiscard(Map.class, LLUtils::discardMap)
.doOnDiscard(DatabaseStage.class, LLUtils::discardStage);
*/
}
@ -683,6 +696,11 @@ public class LLUtils {
range.close();
}
private static void discardLLDelta(LLDelta delta) {
logger.trace("Releasing discarded LLDelta");
delta.close();
}
private static void discardEntry(Map.Entry<?, ?> e) {
if (e.getKey() instanceof Buffer bb) {
bb.close();
@ -776,6 +794,10 @@ public class LLUtils {
}
}
private static void discardStage(DatabaseStage<?> stage) {
stage.release();
}
public static boolean isDirect(Buffer key) {
var readableComponents = key.countReadableComponents();
if (readableComponents == 0) {

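resolveLLDelta and mapLLDelta now receive the Send<LLDelta> inside try-with-resources, and the discard pipeline gains hooks for LLDelta and DatabaseStage. The same receive-and-close shape, reduced to a generic helper, as a hedged sketch (readableBytesOf is an illustrative name, not part of this codebase):

// Illustration of the handle() pattern used by resolveLLDelta/mapLLDelta above.
static Mono<Integer> readableBytesOf(Mono<Send<Buffer>> bufferMono) {
    return bufferMono.handle((toReceive, sink) -> {
        try (var buffer = toReceive.receive()) {   // ownership taken here, released on every path
            sink.next(buffer.readableBytes());
        } catch (RuntimeException ex) {
            sink.error(ex);
        }
    });
}

The new discardLLDelta and discardStage hooks cover the complementary case, where the element is dropped by the pipeline before any handler runs.
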
View File

@ -19,6 +19,7 @@ import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@ -287,6 +288,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
sink.next(Map.entry(entry.getT1(), valueOpt));
} catch (SerializationException ex) {
sink.error(ex);
} finally {
entry.getT2().close();
entry.getT3().ifPresent(Send::close);
}
})
.transform(LLUtils::handleDiscard);
@ -309,11 +313,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} catch (SerializationException e) {
sink.error(e);
}
});
})
.doOnDiscard(Send.class, Send::close);
return dictionary
.putMulti(serializedEntries, false)
.then()
.doOnDiscard(LLEntry.class, ResourceSupport::close);
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(LLEntry.class, ResourceSupport::close)
.doOnDiscard(List.class, list -> {
for (Object o : list) {
if (o instanceof Send send) {
send.close();
} else if (o instanceof Buffer buf) {
buf.close();
}
}
});
}
@Override

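putMulti now closes each tuple component in a finally block and registers discard hooks for Send, LLEntry and List, so batched values dropped by the operator chain (for example on cancellation) are still released. A hedged standalone form of the List hook (the method name is illustrative):

// Close Send/Buffer elements of any List that Reactor discards.
static <T> Flux<T> closeDiscardedBatches(Flux<T> source) {
    return source.doOnDiscard(List.class, list -> {
        for (Object o : list) {
            if (o instanceof Send<?> send) {        // not-yet-received handle
                send.close();
            } else if (o instanceof Buffer buf) {   // already-received buffer
                buf.close();
            }
        }
    });
}
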
View File

@ -45,7 +45,7 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
}
private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) {
try {
try (value) {
sink.next(serializer.deserialize(value).deserializedData());
} catch (SerializationException ex) {
sink.error(ex);
@ -72,12 +72,14 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
boolean existsAlmostCertainly) {
return dictionary
.update(keyMono, (oldValueSer) -> {
var result = updater.apply(
oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData());
if (result == null) {
return null;
} else {
return serializer.serialize(result);
try (oldValueSer) {
var result = updater.apply(oldValueSer == null ? null
: serializer.deserialize(oldValueSer).deserializedData());
if (result == null) {
return null;
} else {
return serializer.serialize(result);
}
}
}, updateReturnMode, existsAlmostCertainly)
.handle(this::deserializeValue);
@ -88,12 +90,14 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
boolean existsAlmostCertainly) {
return dictionary
.updateAndGetDelta(keyMono, (oldValueSer) -> {
var result = updater.apply(
oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData());
if (result == null) {
return null;
} else {
return serializer.serialize(result);
try (oldValueSer) {
var result = updater.apply(oldValueSer == null ? null
: serializer.deserialize(oldValueSer).deserializedData());
if (result == null) {
return null;
} else {
return serializer.serialize(result);
}
}
}, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono,
serialized -> serializer.deserialize(serialized).deserializedData()

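Both update and updateAndGetDelta now open the serialized old value with try-with-resources, so it is released whether the updater returns a new value, returns null, or throws. The updater shape in isolation, as a hedged sketch (updater and serializer stand in for the fields of DatabaseSingle):

// Sketch of the updater introduced above; a null old value is simply skipped by try-with-resources.
SerializationFunction<Send<Buffer>, Send<Buffer>> updateSerialized = oldValueSer -> {
    try (oldValueSer) {
        var result = updater.apply(oldValueSer == null ? null
                : serializer.deserialize(oldValueSer).deserializedData());
        return result == null ? null : serializer.serialize(result);
    }
};
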
View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.collections;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
@ -148,7 +149,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
.getValue(snapshot, key, existsAlmostCertainly)
.map(value -> Map.entry(key, Optional.of(value)))
.switchIfEmpty(Mono.fromSupplier(() -> Map.entry(key, Optional.empty())))
);
)
.doOnDiscard(Entry.class, unknownEntry -> {
if (unknownEntry.getValue() instanceof Optional optionalBuffer
&& optionalBuffer.isPresent()
&& optionalBuffer.get() instanceof Buffer buffer) {
buffer.close();
}
});
}
/**

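getMulti gains a discard hook for Map.Entry values whose Optional still wraps a live Buffer, covering entries that Reactor drops before they are consumed. The same hook as a hedged reusable transform (the method name is illustrative):

// Release a Buffer hidden inside an Optional entry value on discard.
static <T> Flux<T> closeDiscardedOptionalEntries(Flux<T> source) {
    return source.doOnDiscard(Map.Entry.class, unknownEntry -> {
        if (unknownEntry.getValue() instanceof Optional<?> optionalBuffer
                && optionalBuffer.isPresent()
                && optionalBuffer.get() instanceof Buffer buffer) {
            buffer.close();
        }
    });
}
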
View File

@ -774,7 +774,7 @@ public class LLLocalDictionary implements LLDictionary {
// Remember to change also update() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<LLDelta> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
@ -878,7 +878,7 @@ public class LLLocalDictionary implements LLDictionary {
return LLDelta.of(
prevData != null ? prevData.send() : null,
newData != null ? newData.send() : null
);
).send();
} finally {
if (newData != null) {
newData.close();
@ -1536,8 +1536,8 @@ public class LLLocalDictionary implements LLDictionary {
true,
"getRangeKeysGrouped"
),
it -> it.flux(),
it -> it.release()
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
)
.subscribeOn(dbScheduler),
rangeSend -> Mono.fromRunnable(rangeSend::close)

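updateAndGetDelta now ends with LLDelta.of(...).send(), so the delta leaves the RocksDB callback as an owned handle, and the iterator lambdas become method references. The build-then-send idea on a plain Netty 5 Buffer, as a hedged sketch (copyToSend is an illustrative name; allocate/writeBytes/send are assumed to be the usual io.netty5.buffer.api calls):

// Construct a resource, then hand it off as a Send so the receiver owns its lifetime.
static Send<Buffer> copyToSend(BufferAllocator allocator, byte[] bytes) {
    Buffer buffer = allocator.allocate(bytes.length);
    try {
        buffer.writeBytes(bytes);
    } catch (Throwable t) {
        buffer.close();       // failed before the hand-off: release locally
        throw t;
    }
    return buffer.send();     // success: ownership moves to whoever calls receive()
}
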
View File

@ -94,7 +94,9 @@ public class LLMemoryDictionary implements LLDictionary {
}
private ByteList k(Send<Buffer> buf) {
return new BinaryLexicographicList(LLUtils.toArray(buf.receive()));
try (var b = buf.receive()) {
return new BinaryLexicographicList(LLUtils.toArray(b));
}
}
private Send<Buffer> kk(ByteList bytesList) {
@ -168,20 +170,13 @@ public class LLMemoryDictionary implements LLDictionary {
@Override
public Mono<Send<Buffer>> put(Mono<Send<Buffer>> keyMono, Mono<Send<Buffer>> valueMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
key -> Mono.usingWhen(valueMono,
value -> Mono
.fromCallable(() -> {
var k = k(key);
var v = k(value);
return mainDb.put(k, v);
})
.transform(result -> this.transformResult(result, resultType))
.onErrorMap(cause -> new IOException("Failed to read", cause)),
value -> Mono.fromRunnable(value::close)
),
key -> Mono.fromRunnable(key::close)
);
var kMono = keyMono.map(this::k);
var vMono = valueMono.map(this::k);
return Mono
.zip(kMono, vMono)
.mapNotNull(tuple -> mainDb.put(tuple.getT1(), tuple.getT2()))
.transform(result -> this.transformResult(result, resultType))
.onErrorMap(cause -> new IOException("Failed to read", cause));
}
@Override
@ -190,38 +185,41 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Mono<LLDelta> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
key -> Mono.fromCallable(() -> {
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
try (key) {
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
AtomicReference<Send<Buffer>> oldRef = new AtomicReference<>(null);
var newValue = mainDb.compute(k(key), (_unused, old) -> {
if (old != null) {
oldRef.set(kk(old));
}
Buffer v;
try (var oldToSend = old != null ? kk(old) : null) {
var vToReceive = updater.apply(oldToSend);
v = vToReceive != null ? vToReceive.receive() : null;
} catch (SerializationException e) {
throw new IllegalStateException(e);
}
try {
if (v != null) {
return k(v.send());
} else {
return null;
}
} finally {
if (v != null) {
v.close();
}
}
});
return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null).send();
}
AtomicReference<Send<Buffer>> oldRef = new AtomicReference<>(null);
var newValue = mainDb.compute(k(key), (_unused, old) -> {
if (old != null) {
oldRef.set(kk(old));
}
Send<Buffer> v;
try {
v = updater.apply(old != null ? kk(old) : null);
} catch (SerializationException e) {
throw new IllegalStateException(e);
}
try {
if (v != null) {
return k(v);
} else {
return null;
}
} finally {
if (v != null) {
v.close();
}
}
});
return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null);
}),
key -> Mono.fromRunnable(key::close)
);

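k() now receives the buffer and converts it inside try-with-resources, which is what lets put() drop its nested usingWhen blocks and simply zip the mapped key and value. The receive-convert-close shape on its own, as a hedged sketch (toByteArray is an illustrative name; readableBytes/copyInto are assumed from the Netty 5 Buffer API):

// Take ownership, copy the readable bytes out, close the buffer via try-with-resources.
static byte[] toByteArray(Send<Buffer> bufferToReceive) {
    try (var buffer = bufferToReceive.receive()) {
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.copyInto(buffer.readerOffset(), bytes, 0, bytes.length);  // copy without moving the reader offset
        return bytes;
    }
}
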
View File

@ -37,11 +37,7 @@ public class DbTestUtils {
public static final String BIG_STRING = generateBigString();
private static String generateBigString() {
var sb = new StringBuilder();
for (int i = 0; i < 1024; i++) {
sb.append("0123456789");
}
return sb.toString();
return "0123456789".repeat(1024);
}
public static record TestAllocator(PooledBufferAllocator allocator) {}
@ -78,7 +74,9 @@ public class DbTestUtils {
Function<LLKeyValueDatabase, Publisher<U>> action) {
return Flux.usingWhen(
temporaryDbGenerator.openTempDb(alloc),
tempDb -> action.apply(tempDb.db()),
tempDb -> Flux.from(action.apply(tempDb.db())).doOnDiscard(Object.class, o -> {
System.out.println("Discarded: " + o.getClass().getName() + ", " + o);
}),
temporaryDbGenerator::closeTempDb
);
}

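The temp-db helper now wraps the action in Flux.from(...) and prints every element Reactor discards, which is what makes the remaining leaks visible in test output. Extracted as a hedged standalone helper (logDiscards is an illustrative name):

// Log every discarded element of a reactive pipeline.
static <T> Flux<T> logDiscards(Publisher<T> source) {
    return Flux.from(source)
            .doOnDiscard(Object.class, o ->
                    System.out.println("Discarded: " + o.getClass().getName() + ", " + o));
}
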
View File

@ -29,6 +29,7 @@ import reactor.util.function.Tuples;
public abstract class TestDictionaryMap {
private TestAllocator allocator;
private boolean checkLeaks = true;
private static boolean isTestBadKeysEnabled() {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
@ -87,7 +88,9 @@ public abstract class TestDictionaryMap {
@AfterEach
public void afterEach() {
ensureNoLeaks(allocator.allocator(), true, false);
if (checkLeaks) {
ensureNoLeaks(allocator.allocator(), true, false);
}
destroyAllocator(allocator);
}
@ -104,6 +107,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
@ -123,6 +127,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
@ -145,6 +150,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext("error?").expectNext(value).verifyComplete();
@ -167,6 +173,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
@ -189,6 +196,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(false, true, false).verifyComplete();
@ -229,6 +237,7 @@ public abstract class TestDictionaryMap {
)
.doAfterTerminate(map::release)
)
.transform(LLUtils::handleDiscard)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
stpVer.verifyError();
@ -271,6 +280,7 @@ public abstract class TestDictionaryMap {
)
.doAfterTerminate(map::release)
)
.transform(LLUtils::handleDiscard)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
stpVer.verifyError();
@ -297,6 +307,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(true, true, false, true).verifyComplete();
@ -358,6 +369,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -382,8 +394,10 @@ public abstract class TestDictionaryMap {
)
.filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -411,6 +425,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -437,8 +452,10 @@ public abstract class TestDictionaryMap {
)
.filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -476,6 +493,7 @@ public abstract class TestDictionaryMap {
})
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete();
@ -497,6 +515,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -522,6 +541,7 @@ public abstract class TestDictionaryMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -549,6 +569,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -578,6 +599,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -612,6 +634,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -641,6 +664,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty()).verifyComplete();
@ -667,6 +691,7 @@ public abstract class TestDictionaryMap {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete();

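Every failure branch now flips checkLeaks to false before verifyError(), since a pipeline that is expected to fail may abandon buffered elements, and afterEach() only runs the leak assertion while the flag is still set. The gating pattern summarized as a hedged sketch (ensureNoLeaks and destroyAllocator are the helpers already used in this class):

private boolean checkLeaks = true;

@AfterEach
public void afterEach() {
    if (checkLeaks) {                                       // skip the assertion for deliberate error paths
        ensureNoLeaks(allocator.allocator(), true, false);
    }
    destroyAllocator(allocator);
}

// ...inside a parameterized test:
if (shouldFail) {
    this.checkLeaks = false;    // the error path may legitimately drop in-flight buffers
    stpVer.verifyError();
} else {
    stpVer.expectNext(value).verifyComplete();
}
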
View File

@ -42,6 +42,7 @@ import reactor.util.function.Tuples;
public abstract class TestDictionaryMapDeep {
private TestAllocator allocator;
private boolean checkLeaks = true;
private static boolean isTestBadKeysEnabled() {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
@ -164,7 +165,9 @@ public abstract class TestDictionaryMapDeep {
@AfterEach
public void afterEach() {
ensureNoLeaks(allocator.allocator(), true, false);
if (checkLeaks) {
ensureNoLeaks(allocator.allocator(), true, false);
}
destroyAllocator(allocator);
}
@ -181,6 +184,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
@ -203,6 +207,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(Map.entry(key, value)).verifyComplete();
@ -245,6 +250,7 @@ public abstract class TestDictionaryMapDeep {
))
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
value.forEach((k, v) -> remainingEntries.add(Tuples.of(key, k, v)));
@ -271,6 +277,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
@ -295,6 +302,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(Map.of("nothing", "nothing"), Map.of("error?", "error.")).expectNext(value).verifyComplete();
@ -332,6 +340,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext("error?", value).verifyComplete();
@ -354,6 +363,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
@ -392,6 +402,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext("error?", value).verifyComplete();
@ -414,6 +425,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(false, true, false).verifyComplete();
@ -452,6 +464,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(true, true, false).verifyComplete();
@ -537,6 +550,7 @@ public abstract class TestDictionaryMapDeep {
)
)
.doAfterTerminate(map::release)
.transform(LLUtils::handleDiscard)
)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
@ -629,6 +643,7 @@ public abstract class TestDictionaryMapDeep {
)
)
.doAfterTerminate(map::release)
.transform(LLUtils::handleDiscard)
)
));
if (updateMode == UpdateMode.DISALLOW || shouldFail) {
@ -656,6 +671,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(true, true, false, true).verifyComplete();
@ -709,8 +725,10 @@ public abstract class TestDictionaryMapDeep {
)
.filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -737,6 +755,7 @@ public abstract class TestDictionaryMapDeep {
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -760,9 +779,11 @@ public abstract class TestDictionaryMapDeep {
map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet()))
)
.doAfterTerminate(map::release)
.transform(LLUtils::handleDiscard)
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -789,8 +810,10 @@ public abstract class TestDictionaryMapDeep {
)
.filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -826,6 +849,7 @@ public abstract class TestDictionaryMapDeep {
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(!entries.isEmpty(), false, !entries.isEmpty()).verifyComplete();
@ -850,6 +874,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -873,8 +898,10 @@ public abstract class TestDictionaryMapDeep {
.concatMapIterable(list -> list)
.doAfterTerminate(map::release)
)
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -901,6 +928,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -929,6 +957,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -962,6 +991,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
entries.forEach((k, v) -> remainingEntries.add(Map.entry(k, v)));
@ -986,8 +1016,10 @@ public abstract class TestDictionaryMapDeep {
)
.doAfterTerminate(map::release)
)
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.expectNext(true).verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty()).verifyComplete();
@ -1012,6 +1044,7 @@ public abstract class TestDictionaryMapDeep {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.expectNext(true).verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete();

View File

@ -35,6 +35,7 @@ import reactor.util.function.Tuples;
public abstract class TestDictionaryMapDeepHashMap {
private TestAllocator allocator;
private boolean checkLeaks = true;
private static boolean isTestBadKeysEnabled() {
return System.getProperty("badkeys", "true").equalsIgnoreCase("true");
@ -105,7 +106,9 @@ public abstract class TestDictionaryMapDeepHashMap {
@AfterEach
public void afterEach() {
ensureNoLeaks(allocator.allocator(), true, false);
if (checkLeaks) {
ensureNoLeaks(allocator.allocator(), true, false);
}
destroyAllocator(allocator);
}
@ -127,6 +130,7 @@ public abstract class TestDictionaryMapDeepHashMap {
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();

View File

@ -175,7 +175,7 @@ public abstract class TestLLDictionaryLeaks {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
var value = Mono.fromCallable(() -> fromString("test-value"));
runVoid(dict.put(key, value, resultType).then());
runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close));
}
@ParameterizedTest
@ -229,6 +229,6 @@ public abstract class TestLLDictionaryLeaks {
public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(dict.remove(key, resultType).then());
runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close));
}
}
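
testPut and testRemove only care about completion, so the Send emitted by put/remove is dropped by then(); the added doOnDiscard(Send.class, Send::close) closes it instead of leaking it. The same idea as a hedged reusable helper (runIgnoringValue is an illustrative name):

// Run an operation for its completion only, closing the ignored Send.
static Mono<Void> runIgnoringValue(Mono<Send<Buffer>> operation) {
    return operation
            .then()                                   // drops the emitted Send<Buffer>
            .doOnDiscard(Send.class, Send::close);    // ...so the discard hook closes it
}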