From 3a544d4297ef91f07a4c89a56976f3dd0e244a63 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 23 Sep 2021 20:57:28 +0200 Subject: [PATCH] Partial migration to ResourceSupport --- .../dbengine/client/SearchResult.java | 3 +- .../dbengine/client/SearchResultKeys.java | 3 +- .../cavallium/dbengine/database/LLDelta.java | 2 +- .../cavallium/dbengine/database/LLEntry.java | 2 +- .../cavallium/dbengine/database/LLRange.java | 8 +- .../database/LLSearchResultShard.java | 2 +- .../cavallium/dbengine/database/LLUtils.java | 23 ++- .../database/LiveResourceSupport.java | 33 ++++ .../database/collections/DatabaseEmpty.java | 8 +- .../collections/DatabaseMapDictionary.java | 24 +-- .../DatabaseMapDictionaryDeep.java | 179 ++++++++++-------- .../DatabaseMapDictionaryHashed.java | 97 +++++++--- .../collections/DatabaseSetDictionary.java | 16 +- .../DatabaseSetDictionaryHashed.java | 18 +- .../database/collections/DatabaseSingle.java | 50 ++++- .../collections/DatabaseSingleBucket.java | 71 +++++-- .../collections/DatabaseSingleMapped.java | 65 +++++-- .../database/collections/DatabaseStage.java | 9 +- .../collections/DatabaseStageEntry.java | 1 + .../collections/DatabaseStageMap.java | 82 +++----- .../collections/DatabaseStageWithEntry.java | 1 + .../collections/SubStageGetterHashMap.java | 18 +- .../collections/SubStageGetterHashSet.java | 9 +- .../collections/SubStageGetterMap.java | 7 +- .../collections/SubStageGetterMapDeep.java | 9 +- .../collections/SubStageGetterSet.java | 2 +- .../collections/SubStageGetterSingle.java | 7 +- .../database/disk/LLIndexSearcher.java | 3 +- .../database/disk/LLIndexSearchers.java | 5 +- .../dbengine/lucene/LuceneUtils.java | 6 +- .../lucene/searcher/LuceneSearchResult.java | 3 +- .../dbengine/netty/NullableBuffer.java | 3 +- .../it/cavallium/dbengine/DbTestUtils.java | 15 +- .../cavallium/dbengine/TestDictionaryMap.java | 49 +++-- .../dbengine/TestDictionaryMapDeep.java | 105 +++++----- .../TestDictionaryMapDeepHashMap.java | 4 +- 36 files changed, 576 insertions(+), 366 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/LiveResourceSupport.java diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResult.java b/src/main/java/it/cavallium/dbengine/client/SearchResult.java index efbca1c..0a9bec2 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResult.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResult.java @@ -5,13 +5,14 @@ import io.net5.buffer.api.Owned; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLSearchResultShard; +import it.cavallium.dbengine.database.LiveResourceSupport; import java.util.Objects; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public final class SearchResult extends ResourceSupport, SearchResult> { +public final class SearchResult extends LiveResourceSupport, SearchResult> { private Flux> results; private TotalHitsCount totalHitsCount; diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java index f071a1c..7478031 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java @@ -5,6 +5,7 @@ import io.net5.buffer.api.Owned; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLSearchResultShard; +import it.cavallium.dbengine.database.LiveResourceSupport; import it.cavallium.dbengine.database.collections.ValueGetter; import java.util.Objects; import org.reactivestreams.Publisher; @@ -14,7 +15,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") -public final class SearchResultKeys extends ResourceSupport, SearchResultKeys> { +public final class SearchResultKeys extends LiveResourceSupport, SearchResultKeys> { private static final Logger logger = LoggerFactory.getLogger(SearchResultKeys.class); diff --git a/src/main/java/it/cavallium/dbengine/database/LLDelta.java b/src/main/java/it/cavallium/dbengine/database/LLDelta.java index dbf445b..8052c56 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDelta.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDelta.java @@ -8,7 +8,7 @@ import io.net5.buffer.api.internal.ResourceSupport; import java.util.StringJoiner; import org.jetbrains.annotations.Nullable; -public class LLDelta extends ResourceSupport { +public class LLDelta extends LiveResourceSupport { @Nullable private final Buffer previous; @Nullable diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java index e93552a..3413008 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java +++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java @@ -9,7 +9,7 @@ import java.util.StringJoiner; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public class LLEntry extends ResourceSupport { +public class LLEntry extends LiveResourceSupport { @NotNull private final Buffer key; @NotNull diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java index ef6199e..664c249 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLRange.java +++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java @@ -12,7 +12,7 @@ import java.util.StringJoiner; /** * Range of data, from min (inclusive),to max (exclusive) */ -public class LLRange extends ResourceSupport { +public class LLRange extends LiveResourceSupport { private static final LLRange RANGE_ALL = new LLRange(null, null, null, d -> {}); private Buffer min; @@ -212,9 +212,9 @@ public class LLRange extends ResourceSupport { @Override public void drop(LLRange obj) { - if (obj.min != null) obj.min.close(); - if (obj.max != null) obj.max.close(); - if (obj.single != null) obj.single.close(); + if (obj.min != null && obj.min.isAccessible()) obj.min.close(); + if (obj.max != null && obj.max.isAccessible()) obj.max.close(); + if (obj.single != null && obj.single.isAccessible()) obj.single.close(); delegate.drop(obj); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java index ad07bec..2ffdbfc 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java @@ -9,7 +9,7 @@ import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; -public final class LLSearchResultShard extends ResourceSupport { +public final class LLSearchResultShard extends LiveResourceSupport { private static final Logger logger = LoggerFactory.getLogger(LLSearchResultShard.class); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 5b572ff..16f38cb 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -433,6 +433,25 @@ public class LLUtils { .doOnDiscard(Send.class, send -> send.close()); } + /** + * cleanup resource + * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful + */ + public static , V extends T> Flux usingEachResource(Flux resourceSupplier, + Function> resourceClosure, + boolean cleanupOnSuccess) { + return resourceSupplier + .concatMap(resource -> Mono.usingWhen(Mono.just(resource), resourceClosure, r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(() -> r.close()); + } else { + return Mono.empty(); + } + }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close()))) + .doOnDiscard(Resource.class, resource -> resource.close()) + .doOnDiscard(Send.class, send -> send.close()); + } + /** * cleanup resource * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful @@ -936,7 +955,9 @@ public class LLUtils { } private static void discardStage(DatabaseStage stage) { - stage.release(); + if (stage != null && stage.isAccessible()) { + stage.close(); + } } public static boolean isDirect(Buffer key) { diff --git a/src/main/java/it/cavallium/dbengine/database/LiveResourceSupport.java b/src/main/java/it/cavallium/dbengine/database/LiveResourceSupport.java new file mode 100644 index 0000000..88b1c36 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LiveResourceSupport.java @@ -0,0 +1,33 @@ +package it.cavallium.dbengine.database; + +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.Resource; +import io.net5.buffer.api.internal.LifecycleTracer; +import io.net5.buffer.api.internal.ResourceSupport; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; + +public abstract class LiveResourceSupport, T extends LiveResourceSupport> extends ResourceSupport { + + private static final Logger logger = LoggerFactory.getLogger(LiveResourceSupport.class); + + protected LiveResourceSupport(Drop drop) { + super(drop); + } + + @Override + protected void finalize() throws Throwable { + if (this.isAccessible()) { + try { + this.close(); + } catch (Throwable ignored) { + } finally { + var ise = new IllegalStateException("Resource not released"); + ise.setStackTrace(new StackTraceElement[0]); + logger.error("Resource not released: {}", this, attachTrace(ise)); + } + } + super.finalize(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java index 6769af6..cdd9dbd 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java @@ -2,7 +2,7 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; -import io.net5.buffer.api.CompositeBuffer; +import io.net5.buffer.api.Drop; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; @@ -36,8 +36,10 @@ public class DatabaseEmpty { private DatabaseEmpty() { } - public static DatabaseStageEntry create(LLDictionary dictionary, Send key) { - return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator())); + public static DatabaseStageEntry create(LLDictionary dictionary, + Send key, + Drop> drop) { + return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), drop); } public static final class Nothing { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index fd931c7..5ee7a61 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Drop; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.CompositeSnapshot; @@ -42,23 +43,27 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep prefixKey, SerializerFixedBinaryLength keySuffixSerializer, - Serializer valueSerializer) { + Serializer valueSerializer, + Drop>> drop) { // Do not retain or release or use the prefixKey here - super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0); + super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0, drop); this.valueSerializer = valueSerializer; } public static DatabaseMapDictionary simple(LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, - Serializer valueSerializer) { - return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, valueSerializer); + Serializer valueSerializer, + Drop>> drop) { + return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, + valueSerializer, drop); } public static DatabaseMapDictionary tail(LLDictionary dictionary, Send prefixKey, SerializerFixedBinaryLength keySuffixSerializer, - Serializer valueSerializer) { - return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer); + Serializer valueSerializer, + Drop>> drop) { + return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, drop); } private Send toKey(Send suffixKeyToSend) { @@ -147,7 +152,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono.fromCallable(() -> - new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer)); + new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer, d -> {})); } @Override @@ -396,10 +401,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, - toKey(keyBuf.send()), - valueSerializer - ) + new DatabaseSingle<>(dictionary, toKey(keyBuf.send()), valueSerializer, d -> {}) )); } catch (SerializationException ex) { sink.error(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index baf3492..b62afdc 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -2,8 +2,11 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; import io.net5.util.IllegalReferenceCountException; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; @@ -12,6 +15,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.LiveResourceSupport; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -24,20 +28,21 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; // todo: implement optimized methods (which?) -public class DatabaseMapDictionaryDeep> implements DatabaseStageMap { +public class DatabaseMapDictionaryDeep> extends + LiveResourceSupport>, DatabaseMapDictionaryDeep> + implements DatabaseStageMap { protected final LLDictionary dictionary; private final BufferAllocator alloc; protected final SubStageGetter subStageGetter; protected final SerializerFixedBinaryLength keySuffixSerializer; - @NotNull - protected final Buffer keyPrefix; protected final int keyPrefixLength; protected final int keySuffixLength; protected final int keyExtLength; - protected final LLRange range; protected final Mono> rangeMono; - private volatile boolean released; + + protected LLRange range; + protected Buffer keyPrefix; private static void incrementPrefix(Buffer prefix, int prefixLength) { assert prefix.readableBytes() >= prefixLength; @@ -71,18 +76,12 @@ public class DatabaseMapDictionaryDeep> implem } } - static Buffer firstRangeKey(BufferAllocator alloc, - Send prefixKey, - int prefixLength, - int suffixLength, + static Buffer firstRangeKey(BufferAllocator alloc, Send prefixKey, int prefixLength, int suffixLength, int extLength) { return zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength); } - static Buffer nextRangeKey(BufferAllocator alloc, - Send prefixKey, - int prefixLength, - int suffixLength, + static Buffer nextRangeKey(BufferAllocator alloc, Send prefixKey, int prefixLength, int suffixLength, int extLength) { try (prefixKey) { Buffer nonIncremented = zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength); @@ -91,11 +90,8 @@ public class DatabaseMapDictionaryDeep> implem } } - protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc, - @NotNull Send prefixKeySend, - int prefixLength, - int suffixLength, - int extLength) { + protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc, @NotNull Send prefixKeySend, + int prefixLength, int suffixLength, int extLength) { var result = prefixKeySend.receive(); if (result == null) { assert prefixLength == 0; @@ -115,41 +111,20 @@ public class DatabaseMapDictionaryDeep> implem } } - static Buffer firstRangeKey( - BufferAllocator alloc, - Send prefixKey, - Send suffixKey, - int prefixLength, - int suffixLength, - int extLength) { + static Buffer firstRangeKey(BufferAllocator alloc, Send prefixKey, Send suffixKey, int prefixLength, + int suffixLength, int extLength) { return zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength); } - static Buffer nextRangeKey( - BufferAllocator alloc, - Send prefixKey, - Send suffixKey, - int prefixLength, - int suffixLength, - int extLength) { - Buffer nonIncremented = zeroFillKeyExt(alloc, - prefixKey, - suffixKey, - prefixLength, - suffixLength, - extLength - ); + static Buffer nextRangeKey(BufferAllocator alloc, Send prefixKey, Send suffixKey, int prefixLength, + int suffixLength, int extLength) { + Buffer nonIncremented = zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength); incrementPrefix(nonIncremented, prefixLength + suffixLength); return nonIncremented; } - protected static Buffer zeroFillKeyExt( - BufferAllocator alloc, - Send prefixKeySend, - Send suffixKeySend, - int prefixLength, - int suffixLength, - int extLength) { + protected static Buffer zeroFillKeyExt(BufferAllocator alloc, Send prefixKeySend, Send suffixKeySend, + int prefixLength, int suffixLength, int extLength) { try (var prefixKey = prefixKeySend.receive()) { try (var suffixKey = suffixKeySend.receive()) { assert prefixKey.readableBytes() == prefixLength; @@ -174,36 +149,30 @@ public class DatabaseMapDictionaryDeep> implem */ @Deprecated public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary, - SerializerFixedBinaryLength keySerializer, - SubStageGetterSingle subStageGetter) { - return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, subStageGetter, 0); + SerializerFixedBinaryLength keySerializer, SubStageGetterSingle subStageGetter, + Drop>> drop) { + return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, + subStageGetter, 0, drop); } - public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary, - SerializerFixedBinaryLength keySerializer, - int keyExtLength, - SubStageGetter subStageGetter) { - return new DatabaseMapDictionaryDeep<>(dictionary, - LLUtils.empty(dictionary.getAllocator()), - keySerializer, - subStageGetter, - keyExtLength - ); + public static > DatabaseMapDictionaryDeep deepTail( + LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, int keyExtLength, + SubStageGetter subStageGetter, Drop> drop) { + return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, + subStageGetter, keyExtLength, drop); } - public static > DatabaseMapDictionaryDeep deepIntermediate(LLDictionary dictionary, - Send prefixKey, - SerializerFixedBinaryLength keySuffixSerializer, - SubStageGetter subStageGetter, - int keyExtLength) { - return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength); + public static > DatabaseMapDictionaryDeep deepIntermediate( + LLDictionary dictionary, Send prefixKey, SerializerFixedBinaryLength keySuffixSerializer, + SubStageGetter subStageGetter, int keyExtLength, Drop> drop) { + return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, + keyExtLength, drop); } - protected DatabaseMapDictionaryDeep(LLDictionary dictionary, - @NotNull Send prefixKeyToReceive, - SerializerFixedBinaryLength keySuffixSerializer, - SubStageGetter subStageGetter, - int keyExtLength) { + protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @NotNull Send prefixKeyToReceive, + SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength, + Drop> drop) { + super(new CloseOnDrop<>(drop)); try (var prefixKey = prefixKeyToReceive.receive()) { this.dictionary = dictionary; this.alloc = dictionary.getAllocator(); @@ -231,6 +200,31 @@ public class DatabaseMapDictionaryDeep> implem } } + private DatabaseMapDictionaryDeep(LLDictionary dictionary, + BufferAllocator alloc, + SubStageGetter subStageGetter, + SerializerFixedBinaryLength keySuffixSerializer, + int keyPrefixLength, + int keySuffixLength, + int keyExtLength, + Mono> rangeMono, + Send range, + Send keyPrefix, + Drop> drop) { + super(new CloseOnDrop<>(drop)); + this.dictionary = dictionary; + this.alloc = alloc; + this.subStageGetter = subStageGetter; + this.keySuffixSerializer = keySuffixSerializer; + this.keyPrefixLength = keyPrefixLength; + this.keySuffixLength = keySuffixLength; + this.keyExtLength = keyExtLength; + this.rangeMono = rangeMono; + + this.range = range.receive(); + this.keyPrefix = keyPrefix.receive(); + } + @SuppressWarnings("unused") protected boolean suffixKeyConsistency(int keySuffixLength) { return this.keySuffixLength == keySuffixLength; @@ -301,7 +295,7 @@ public class DatabaseMapDictionaryDeep> implem return this.subStageGetter .subStage(dictionary, snapshot, suffixKeyWithoutExt) .transform(LLUtils::handleDiscard) - .doOnDiscard(DatabaseStage.class, DatabaseStage::release); + .doOnDiscard(DatabaseStage.class, DatabaseStage::close); } @Override @@ -415,13 +409,42 @@ public class DatabaseMapDictionaryDeep> implem } @Override - public void release() { - if (!released) { - released = true; - this.range.close(); - this.keyPrefix.close(); - } else { - throw new IllegalReferenceCountException(0, -1); + protected RuntimeException createResourceClosedException() { + throw new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var keyPrefix = this.keyPrefix.send(); + var range = this.range.send(); + return drop -> new DatabaseMapDictionaryDeep<>(dictionary, alloc, subStageGetter, keySuffixSerializer, + keyPrefixLength, keySuffixLength, keyExtLength, rangeMono, range, keyPrefix, drop); + } + + @Override + protected void makeInaccessible() { + this.keyPrefix = null; + this.range = null; + } + + private static class CloseOnDrop> implements + Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(DatabaseMapDictionaryDeep obj) { + if (obj.range != null) { + obj.range.close(); + } + if (obj.keyPrefix != null) { + obj.keyPrefix.close(); + } + delegate.drop(obj); } } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 66815c3..daec688 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -2,11 +2,14 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.LiveResourceSupport; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -24,18 +27,23 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") -public class DatabaseMapDictionaryHashed implements DatabaseStageMap> { +public class DatabaseMapDictionaryHashed extends + LiveResourceSupport>, DatabaseMapDictionaryHashed> + implements DatabaseStageMap> { private final BufferAllocator alloc; - private final DatabaseMapDictionary>> subDictionary; private final Function keySuffixHashFunction; + private DatabaseMapDictionary>> subDictionary; + protected DatabaseMapDictionaryHashed(LLDictionary dictionary, @NotNull Send prefixKey, Serializer keySuffixSerializer, Serializer valueSerializer, Function keySuffixHashFunction, - SerializerFixedBinaryLength keySuffixHashSerializer) { + SerializerFixedBinaryLength keySuffixHashSerializer, + Drop> drop) { + super(new DatabaseMapDictionaryHashed.CloseOnDrop<>(drop)); if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) { throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW"); } @@ -44,26 +52,36 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap(alloc, keySuffixSerializer, valueSerializer); ValuesSetSerializer> valuesSetSerializer = new ValuesSetSerializer<>(alloc, valueWithHashSerializer); - this.subDictionary = DatabaseMapDictionary.tail(dictionary, - prefixKey, - keySuffixHashSerializer, - valuesSetSerializer - ); + this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey, keySuffixHashSerializer, + valuesSetSerializer, d -> {}); this.keySuffixHashFunction = keySuffixHashFunction; } + private DatabaseMapDictionaryHashed(BufferAllocator alloc, + Function keySuffixHashFunction, + Send>>>> subDictionary, + Drop> drop) { + super(new CloseOnDrop<>(drop)); + this.alloc = alloc; + this.keySuffixHashFunction = keySuffixHashFunction; + + this.subDictionary = (DatabaseMapDictionary>>) subDictionary.receive(); + } + public static DatabaseMapDictionaryHashed simple(LLDictionary dictionary, Serializer keySerializer, Serializer valueSerializer, Function keyHashFunction, - SerializerFixedBinaryLength keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer, + Drop> drop) { return new DatabaseMapDictionaryHashed<>( dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, valueSerializer, keyHashFunction, - keyHashSerializer + keyHashSerializer, + drop ); } @@ -72,13 +90,15 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap keySuffixSerializer, Serializer valueSerializer, Function keySuffixHashFunction, - SerializerFixedBinaryLength keySuffixHashSerializer) { + SerializerFixedBinaryLength keySuffixHashSerializer, + Drop> drop) { return new DatabaseMapDictionaryHashed<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, keySuffixHashFunction, - keySuffixHashSerializer + keySuffixHashSerializer, + drop ); } @@ -125,11 +145,6 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap close() { - return subDictionary.close(); - } - @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { return subDictionary.isEmpty(snapshot); @@ -145,11 +160,6 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> at(@Nullable CompositeSnapshot snapshot, T key) { return this @@ -160,7 +170,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) { return subDictionary .at(snapshot, hash) - .map(entry -> new DatabaseSingleBucket<>(entry, key)); + .map(entry -> new DatabaseSingleBucket<>(entry, key, d -> {})); } @Override @@ -193,13 +203,11 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> setAllValuesAndGetPrevious(Flux> entries) { return entries - .flatMap(entry -> Flux.usingWhen( - this.at(null, entry.getKey()), + .flatMap(entry -> LLUtils.usingResource(this.at(null, entry.getKey()), stage -> stage .setAndGetPrevious(entry.getValue()) - .map(prev -> Map.entry(entry.getKey(), prev)), - stage -> Mono.fromRunnable(stage::release) - )); + .map(prev -> Map.entry(entry.getKey(), prev)), true) + ); } @Override @@ -297,4 +305,37 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> prepareSend() { + var subDictionary = this.subDictionary.send(); + return drop -> new DatabaseMapDictionaryHashed<>(alloc, keySuffixHashFunction, subDictionary, drop); + } + + @Override + protected void makeInaccessible() { + this.subDictionary = null; + } + + private static class CloseOnDrop implements Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(DatabaseMapDictionaryHashed obj) { + if (obj.subDictionary != null) { + obj.subDictionary.close(); + } + delegate.drop(obj); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java index 260ff51..c082547 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Drop; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; @@ -18,19 +19,22 @@ public class DatabaseSetDictionary extends DatabaseMapDictionary protected DatabaseSetDictionary(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength keySuffixSerializer) { - super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator())); + SerializerFixedBinaryLength keySuffixSerializer, + Drop>> drop) { + super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), drop); } public static DatabaseSetDictionary simple(LLDictionary dictionary, - SerializerFixedBinaryLength keySerializer) { - return new DatabaseSetDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer); + SerializerFixedBinaryLength keySerializer, + Drop>> drop) { + return new DatabaseSetDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, drop); } public static DatabaseSetDictionary tail(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength keySuffixSerializer) { - return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer); + SerializerFixedBinaryLength keySuffixSerializer, + Drop>> drop) { + return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer, drop); } public Mono> getKeySet(@Nullable CompositeSnapshot snapshot) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java index 16a9f3d..68bf230 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Drop; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; @@ -23,25 +24,29 @@ public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHas @NotNull Send prefixKey, Serializer keySuffixSerializer, Function keySuffixHashFunction, - SerializerFixedBinaryLength keySuffixHashSerializer) { + SerializerFixedBinaryLength keySuffixHashSerializer, + Drop> drop) { super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), keySuffixHashFunction, - keySuffixHashSerializer + keySuffixHashSerializer, + drop ); } public static DatabaseSetDictionaryHashed simple(LLDictionary dictionary, Serializer keySerializer, Function keyHashFunction, - SerializerFixedBinaryLength keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer, + Drop> drop) { return new DatabaseSetDictionaryHashed<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, keyHashFunction, - keyHashSerializer + keyHashSerializer, + drop ); } @@ -49,12 +54,13 @@ public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHas Send prefixKey, Serializer keySuffixSerializer, Function keyHashFunction, - SerializerFixedBinaryLength keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer, Drop> drop) { return new DatabaseSetDictionaryHashed<>(dictionary, prefixKey, keySuffixSerializer, keyHashFunction, - keyHashSerializer + keyHashSerializer, + drop ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 8dcd6ab..05b9f6e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -1,6 +1,8 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.BadBlock; @@ -20,14 +22,18 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; -public class DatabaseSingle implements DatabaseStageEntry { +public class DatabaseSingle extends ResourceSupport, DatabaseSingle> implements + DatabaseStageEntry { private final LLDictionary dictionary; - private final Buffer key; private final Mono> keyMono; private final Serializer serializer; - public DatabaseSingle(LLDictionary dictionary, Send key, Serializer serializer) { + private Buffer key; + + public DatabaseSingle(LLDictionary dictionary, Send key, Serializer serializer, + Drop> drop) { + super(new CloseOnDrop<>(drop)); try (key) { this.dictionary = dictionary; this.key = key.receive(); @@ -124,13 +130,41 @@ public class DatabaseSingle implements DatabaseStageEntry { .isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send)); } - @Override - public void release() { - key.close(); - } - @Override public Flux badBlocks() { return dictionary.badBlocks(keyMono.map(LLRange::single).map(ResourceSupport::send)); } + + @Override + protected RuntimeException createResourceClosedException() { + throw new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var key = this.key.send(); + return drop -> new DatabaseSingle<>(dictionary, key, serializer, drop); + } + + @Override + protected void makeInaccessible() { + this.key = null; + } + + private static class CloseOnDrop implements Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(DatabaseSingle obj) { + if (obj.key != null) { + obj.key.close(); + } + delegate.drop(obj); + } + } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index a60934d..b3e577f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -1,10 +1,14 @@ package it.cavallium.dbengine.database.collections; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.LiveResourceSupport; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.objects.ObjectArraySet; @@ -23,14 +27,26 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") -public class DatabaseSingleBucket implements DatabaseStageEntry { +public class DatabaseSingleBucket + extends LiveResourceSupport, DatabaseSingleBucket> + implements DatabaseStageEntry { - private final DatabaseStageEntry>> bucketStage; private final K key; - public DatabaseSingleBucket(DatabaseStageEntry>> bucketStage, K key) { - this.bucketStage = bucketStage; + private DatabaseStageEntry>> bucketStage; + + public DatabaseSingleBucket(DatabaseStageEntry>> bucketStage, K key, + Drop> drop) { + super(new CloseOnDrop<>(drop)); this.key = key; + this.bucketStage = bucketStage; + } + + private DatabaseSingleBucket(Send>>> bucketStage, K key, + Drop> drop) { + super(new CloseOnDrop<>(drop)); + this.key = key; + this.bucketStage = (DatabaseStageEntry>>) bucketStage.receive(); } @Override @@ -77,7 +93,8 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { } @Override - public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater, boolean existsAlmostCertainly) { + public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater, + boolean existsAlmostCertainly) { return bucketStage .updateAndGetDelta(oldBucket -> { V oldValue = extractValue(oldBucket); @@ -106,11 +123,6 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { return this.updateAndGetDelta(prev -> null).map(LLUtils::isDeltaChanged); } - @Override - public Mono close() { - return bucketStage.close(); - } - @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return this.get(snapshot).map(prev -> 1L).defaultIfEmpty(0L); @@ -131,11 +143,6 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { return bucketStage.badBlocks(); } - @Override - public void release() { - bucketStage.release(); - } - private Mono extractValueTransformation(Set> entries) { return Mono.fromCallable(() -> extractValue(entries)); } @@ -193,4 +200,38 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { return null; } } + + @Override + protected RuntimeException createResourceClosedException() { + throw new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var bucketStage = this.bucketStage.send(); + return drop -> new DatabaseSingleBucket<>(bucketStage, key, drop); + } + + @Override + protected void makeInaccessible() { + this.bucketStage = null; + } + + private static class CloseOnDrop implements + Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(DatabaseSingleBucket obj) { + if (obj.bucketStage != null) { + obj.bucketStage.close(); + } + delegate.drop(obj); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index c17fcc9..935c0e4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,5 +1,9 @@ package it.cavallium.dbengine.database.collections; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.Mapper; @@ -14,16 +18,28 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; @SuppressWarnings("unused") -public class DatabaseSingleMapped implements DatabaseStageEntry { +public class DatabaseSingleMapped extends ResourceSupport, DatabaseSingleMapped> + implements DatabaseStageEntry { - private final DatabaseStageEntry serializedSingle; private final Mapper mapper; - public DatabaseSingleMapped(DatabaseStageEntry serializedSingle, Mapper mapper) { + private DatabaseStageEntry serializedSingle; + + public DatabaseSingleMapped(DatabaseStageEntry serializedSingle, Mapper mapper, + Drop> drop) { + super(new CloseOnDrop<>(drop)); this.serializedSingle = serializedSingle; this.mapper = mapper; } + private DatabaseSingleMapped(Send> serializedSingle, Mapper mapper, + Drop> drop) { + super(new CloseOnDrop<>(drop)); + this.mapper = mapper; + + this.serializedSingle = (DatabaseStageEntry) serializedSingle.receive(); + } + private void deserializeSink(B value, SynchronousSink sink) { try { sink.next(this.unMap(value)); @@ -107,11 +123,6 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { return serializedSingle.clearAndGetStatus(); } - @Override - public Mono close() { - return serializedSingle.close(); - } - @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return serializedSingle.leavesCount(snapshot, fast); @@ -132,11 +143,6 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { return this.serializedSingle.badBlocks(); } - @Override - public void release() { - serializedSingle.release(); - } - //todo: temporary wrapper. convert the whole class to buffers private A unMap(B bytes) throws SerializationException { return mapper.unmap(bytes); @@ -146,4 +152,37 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { private B map(A bytes) throws SerializationException { return mapper.map(bytes); } + + @Override + protected RuntimeException createResourceClosedException() { + throw new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var serializedSingle = this.serializedSingle.send(); + return drop -> new DatabaseSingleMapped<>(serializedSingle, mapper, drop); + } + + @Override + protected void makeInaccessible() { + this.serializedSingle = null; + } + + private static class CloseOnDrop implements Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(DatabaseSingleMapped obj) { + if (obj.serializedSingle != null) { + obj.serializedSingle.close(); + } + delegate.drop(obj); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index acd160b..444773b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.net5.buffer.api.Resource; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; @@ -12,7 +13,7 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface DatabaseStage extends DatabaseStageWithEntry { +public interface DatabaseStage extends DatabaseStageWithEntry, Resource> { default Mono get(@Nullable CompositeSnapshot snapshot) { return get(snapshot, false); @@ -74,12 +75,6 @@ public interface DatabaseStage extends DatabaseStageWithEntry { return clearAndGetPrevious().map(Objects::nonNull).defaultIfEmpty(false); } - void release(); - - default Mono close() { - return Mono.empty(); - } - /** * Count all the elements. * If it's a nested collection the count will include all the children recursively diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageEntry.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageEntry.java index 0e09d66..c5df267 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageEntry.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageEntry.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.net5.buffer.api.Resource; import it.cavallium.dbengine.client.BadBlock; import reactor.core.publisher.Flux; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index fdc4c2e..6588646 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -34,11 +34,8 @@ public interface DatabaseStageMap> extends Dat Mono at(@Nullable CompositeSnapshot snapshot, T key); default Mono getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) { - return Mono.usingWhen( - this.at(snapshot, key), - stage -> stage.get(snapshot, existsAlmostCertainly), - stage -> Mono.fromRunnable(stage::release) - ); + return LLUtils.usingResource(this.at(snapshot, key), + stage -> stage.get(snapshot, existsAlmostCertainly), true); } default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { @@ -50,11 +47,8 @@ public interface DatabaseStageMap> extends Dat } default Mono putValue(T key, U value) { - return Mono.usingWhen( - at(null, key).single(), - stage -> stage.set(value), - stage -> Mono.fromRunnable(stage::release) - ); + return LLUtils.usingResource(at(null, key).single(), + stage -> stage.set(value), true); } Mono getUpdateMode(); @@ -63,11 +57,8 @@ public interface DatabaseStageMap> extends Dat UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { - return Mono.usingWhen( - this.at(null, key).single(), - stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly), - stage -> Mono.fromRunnable(stage::release) - ); + return LLUtils.usingResource(this.at(null, key).single(), + stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly), true); } default Flux> updateMulti(Flux> entries, @@ -94,11 +85,8 @@ public interface DatabaseStageMap> extends Dat default Mono> updateValueAndGetDelta(T key, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { - return Mono.usingWhen( - this.at(null, key).single(), - stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly), - stage -> Mono.fromRunnable(stage::release) - ); + return LLUtils.usingResource(this.at(null, key).single(), + stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly), true); } default Mono> updateValueAndGetDelta(T key, SerializationFunction<@Nullable U, @Nullable U> updater) { @@ -106,22 +94,14 @@ public interface DatabaseStageMap> extends Dat } default Mono putValueAndGetPrevious(T key, U value) { - return Mono.usingWhen( - at(null, key).single(), - stage -> stage.setAndGetPrevious(value), - stage -> Mono.fromRunnable(stage::release) - ); + return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetPrevious(value), true); } /** * @return true if the key was associated with any value, false if the key didn't exist. */ default Mono putValueAndGetChanged(T key, U value) { - return Mono.usingWhen( - at(null, key).single(), - stage -> stage.setAndGetChanged(value), - stage -> Mono.fromRunnable(stage::release) - ).single(); + return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetChanged(value), true).single(); } default Mono remove(T key) { @@ -129,11 +109,7 @@ public interface DatabaseStageMap> extends Dat } default Mono removeAndGetPrevious(T key) { - return Mono.usingWhen( - at(null, key), - DatabaseStage::clearAndGetPrevious, - stage -> Mono.fromRunnable(stage::release) - ); + return LLUtils.usingResource(at(null, key), DatabaseStage::clearAndGetPrevious, true); } default Mono removeAndGetStatus(T key) { @@ -175,11 +151,11 @@ public interface DatabaseStageMap> extends Dat default Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { return this .getAllStages(snapshot) - .flatMapSequential(entry -> entry + .flatMapSequential(stage -> stage .getValue() .get(snapshot, true) - .map(value -> Map.entry(entry.getKey(), value)) - .doAfterTerminate(() -> entry.getValue().release()) + .map(value -> Map.entry(stage.getKey(), value)) + .doFinally(s -> stage.getValue().close()) ); } @@ -193,7 +169,8 @@ public interface DatabaseStageMap> extends Dat return setAllValues(Flux.empty()); } - default Mono replaceAllValues(boolean canKeysChange, Function, Mono>> entriesReplacer) { + default Mono replaceAllValues(boolean canKeysChange, Function, + Mono>> entriesReplacer) { if (canKeysChange) { return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then(); } else { @@ -202,7 +179,11 @@ public interface DatabaseStageMap> extends Dat .flatMap(entriesReplacer) .flatMap(replacedEntry -> this .at(null, replacedEntry.getKey()) - .flatMap(v -> v.set(replacedEntry.getValue()).doAfterTerminate(v::release))) + .flatMap(stage -> stage + .set(replacedEntry.getValue()) + .doFinally(s -> stage.close()) + ) + ) .then(); } } @@ -210,9 +191,8 @@ public interface DatabaseStageMap> extends Dat default Mono replaceAll(Function, Mono> entriesReplacer) { return this .getAllStages(null) - .flatMap(stage -> Mono - .defer(() -> entriesReplacer.apply(stage)) - .doAfterTerminate(() -> stage.getValue().release()) + .flatMap(stage -> entriesReplacer.apply(stage) + .doFinally(s -> stage.getValue().close()) ) .then(); } @@ -221,14 +201,15 @@ public interface DatabaseStageMap> extends Dat default Mono> setAndGetPrevious(Map value) { return this .setAllValuesAndGetPrevious(Flux.fromIterable(Map.copyOf(value).entrySet())) - .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + .collectMap(Entry::getKey, Entry::getValue, HashMap::new) + .filter(map -> !map.isEmpty()); } @Override default Mono setAndGetChanged(Map value) { return this .setAndGetPrevious(value) - .map(oldValue -> !Objects.equals(oldValue, value)) + .map(oldValue -> !Objects.equals(oldValue, value.isEmpty() ? null : value)) .switchIfEmpty(Mono.fromSupplier(() -> !value.isEmpty())); } @@ -286,18 +267,17 @@ public interface DatabaseStageMap> extends Dat @Override default Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return getAllValues(snapshot) + return this + .getAllValues(snapshot) .collectMap(Entry::getKey, Entry::getValue, HashMap::new) .filter(map -> !map.isEmpty()); } @Override default Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { - return getAllStages(snapshot) - .flatMap(stage -> Mono - .fromRunnable(() -> stage.getValue().release()) - .thenReturn(true) - ) + return this + .getAllStages(snapshot) + .doOnNext(stage -> stage.getValue().close()) .count(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageWithEntry.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageWithEntry.java index 5e48507..3d7106c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageWithEntry.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageWithEntry.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.net5.buffer.api.Resource; import it.cavallium.dbengine.client.BadBlock; import reactor.core.publisher.Mono; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index 48e6add..38da00b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; @@ -34,20 +35,9 @@ public class SubStageGetterHashMap implements public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, Mono> prefixKeyMono) { - return Mono.usingWhen( - prefixKeyMono, - prefixKey -> Mono - .fromSupplier(() -> DatabaseMapDictionaryHashed - .tail(dictionary, - prefixKey, - keySerializer, - valueSerializer, - keyHashFunction, - keyHashSerializer - ) - ), - prefixKey -> Mono.fromRunnable(prefixKey::close) - ); + return LLUtils.usingSend(prefixKeyMono, prefixKey -> Mono.just(DatabaseMapDictionaryHashed + .tail(dictionary, prefixKey, keySerializer, valueSerializer, keyHashFunction, + keyHashSerializer, d -> {})), true); } public int getKeyHashBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index 101b8a6..39c3279 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -34,13 +34,8 @@ public class SubStageGetterHashSet implements Mono> prefixKeyMono) { return Mono.usingWhen(prefixKeyMono, prefixKey -> Mono - .fromSupplier(() -> DatabaseSetDictionaryHashed - .tail(dictionary, - prefixKey, - keySerializer, - keyHashFunction, - keyHashSerializer - ) + .fromSupplier(() -> DatabaseSetDictionaryHashed.tail(dictionary, prefixKey, keySerializer, + keyHashFunction, keyHashSerializer, d -> {}) ), prefixKey -> Mono.fromRunnable(prefixKey::close) ); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index dee4cb6..772f417 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; @@ -25,11 +26,9 @@ public class SubStageGetterMap implements SubStageGetter, Databa public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, Mono> prefixKeyMono) { - return Mono.usingWhen(prefixKeyMono, + return LLUtils.usingSend(prefixKeyMono, prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionary - .tail(dictionary, prefixKey, keySerializer, valueSerializer)), - prefixKey -> Mono.fromRunnable(prefixKey::close) - ); + .tail(dictionary, prefixKey, keySerializer, valueSerializer, d -> {})), true); } public int getKeyBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 5794c85..34e25c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; import org.jetbrains.annotations.Nullable; @@ -40,11 +41,9 @@ public class SubStageGetterMapDeep> implements public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, Mono> prefixKeyMono) { - return Mono.usingWhen(prefixKeyMono, - prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionaryDeep - .deepIntermediate(dictionary, prefixKey, keySerializer, subStageGetter, keyExtLength)), - prefixKey -> Mono.fromRunnable(prefixKey::close) - ); + return LLUtils.usingSend(prefixKeyMono, prefixKey -> Mono.just(DatabaseMapDictionaryDeep + .deepIntermediate(dictionary, prefixKey, keySerializer, subStageGetter, keyExtLength, + d -> {})), true); } public int getKeyBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index 0e26fea..6b6260c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -24,7 +24,7 @@ public class SubStageGetterSet implements SubStageGetter, Dat Mono> prefixKeyMono) { return Mono.usingWhen(prefixKeyMono, prefixKey -> Mono - .fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer)), + .fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer, d -> {})), prefixKey -> Mono.fromRunnable(prefixKey::close) ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index eb71e26..533b4a6 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -20,12 +20,7 @@ public class SubStageGetterSingle implements SubStageGetter> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, Mono> keyPrefixMono) { - return Mono.usingWhen( - keyPrefixMono, - keyPrefix -> Mono - .>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix, serializer)), - keyPrefix -> Mono.fromRunnable(keyPrefix::close) - ); + return keyPrefixMono.map(keyPrefix -> new DatabaseSingle<>(dictionary, keyPrefix, serializer, d -> {})); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java index f5b6e5e..19ee489 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import io.net5.buffer.api.Drop; import io.net5.buffer.api.Owned; import io.net5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.LiveResourceSupport; import java.io.IOException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.IndexSearcher; @@ -11,7 +12,7 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LLIndexSearcher extends ResourceSupport { +public class LLIndexSearcher extends LiveResourceSupport { private IndexSearcher indexSearcher; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java index 04203dd..a6a0411 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java @@ -5,6 +5,7 @@ import io.net5.buffer.api.Owned; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.LiveResourceSupport; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import java.io.IOException; import java.io.UncheckedIOException; @@ -37,7 +38,7 @@ public interface LLIndexSearchers extends Resource { IndexReader allShards(); - class UnshardedIndexSearchers extends ResourceSupport + class UnshardedIndexSearchers extends LiveResourceSupport implements LLIndexSearchers { private LLIndexSearcher indexSearcher; @@ -103,7 +104,7 @@ public interface LLIndexSearchers extends Resource { } } - class ShardedIndexSearchers extends ResourceSupport + class ShardedIndexSearchers extends LiveResourceSupport implements LLIndexSearchers { private List indexSearchers; diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index cff94bf..98079ff 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -9,6 +9,7 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.EnglishItalianStopFilter; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.ValueGetter; @@ -231,9 +232,8 @@ public class LuceneUtils { public static ValueGetter, V> getAsyncDbValueGetterDeep( CompositeSnapshot snapshot, DatabaseMapDictionaryDeep, DatabaseMapDictionary> dictionaryDeep) { - return entry -> dictionaryDeep - .at(snapshot, entry.getKey()) - .flatMap(sub -> sub.getValue(snapshot, entry.getValue()).doAfterTerminate(sub::release)); + return entry -> LLUtils.usingResource(dictionaryDeep + .at(snapshot, entry.getKey()), sub -> sub.getValue(snapshot, entry.getValue()), true); } public static PerFieldAnalyzerWrapper toPerFieldAnalyzerWrapper(IndicizerAnalyzers indicizerAnalyzers) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java index 9a52926..f92d27f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java @@ -6,6 +6,7 @@ import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLSearchResultShard; +import it.cavallium.dbengine.database.LiveResourceSupport; import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase; import java.io.IOException; import java.util.Objects; @@ -14,7 +15,7 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public final class LuceneSearchResult extends ResourceSupport { +public final class LuceneSearchResult extends LiveResourceSupport { private static final Logger logger = LoggerFactory.getLogger(LuceneSearchResult.class); diff --git a/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java b/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java index 43f4a25..038f65e 100644 --- a/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java +++ b/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java @@ -6,9 +6,10 @@ import io.net5.buffer.api.Owned; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.SearchResult; +import it.cavallium.dbengine.database.LiveResourceSupport; import org.jetbrains.annotations.Nullable; -public class NullableBuffer extends ResourceSupport { +public class NullableBuffer extends LiveResourceSupport { @Nullable private Buffer buffer; diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index c16b139..3a06cfb 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -179,7 +179,8 @@ public class DbTestUtils { if (mapType == MapType.MAP) { return DatabaseMapDictionary.simple(dictionary, SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), keyBytes), - Serializer.utf8(dictionary.getAllocator()) + Serializer.utf8(dictionary.getAllocator()), + d -> {} ); } else { return DatabaseMapDictionaryHashed.simple(dictionary, @@ -209,7 +210,8 @@ public class DbTestUtils { return out.send(); } } - } + }, + d -> {} ); } } @@ -224,7 +226,8 @@ public class DbTestUtils { key2Bytes, new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key2Bytes), Serializer.utf8(dictionary.getAllocator()) - ) + ), + d -> {} ); } @@ -239,7 +242,8 @@ public class DbTestUtils { Serializer.utf8(dictionary.getAllocator()), String::hashCode, SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator()) - ) + ), + d -> {} ); } @@ -249,7 +253,8 @@ public class DbTestUtils { Serializer.utf8(dictionary.getAllocator()), Serializer.utf8(dictionary.getAllocator()), String::hashCode, - SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator()) + SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator()), + d -> {} ); } } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index f56ce1d..cc1e3f6 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -115,8 +115,7 @@ public abstract class TestDictionaryMap { var resultingMap = run(map.get(null)); Assertions.assertEquals(shouldFail ? null : Map.of(key, value), resultingMap); - runVoid(map.close()); - map.release(); + map.close(); //if (shouldFail) this.checkLeaks = false; @@ -129,10 +128,10 @@ public abstract class TestDictionaryMap { var stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMap(map -> map - .at(null, key).flatMap(v -> v.set(value).doAfterTerminate(v::release)) - .then(map.at(null, key).flatMap(v -> v.get(null).doAfterTerminate(v::release))) - .doAfterTerminate(map::release) + .flatMap(map -> LLUtils + .usingResource(map.at(null, key), v -> v.set(value), true) + .then(LLUtils.usingResource(map.at(null, key), v -> v.get(null), true)) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -155,7 +154,7 @@ public abstract class TestDictionaryMap { map.putValueAndGetPrevious(key, value), map.putValueAndGetPrevious(key, value) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -178,7 +177,7 @@ public abstract class TestDictionaryMap { map.putValue(key, value).then(map.removeAndGetPrevious(key)), map.removeAndGetPrevious(key) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -201,7 +200,7 @@ public abstract class TestDictionaryMap { map.putValue(key, value).then(map.removeAndGetStatus(key)), map.removeAndGetStatus(key) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -244,7 +243,7 @@ public abstract class TestDictionaryMap { return value; }) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -311,7 +310,7 @@ public abstract class TestDictionaryMap { .doOnSuccess(s -> log.debug("5. Getting value: {}", key)) .then(map.getValue(null, key)) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -336,7 +335,7 @@ public abstract class TestDictionaryMap { map.remove(key), map.putValueAndGetChanged(key, "error?").single() ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -395,7 +394,7 @@ public abstract class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) @@ -423,7 +422,7 @@ public abstract class TestDictionaryMap { .flatMapMany(map -> map .setAllValues(Flux.fromIterable(entries.entrySet())) .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) @@ -453,7 +452,7 @@ public abstract class TestDictionaryMap { map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -481,7 +480,7 @@ public abstract class TestDictionaryMap { map.set(entries).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) @@ -522,7 +521,7 @@ public abstract class TestDictionaryMap { removalMono.then(Mono.empty()), map.setAndGetChanged(entries).single() ) - .doAfterTerminate(map::release); + .doFinally(s -> map.close()); }) )); if (shouldFail) { @@ -544,7 +543,7 @@ public abstract class TestDictionaryMap { .concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries)) .map(Map::entrySet) .concatMapIterable(list -> list) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -570,7 +569,7 @@ public abstract class TestDictionaryMap { .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) .map(Map::entrySet) .concatMapIterable(list -> list) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -597,7 +596,7 @@ public abstract class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getAllValues(null) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -627,7 +626,7 @@ public abstract class TestDictionaryMap { .map(Map::entrySet) .flatMapIterable(list -> list) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -659,10 +658,10 @@ public abstract class TestDictionaryMap { .getValue() .get(null) .map(val -> Map.entry(stage.getKey(), val)) - .doAfterTerminate(() -> stage.getValue().release()) + .doFinally(s -> stage.getValue().close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -691,7 +690,7 @@ public abstract class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.isEmpty(null) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) .transform(LLUtils::handleDiscard) @@ -718,7 +717,7 @@ public abstract class TestDictionaryMap { map.clear().then(Mono.empty()), map.isEmpty(null) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) .transform(LLUtils::handleDiscard) diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index d7073c7..dd79298 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -12,6 +12,7 @@ import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryMap; import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDictionary; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; @@ -196,8 +197,7 @@ public abstract class TestDictionaryMapDeep { var resultingMap = run(map.get(null)); Assertions.assertEquals(shouldFail ? null : Map.of(key, value), resultingMap); - runVoid(map.close()); - map.release(); + map.close(); //if (shouldFail) this.checkLeaks = false; @@ -220,8 +220,7 @@ public abstract class TestDictionaryMapDeep { Assertions.assertEquals(shouldFail ? null : value, returnedValue); - runVoid(map.close()); - map.release(); + map.close(); //if (shouldFail) this.checkLeaks = false; @@ -240,7 +239,7 @@ public abstract class TestDictionaryMapDeep { .flatMapMany(map -> map .putValue(key, value) .thenMany(map.getAllValues(null)) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -265,14 +264,14 @@ public abstract class TestDictionaryMapDeep { .flatMap(v_ -> Mono.using( () -> v_, v -> v.set(value), - DatabaseMapDictionaryDeep::release + ResourceSupport::close )) .then(map .at(null, "capra") .flatMap(v_ -> Mono.using( () -> v_, v -> v.set(Map.of("normal", "123", "ormaln", "456")), - DatabaseMapDictionaryDeep::release + ResourceSupport::close )) ) .thenMany(map @@ -280,10 +279,10 @@ public abstract class TestDictionaryMapDeep { .flatMap(v -> v.getValue() .getAllValues(null) .map(result -> Tuples.of(v.getKey(), result.getKey(), result.getValue())) - .doAfterTerminate(() -> v.getValue().release()) + .doFinally(s -> v.getValue().close()) ) ), - DatabaseMapDictionaryDeep::release + ResourceSupport::close )) )); if (shouldFail) { @@ -308,9 +307,9 @@ public abstract class TestDictionaryMapDeep { .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMap(map -> map - .at(null, key1).flatMap(v -> v.putValue(key2, value).doAfterTerminate(v::release)) - .then(map.at(null, key1).flatMap(v -> v.getValue(null, key2).doAfterTerminate(v::release))) - .doAfterTerminate(map::release) + .at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.close())) + .then(map.at(null, key1).flatMap(v -> v.getValue(null, key2).doFinally(s -> v.close()))) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -335,7 +334,7 @@ public abstract class TestDictionaryMapDeep { map.putValueAndGetPrevious(key, value), map.putValueAndGetPrevious(key, value) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -358,22 +357,22 @@ public abstract class TestDictionaryMapDeep { .at(null, key1) .flatMap(v -> v .putValueAndGetPrevious(key2, "error?") - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .putValueAndGetPrevious(key2, value) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .putValueAndGetPrevious(key2, value) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -396,7 +395,7 @@ public abstract class TestDictionaryMapDeep { map.putValue(key, value).then(map.removeAndGetPrevious(key)), map.removeAndGetPrevious(key) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -420,22 +419,22 @@ public abstract class TestDictionaryMapDeep { .flatMap(v -> v .putValue(key2, "error?") .then(v.removeAndGetPrevious(key2)) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .putValue(key2, value) .then(v.removeAndGetPrevious(key2)) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v.removeAndGetPrevious(key2) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -458,7 +457,7 @@ public abstract class TestDictionaryMapDeep { map.putValue(key, value).then(map.removeAndGetStatus(key)), map.removeAndGetStatus(key) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -482,22 +481,22 @@ public abstract class TestDictionaryMapDeep { .flatMap(v -> v .putValue(key2, "error?") .then(v.removeAndGetStatus(key2)) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .putValue(key2, value) .then(v.removeAndGetStatus(key2)) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v.removeAndGetStatus(key2) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -540,7 +539,7 @@ public abstract class TestDictionaryMapDeep { return value; }) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { @@ -565,28 +564,28 @@ public abstract class TestDictionaryMapDeep { .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> prev) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> value) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> value) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> null) - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) .transform(LLUtils::handleDiscard) ) )); @@ -626,7 +625,7 @@ public abstract class TestDictionaryMapDeep { assert Objects.equals(old, value); return value; }).then(map.getValue(null, key)) - ).doAfterTerminate(map::release)) + ).doFinally(s -> map.close())) )); if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { stpVer.verifyError(); @@ -652,7 +651,7 @@ public abstract class TestDictionaryMapDeep { .updateValue(key2, prev -> prev) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) @@ -660,7 +659,7 @@ public abstract class TestDictionaryMapDeep { .updateValue(key2, prev -> value) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) @@ -668,7 +667,7 @@ public abstract class TestDictionaryMapDeep { .updateValue(key2, prev -> value) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ), map .at(null, key1) @@ -676,10 +675,10 @@ public abstract class TestDictionaryMapDeep { .updateValue(key2, prev -> null) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doAfterTerminate(v::release) + .doFinally(s -> v.close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) .transform(LLUtils::handleDiscard) ) )); @@ -704,7 +703,7 @@ public abstract class TestDictionaryMapDeep { map.remove(key), map.putValueAndGetChanged(key, Map.of("error?", "error.")).single() ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -758,7 +757,7 @@ public abstract class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) @@ -786,7 +785,7 @@ public abstract class TestDictionaryMapDeep { .flatMapMany(map -> map .setAllValues(Flux.fromIterable(entries.entrySet())) .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) @@ -815,7 +814,7 @@ public abstract class TestDictionaryMapDeep { map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) .transform(LLUtils::handleDiscard) ) )); @@ -843,7 +842,7 @@ public abstract class TestDictionaryMapDeep { map.set(entries).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) @@ -881,7 +880,7 @@ public abstract class TestDictionaryMapDeep { removalMono.then(Mono.empty()), map.setAndGetChanged(entries).single() ) - .doAfterTerminate(map::release); + .doFinally(s -> map.close()); }) .transform(LLUtils::handleDiscard) )); @@ -907,7 +906,7 @@ public abstract class TestDictionaryMapDeep { ) .map(Map::entrySet) .concatMapIterable(list -> list) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -933,7 +932,7 @@ public abstract class TestDictionaryMapDeep { .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) .map(Map::entrySet) .concatMapIterable(list -> list) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -961,7 +960,7 @@ public abstract class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getAllValues(null) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -990,7 +989,7 @@ public abstract class TestDictionaryMapDeep { .map(Map::entrySet) .flatMapIterable(list -> list) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -1021,10 +1020,10 @@ public abstract class TestDictionaryMapDeep { .getValue() .get(null) .map(val -> Map.entry(stage.getKey(), val)) - .doAfterTerminate(() -> stage.getValue().release()) + .doFinally(s -> stage.getValue().close()) ) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { @@ -1051,7 +1050,7 @@ public abstract class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.isEmpty(null) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) .transform(LLUtils::handleDiscard) )); @@ -1077,7 +1076,7 @@ public abstract class TestDictionaryMapDeep { map.clear().then(Mono.empty()), map.isEmpty(null) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) { diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java index 8486a82..9a81610 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java @@ -121,14 +121,14 @@ public abstract class TestDictionaryMapDeepHashMap { .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMapHashMap(dict, 5)) .flatMapMany(map -> map - .at(null, key1).flatMap(v -> v.putValue(key2, value).doAfterTerminate(v::release)) + .at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.close())) .thenMany(map .getAllValues(null) .map(Entry::getValue) .flatMap(maps -> Flux.fromIterable(maps.entrySet())) .map(Entry::getValue) ) - .doAfterTerminate(map::release) + .doFinally(s -> map.close()) ) )); if (shouldFail) {