diff --git a/pom.xml b/pom.xml
index c080b41..b9dd848 100644
--- a/pom.xml
+++ b/pom.xml
@@ -377,17 +377,17 @@
io.projectreactor
reactor-core
- 3.4.8
+ 3.4.9
io.projectreactor
reactor-tools
- 3.4.8
+ 3.4.9
io.projectreactor
reactor-test
- 3.4.8
+ 3.4.9
org.novasearch
diff --git a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java
index 3a8b6a9..119e718 100644
--- a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java
+++ b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java
@@ -14,5 +14,4 @@ public record DatabaseOptions(Map extraFlags,
boolean allowMemoryMapping,
boolean allowNettyDirect,
boolean useNettyDirect,
- boolean enableDbAssertionsWhenUsingAssertions,
int maxOpenFiles) {}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
index 1caf698..5e2c135 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
@@ -71,23 +71,23 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
return getMulti(snapshot, keys, false);
}
- Flux> putMulti(Flux> entries, boolean getOldValues);
+ Flux putMulti(Flux entries, boolean getOldValues);
Flux> updateMulti(Flux> entries,
BiSerializationFunction updateFunction);
- Flux> getRange(@Nullable LLSnapshot snapshot, Mono range, boolean existsAlmostCertainly);
+ Flux getRange(@Nullable LLSnapshot snapshot, Mono range, boolean existsAlmostCertainly);
- default Flux> getRange(@Nullable LLSnapshot snapshot, Mono range) {
+ default Flux getRange(@Nullable LLSnapshot snapshot, Mono range) {
return getRange(snapshot, range, false);
}
- Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+ Flux> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono range,
int prefixLength,
boolean existsAlmostCertainly);
- default Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+ default Flux> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono range,
int prefixLength) {
return getRangeGrouped(snapshot, range, prefixLength, false);
@@ -101,11 +101,11 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Flux badBlocks(Mono range);
- Mono setRange(Mono range, Flux> entries);
+ Mono setRange(Mono range, Flux entries);
default Mono replaceRange(Mono range,
boolean canKeysChange,
- Function, Mono>> entriesReplacer,
+ Function> entriesReplacer,
boolean existsAlmostCertainly) {
return Mono.defer(() -> {
if (canKeysChange) {
@@ -126,7 +126,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
default Mono replaceRange(Mono range,
boolean canKeysChange,
- Function, Mono>> entriesReplacer) {
+ Function> entriesReplacer) {
return replaceRange(range, canKeysChange, entriesReplacer, false);
}
@@ -134,9 +134,9 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono sizeRange(@Nullable LLSnapshot snapshot, Mono range, boolean fast);
- Mono> getOne(@Nullable LLSnapshot snapshot, Mono range);
+ Mono getOne(@Nullable LLSnapshot snapshot, Mono range);
Mono getOneKey(@Nullable LLSnapshot snapshot, Mono range);
- Mono> removeOne(Mono range);
+ Mono removeOne(Mono range);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
new file mode 100644
index 0000000..3a7a46a
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
@@ -0,0 +1,74 @@
+package it.cavallium.dbengine.database;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
+
+public class LLEntry {
+
+ private static final Logger logger = LoggerFactory.getLogger(LLEntry.class);
+
+ private final AtomicInteger refCnt = new AtomicInteger(1);
+
+ private final ByteBuf key;
+ private final ByteBuf value;
+
+ public LLEntry(ByteBuf key, ByteBuf value) {
+ try {
+ this.key = key.retain();
+ this.value = value.retain();
+ } finally {
+ key.release();
+ value.release();
+ }
+ }
+
+ public ByteBuf getKey() {
+ if (refCnt.get() <= 0) {
+ throw new IllegalReferenceCountException(refCnt.get());
+ }
+ return key;
+ }
+
+ public ByteBuf getValue() {
+ if (refCnt.get() <= 0) {
+ throw new IllegalReferenceCountException(refCnt.get());
+ }
+ return value;
+ }
+
+ public void retain() {
+ if (refCnt.getAndIncrement() <= 0) {
+ throw new IllegalReferenceCountException(refCnt.get(), 1);
+ }
+ key.retain();
+ value.retain();
+ }
+
+ public void release() {
+ if (refCnt.decrementAndGet() < 0) {
+ throw new IllegalReferenceCountException(refCnt.get(), -1);
+ }
+ if (key.refCnt() > 0) {
+ key.release();
+ }
+ if (value.refCnt() > 0) {
+ value.release();
+ }
+ }
+
+ public boolean isReleased() {
+ return refCnt.get() <= 0;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (refCnt.get() > 0) {
+ logger.warn(this.getClass().getName() + "::release has not been called!");
+ }
+ super.finalize();
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index 2aad71a..9c16c7e 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.ToIntFunction;
@@ -42,13 +43,19 @@ import org.apache.lucene.search.SortedNumericSortField;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.warp.commonutils.functional.IOFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
@SuppressWarnings("unused")
public class LLUtils {
+ private static final Logger logger = LoggerFactory.getLogger(LLUtils.class);
+
private static final byte[] RESPONSE_TRUE = new byte[]{1};
private static final byte[] RESPONSE_FALSE = new byte[]{0};
private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
@@ -514,35 +521,154 @@ public class LLUtils {
}
public static Mono handleDiscard(Mono mono) {
- return mono.doOnDiscard(Map.Entry.class, e -> {
- if (e.getKey() instanceof ByteBuf bb) {
- if (bb.refCnt() > 0) {
- bb.release();
- }
- }
- if (e.getValue() instanceof ByteBuf bb) {
- if (bb.refCnt() > 0) {
- bb.release();
- }
- }
- });
+ return mono
+ .doOnDiscard(Object.class, obj -> {
+ if (obj instanceof ReferenceCounted o) {
+ discardRefCounted(o);
+ } else if (obj instanceof Entry o) {
+ discardEntry(o);
+ } else if (obj instanceof Collection o) {
+ discardCollection(o);
+ } else if (obj instanceof Tuple3 o) {
+ discardTuple3(o);
+ } else if (obj instanceof Tuple2 o) {
+ discardTuple2(o);
+ } else if (obj instanceof LLEntry o) {
+ discardLLEntry(o);
+ } else if (obj instanceof LLRange o) {
+ discardLLRange(o);
+ } else if (obj instanceof Delta o) {
+ discardDelta(o);
+ } else if (obj instanceof Map o) {
+ discardMap(o);
+ }
+ });
+ // todo: check if the single object discard hook is more performant
+ /*
+ .doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted)
+ .doOnDiscard(Map.Entry.class, LLUtils::discardEntry)
+ .doOnDiscard(Collection.class, LLUtils::discardCollection)
+ .doOnDiscard(Tuple2.class, LLUtils::discardTuple2)
+ .doOnDiscard(Tuple3.class, LLUtils::discardTuple3)
+ .doOnDiscard(LLEntry.class, LLUtils::discardLLEntry)
+ .doOnDiscard(LLRange.class, LLUtils::discardLLRange)
+ .doOnDiscard(Delta.class, LLUtils::discardDelta)
+ .doOnDiscard(Map.class, LLUtils::discardMap);
+
+ */
}
public static Flux handleDiscard(Flux mono) {
return mono
+ .doOnDiscard(Object.class, obj -> {
+ if (obj instanceof ReferenceCounted o) {
+ discardRefCounted(o);
+ } else if (obj instanceof Entry o) {
+ discardEntry(o);
+ } else if (obj instanceof Collection o) {
+ discardCollection(o);
+ } else if (obj instanceof Tuple3 o) {
+ discardTuple3(o);
+ } else if (obj instanceof Tuple2 o) {
+ discardTuple2(o);
+ } else if (obj instanceof LLEntry o) {
+ discardLLEntry(o);
+ } else if (obj instanceof LLRange o) {
+ discardLLRange(o);
+ } else if (obj instanceof Delta o) {
+ discardDelta(o);
+ } else if (obj instanceof Map o) {
+ discardMap(o);
+ } else {
+ System.err.println(obj.getClass().getName());
+ }
+ });
+ // todo: check if the single object discard hook is more performant
+ /*
.doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted)
.doOnDiscard(Map.Entry.class, LLUtils::discardEntry)
- .doOnDiscard(Collection.class, LLUtils::discardCollection);
+ .doOnDiscard(Collection.class, LLUtils::discardCollection)
+ .doOnDiscard(Tuple2.class, LLUtils::discardTuple2)
+ .doOnDiscard(Tuple3.class, LLUtils::discardTuple3)
+ .doOnDiscard(LLEntry.class, LLUtils::discardLLEntry)
+ .doOnDiscard(LLRange.class, LLUtils::discardLLRange)
+ .doOnDiscard(Delta.class, LLUtils::discardDelta)
+ .doOnDiscard(Map.class, LLUtils::discardMap);
+
+ */
+ }
+
+ private static void discardLLEntry(LLEntry entry) {
+ logger.trace("Releasing discarded ByteBuf");
+ entry.release();
+ }
+
+ private static void discardLLRange(LLRange range) {
+ logger.trace("Releasing discarded ByteBuf");
+ range.release();
}
private static void discardEntry(Map.Entry, ?> e) {
if (e.getKey() instanceof ByteBuf bb) {
if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
bb.release();
}
}
if (e.getValue() instanceof ByteBuf bb) {
if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ }
+
+ private static void discardTuple2(Tuple2, ?> e) {
+ if (e.getT1() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ if (e.getT2() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ }
+
+ private static void discardTuple3(Tuple3, ?, ?> e) {
+ if (e.getT1() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ } else if (e.getT1() instanceof Optional opt) {
+ if (opt.isPresent() && opt.get() instanceof ByteBuf bb) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ if (e.getT2() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ } else if (e.getT1() instanceof Optional opt) {
+ if (opt.isPresent() && opt.get() instanceof ByteBuf bb) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ if (e.getT3() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ } else if (e.getT1() instanceof Optional opt) {
+ if (opt.isPresent() && opt.get() instanceof ByteBuf bb) {
+ logger.trace("Releasing discarded ByteBuf");
bb.release();
}
}
@@ -550,6 +676,7 @@ public class LLUtils {
private static void discardRefCounted(ReferenceCounted referenceCounted) {
if (referenceCounted.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
referenceCounted.release();
}
}
@@ -558,16 +685,19 @@ public class LLUtils {
for (Object o : collection) {
if (o instanceof ReferenceCounted referenceCounted) {
if (referenceCounted.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
referenceCounted.release();
}
} else if (o instanceof Map.Entry entry) {
if (entry.getKey() instanceof ReferenceCounted bb) {
if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
bb.release();
}
}
if (entry.getValue() instanceof ReferenceCounted bb) {
if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
bb.release();
}
}
@@ -576,4 +706,42 @@ public class LLUtils {
}
}
}
+
+ private static void discardDelta(Delta> delta) {
+ if (delta.previous() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ if (delta.current() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ }
+ }
+
+ private static void discardMap(Map, ?> map) {
+ for (Entry, ?> entry : map.entrySet()) {
+ boolean hasByteBuf = false;
+ if (entry.getKey() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ hasByteBuf = true;
+ }
+ if (entry.getValue() instanceof ByteBuf bb) {
+ if (bb.refCnt() > 0) {
+ logger.trace("Releasing discarded ByteBuf");
+ bb.release();
+ }
+ hasByteBuf = true;
+ }
+ if (!hasByteBuf) {
+ break;
+ }
+ }
+ }
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
index 41c439e..f9006b1 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
@@ -14,7 +14,11 @@ public class DatabaseEmpty {
public static final Serializer NOTHING_SERIALIZER = new Serializer<>() {
@Override
public @NotNull Nothing deserialize(@NotNull ByteBuf serialized) {
- return NOTHING;
+ try {
+ return NOTHING;
+ } finally {
+ serialized.release();
+ }
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
index 8281ab7..cc04e56 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
@@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
@@ -15,6 +16,7 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -28,6 +30,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
/**
@@ -87,6 +90,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep {
try {
- sink.next(Map.entry(this.toKey(serializeSuffix(entry.getKey())),
+ sink.next(new LLEntry(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())));
} catch (SerializationException e) {
sink.error(e);
@@ -151,26 +156,18 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValue(T keySuffix, U value) {
- return Mono
- .using(
- () -> serializeSuffix(keySuffix),
- keySuffixBuf -> Mono
- .using(
- () -> toKey(keySuffixBuf.retain()),
- keyBuf -> Mono
- .using(() -> valueSerializer.serialize(value),
- valueBuf -> dictionary
- .put(LLUtils.lazyRetain(keyBuf),
- LLUtils.lazyRetain(valueBuf),
- LLDictionaryResultType.VOID)
- .doOnNext(ReferenceCounted::release),
- ReferenceCounted::release
- ),
- ReferenceCounted::release
- ),
+ return Mono.using(() -> serializeSuffix(keySuffix),
+ keySuffixBuf -> Mono.using(() -> toKey(keySuffixBuf.retain()),
+ keyBuf -> Mono.using(() -> valueSerializer.serialize(value),
+ valueBuf -> dictionary
+ .put(LLUtils.lazyRetain(keyBuf), LLUtils.lazyRetain(valueBuf), LLDictionaryResultType.VOID)
+ .doOnNext(ReferenceCounted::release),
+ ReferenceCounted::release
+ ),
ReferenceCounted::release
- )
- .then();
+ ),
+ ReferenceCounted::release
+ ).then();
}
@Override
@@ -340,35 +337,43 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) {
- return dictionary
- .getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
- ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
+ return dictionary.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
+ ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
+ try {
+ var key = toKey(keySuffixBuf.retain());
try {
- return Tuples.of(keySuffix, toKey(keySuffixBuf.retain()));
+ return Tuples.of(keySuffix, key.retain());
} finally {
- keySuffixBuf.release();
+ key.release();
}
- })), existsAlmostCertainly)
- .flatMapSequential(entry -> {
- entry.getT2().release();
- return Mono.fromCallable(() -> {
- Optional valueOpt;
- if (entry.getT3().isPresent()) {
- valueOpt = Optional.of(valueSerializer.deserialize(entry.getT3().get()));
- } else {
- valueOpt = Optional.empty();
+ } finally {
+ keySuffixBuf.release();
+ }
+ })), existsAlmostCertainly).flatMapSequential(entry -> {
+ entry.getT2().release();
+ return Mono.fromCallable(() -> {
+ Optional valueOpt;
+ if (entry.getT3().isPresent()) {
+ var buf = entry.getT3().get();
+ try {
+ valueOpt = Optional.of(valueSerializer.deserialize(buf.retain()));
+ } finally {
+ buf.release();
}
- return Map.entry(entry.getT1(), valueOpt);
- });
+ } else {
+ valueOpt = Optional.empty();
+ }
+ return Map.entry(entry.getT1(), valueOpt);
});
+ }).transform(LLUtils::handleDiscard);
}
- private Entry serializeEntry(T key, U value) throws SerializationException {
+ private LLEntry serializeEntry(T key, U value) throws SerializationException {
ByteBuf serializedKey = toKey(serializeSuffix(key));
try {
ByteBuf serializedValue = valueSerializer.serialize(value);
try {
- return Map.entry(serializedKey.retain(), serializedValue.retain());
+ return new LLEntry(serializedKey.retain(), serializedValue.retain());
} finally {
serializedValue.release();
}
@@ -380,20 +385,21 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putMulti(Flux> entries) {
var serializedEntries = entries
- .flatMap(entry -> Mono
- .fromCallable(() -> serializeEntry(entry.getKey(), entry.getValue()))
- .doOnDiscard(Entry.class, uncastedEntry -> {
- if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
- byteBuf.release();
- }
- if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
- byteBuf.release();
- }
- })
- );
+ .handle((entry, sink) -> {
+ try {
+ sink.next(serializeEntry(entry.getKey(), entry.getValue()));
+ } catch (SerializationException e) {
+ sink.error(e);
+ }
+ });
return dictionary
.putMulti(serializedEntries, false)
- .then();
+ .then()
+ .doOnDiscard(LLEntry.class, entry -> {
+ if (!entry.isReleased()) {
+ entry.release();
+ }
+ });
}
@Override
@@ -455,21 +461,33 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntry, sink) -> {
+ ByteBuf key = serializedEntry.getKey();
+ ByteBuf value = serializedEntry.getValue();
try {
- sink.next(Map.entry(
- deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)),
- valueSerializer.deserialize(serializedEntry.getValue())
- ));
+ ByteBuf keySuffix = stripPrefix(key.retain(), false);
+ try {
+ sink.next(Map.entry(deserializeSuffix(keySuffix.retain()),
+ valueSerializer.deserialize(value.retain())));
+ } finally {
+ keySuffix.release();
+ }
} catch (SerializationException e) {
sink.error(e);
+ } finally {
+ key.release();
+ value.release();
}
})
.doOnDiscard(Entry.class, uncastedEntry -> {
if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
- byteBuf.release();
+ if (byteBuf.refCnt() > 0) {
+ byteBuf.release();
+ }
}
if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
- byteBuf.release();
+ if (byteBuf.refCnt() > 0) {
+ byteBuf.release();
+ }
}
});
}
@@ -481,8 +499,22 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getAllValues(null),
b -> dictionary.setRange(rangeMono, entries.handle((entry, sink) -> {
try {
- ByteBuf serializedValue = valueSerializer.serialize(entry.getValue());
- sink.next(Map.entry(toKey(serializeSuffix(entry.getKey())), serializedValue));
+ ByteBuf serializedKeySuffix = serializeSuffix(entry.getKey());
+ try {
+ ByteBuf serializedKey = toKey(serializedKeySuffix);
+ try {
+ ByteBuf serializedValue = valueSerializer.serialize(entry.getValue());
+ try {
+ sink.next(new LLEntry(serializedKey.retain(), serializedValue.retain()));
+ } finally {
+ serializedValue.release();
+ }
+ } finally {
+ serializedKey.release();
+ }
+ } finally {
+ serializedKeySuffix.release();
+ }
} catch (SerializationException e) {
sink.error(e);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
index 2873454..add1919 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
// todo: implement optimized methods (which?)
public class DatabaseMapDictionaryDeep> implements DatabaseStageMap {
@@ -393,25 +394,10 @@ public class DatabaseMapDictionaryDeep> implem
return Mono.using(
() -> serializeSuffix(keySuffix),
keySuffixData -> {
- Flux debuggingKeysFlux = Mono.>defer(() -> {
- if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
- && this.subStageGetter.needsDebuggingKeyFlux()) {
- return Flux
- .using(
- () -> toExtRange(keySuffixData.retain()),
- extRangeBuf -> this.dictionary
- .getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetainRange(extRangeBuf)),
- LLRange::release
- )
- .collectList();
- } else {
- return Mono.just(List.of());
- }
- }).flatMapIterable(it -> it);
return Mono.using(
() -> toKeyWithoutExt(keySuffixData.retain()),
keyWithoutExt -> this.subStageGetter
- .subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt), debuggingKeysFlux),
+ .subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt)),
ReferenceCounted::release
);
},
@@ -433,87 +419,43 @@ public class DatabaseMapDictionaryDeep> implem
@Override
public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) {
+
return Flux
- .defer(() -> {
- if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
- return dictionary
- .getRangeKeysGrouped(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)
- .concatMap(rangeKeys -> Flux
- .using(
- () -> {
- assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
- ByteBuf groupKeyWithExt = rangeKeys.get(0).retainedSlice();
- ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain(), true);
- ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
- return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix);
- },
- buffers -> Mono
- .fromCallable(() -> {
- assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes());
- return null;
- })
- .then(this.subStageGetter
- .subStage(dictionary,
- snapshot,
- LLUtils.lazyRetain(buffers.groupKeyWithoutExt),
- Flux.fromIterable(rangeKeys).map(ByteBuf::retain)
- )
- .>handle((us, sink) -> {
- try {
- var deserializedSuffix = this.deserializeSuffix(buffers.groupSuffix.retain());
- sink.next(Map.entry(deserializedSuffix, us));
- } catch (SerializationException ex) {
- sink.error(ex);
- }
- })
- ),
- buffers -> {
- buffers.groupSuffix.release();
- buffers.groupKeyWithoutExt.release();
- buffers.groupKeyWithExt.release();
- }
- )
- .doAfterTerminate(() -> {
- for (ByteBuf rangeKey : rangeKeys) {
- rangeKey.release();
- }
- })
- )
- .doOnDiscard(Collection.class, discardedCollection -> {
- for (Object o : discardedCollection) {
- if (o instanceof ByteBuf byteBuf) {
- byteBuf.release();
+ .defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
+ .flatMapSequential(groupKeyWithoutExt -> Mono
+ .using(
+ () -> {
+ try {
+ var groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
+ try {
+ assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
+ return Tuples.of(groupKeyWithoutExt.retain(), groupSuffix.retain());
+ } finally {
+ groupSuffix.release();
}
+ } finally {
+ groupKeyWithoutExt.release();
}
- });
- } else {
- return Flux
- .defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
- .flatMapSequential(groupKeyWithoutExt -> Mono
- .using(
- () -> {
- var groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
- assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
- return groupSuffix;
- },
- groupSuffix -> this.subStageGetter
- .subStage(dictionary,
- snapshot,
- LLUtils.lazyRetain(groupKeyWithoutExt),
- Flux.empty()
- )
- .>handle((us, sink) -> {
- try {
- sink.next(Map.entry(this.deserializeSuffix(groupSuffix.retain()), us));
- } catch (SerializationException ex) {
- sink.error(ex);
- }
- }),
- ReferenceCounted::release
+ },
+ groupKeyWithoutExtAndGroupSuffix -> this.subStageGetter
+ .subStage(dictionary,
+ snapshot,
+ LLUtils.lazyRetain(groupKeyWithoutExtAndGroupSuffix.getT1())
)
- );
- }
- });
+ .>handle((us, sink) -> {
+ try {
+ sink.next(Map.entry(this.deserializeSuffix(groupKeyWithoutExtAndGroupSuffix.getT2().retain()), us));
+ } catch (SerializationException ex) {
+ sink.error(ex);
+ }
+ }),
+ entry -> {
+ entry.getT1().release();
+ entry.getT2().release();
+ }
+ )
+ )
+ .transform(LLUtils::handleDiscard);
}
private boolean subStageKeysConsistency(int totalKeyLength) {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java
index b954d6d..b4609ad 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java
@@ -13,10 +13,7 @@ public interface SubStageGetter> {
Mono subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
- Mono prefixKey,
- @Nullable Flux debuggingKeyFlux);
+ Mono prefixKey);
boolean isMultiKey();
-
- boolean needsDebuggingKeyFlux();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java
index 25e3451..025d4e2 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java
@@ -16,38 +16,25 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashMap implements
SubStageGetter