This commit is contained in:
Andrea Cavalli 2021-08-22 23:50:50 +02:00
parent 2a24570512
commit c5552d2827
4 changed files with 31 additions and 21 deletions

View File

@ -531,7 +531,8 @@ public class LLUtils {
public static <T> Flux<T> handleDiscard(Flux<T> mono) {
return mono
.doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted)
.doOnDiscard(Map.Entry.class, LLUtils::discardEntry);
.doOnDiscard(Map.Entry.class, LLUtils::discardEntry)
.doOnDiscard(Collection.class, LLUtils::discardCollection);
}
private static void discardEntry(Map.Entry<?, ?> e) {
@ -552,4 +553,27 @@ public class LLUtils {
referenceCounted.release();
}
}
private static void discardCollection(Collection<?> collection) {
for (Object o : collection) {
if (o instanceof ReferenceCounted referenceCounted) {
if (referenceCounted.refCnt() > 0) {
referenceCounted.release();
}
} else if (o instanceof Map.Entry entry) {
if (entry.getKey() instanceof ReferenceCounted bb) {
if (bb.refCnt() > 0) {
bb.release();
}
}
if (entry.getValue() instanceof ReferenceCounted bb) {
if (bb.refCnt() > 0) {
bb.release();
}
}
} else {
break;
}
}
}
}

View File

@ -1093,22 +1093,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
), 2) // Max concurrency is 2 to read data while preparing the next segment
.doOnDiscard(Entry.class, entry -> {
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
}
})
.doOnDiscard(Collection.class, obj -> {
//noinspection unchecked
var castedEntries = (Collection<Entry<ByteBuf, ByteBuf>>) obj;
for (Entry<ByteBuf, ByteBuf> entry : castedEntries) {
entry.getKey().release();
entry.getValue().release();
}
});
.transform(LLUtils::handleDiscard);
}
@Override

View File

@ -1,9 +1,8 @@
package it.cavallium.dbengine.database.serialization;
import java.io.IOException;
import java.io.UncheckedIOException;
public class SerializationException extends Exception {
public class SerializationException extends IOException {
public SerializationException() {
super();

View File

@ -432,8 +432,10 @@ public class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPutMulti")
public void testSetAndGetChanged(DbType dbType, UpdateMode updateMode, Map<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true);
public void testSetAndGetChanged(DbType dbType,
UpdateMode updateMode,
Map<String, String> entries,
boolean shouldFail) {
Step<Boolean> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5))