diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 78478fb..2aad71a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -531,7 +531,8 @@ public class LLUtils { public static Flux handleDiscard(Flux mono) { return mono .doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted) - .doOnDiscard(Map.Entry.class, LLUtils::discardEntry); + .doOnDiscard(Map.Entry.class, LLUtils::discardEntry) + .doOnDiscard(Collection.class, LLUtils::discardCollection); } private static void discardEntry(Map.Entry e) { @@ -552,4 +553,27 @@ public class LLUtils { referenceCounted.release(); } } + + private static void discardCollection(Collection collection) { + for (Object o : collection) { + if (o instanceof ReferenceCounted referenceCounted) { + if (referenceCounted.refCnt() > 0) { + referenceCounted.release(); + } + } else if (o instanceof Map.Entry entry) { + if (entry.getKey() instanceof ReferenceCounted bb) { + if (bb.refCnt() > 0) { + bb.release(); + } + } + if (entry.getValue() instanceof ReferenceCounted bb) { + if (bb.refCnt() > 0) { + bb.release(); + } + } + } else { + break; + } + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 7e45c65..88b54dc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1093,22 +1093,7 @@ public class LLLocalDictionary implements LLDictionary { } } ), 2) // Max concurrency is 2 to read data while preparing the next segment - .doOnDiscard(Entry.class, entry -> { - if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) { - //noinspection unchecked - var castedEntry = (Entry) entry; - castedEntry.getKey().release(); - castedEntry.getValue().release(); - } - }) - .doOnDiscard(Collection.class, obj -> { - //noinspection unchecked - var castedEntries = (Collection>) obj; - for (Entry entry : castedEntries) { - entry.getKey().release(); - entry.getValue().release(); - } - }); + .transform(LLUtils::handleDiscard); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java index 6330782..435071d 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java @@ -1,9 +1,8 @@ package it.cavallium.dbengine.database.serialization; import java.io.IOException; -import java.io.UncheckedIOException; -public class SerializationException extends Exception { +public class SerializationException extends IOException { public SerializationException() { super(); diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 61b8d67..84397f2 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -432,8 +432,10 @@ public class TestDictionaryMap { @ParameterizedTest @MethodSource("provideArgumentsPutMulti") - public void testSetAndGetChanged(DbType dbType, UpdateMode updateMode, Map entries, boolean shouldFail) { - var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); + public void testSetAndGetChanged(DbType dbType, + UpdateMode updateMode, + Map entries, + boolean shouldFail) { Step stpVer = StepVerifier .create(tempDb(db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5))