From 8eedc27bbad73330f14126bb45b5e5e46b23657f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 12 May 2021 21:41:47 +0200 Subject: [PATCH] Allow multiple resubscriptions to the same flux --- .../cavallium/dbengine/database/LLUtils.java | 13 + .../collections/DatabaseMapDictionary.java | 155 +++++------ .../DatabaseMapDictionaryDeep.java | 177 ++++++------- .../DatabaseMapDictionaryHashed.java | 4 +- .../database/collections/DatabaseSingle.java | 28 +- .../collections/DatabaseStageMap.java | 20 +- .../collections/SubStageGetterHashMap.java | 57 ++-- .../collections/SubStageGetterHashSet.java | 55 ++-- .../collections/SubStageGetterMap.java | 53 ++-- .../collections/SubStageGetterMapDeep.java | 55 ++-- .../collections/SubStageGetterSet.java | 40 +-- .../collections/SubStageGetterSingle.java | 4 +- .../database/disk/LLLocalDictionary.java | 243 +++++++++--------- .../disk/LLLocalReactiveRocksIterator.java | 7 +- .../dbengine/lucene/LuceneUtils.java | 2 +- .../cavallium/dbengine/TestDictionaryMap.java | 46 ++-- .../dbengine/TestDictionaryMapDeep.java | 100 +++---- .../TestDictionaryMapDeepHashMap.java | 4 +- 18 files changed, 555 insertions(+), 508 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 2b4773a..39a0d00 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -7,6 +7,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.IllegalReferenceCountException; import it.cavallium.dbengine.lucene.RandomSortField; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -156,6 +157,18 @@ public class LLUtils { return new it.cavallium.dbengine.database.LLKeyScore(hit.getKey(), hit.getScore()); } + public static String toStringSafe(ByteBuf key) { + try { + if (key.refCnt() > 0) { + return toString(key); + } else { + return "(released)"; + } + } catch (IllegalReferenceCountException ex) { + return "(released)"; + } + } + public static String toString(ByteBuf key) { if (key == null) { return "null"; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 5bf17fd..8f347fc 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -68,8 +68,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep deserialize(entry.getValue()), HashMap::new) .filter(map -> !map.isEmpty()) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override @@ -87,8 +87,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override @@ -99,27 +99,26 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { - return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + return Mono + .defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)) + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { - return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + return Mono + .defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())) + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override public Mono> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono - .using( - () -> toKey(serializeSuffix(keySuffix)), - keyBuf -> Mono - .fromSupplier(() -> new DatabaseSingle<>(dictionary, keyBuf.retain(), Serializer.noop())) - .>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)), - ReferenceCounted::release + .fromSupplier(() -> new DatabaseSingleMapped<>( + new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop()) + , valueSerializer) ); } @@ -279,41 +278,41 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) { - ByteBuf keySuffixBuf = serializeSuffix(keySuffix); - ByteBuf keyBuf = toKey(keySuffixBuf.retain()); - return dictionary - .remove(keyBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) - .map(this::deserialize) - .doFinally(s -> { - keyBuf.release(); - keySuffixBuf.release(); - }); + return Mono + .using( + () -> toKey(serializeSuffix(keySuffix)), + keyBuf -> dictionary + .remove(keyBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) + .map(this::deserialize), + ReferenceCounted::release + ); } @Override public Mono removeAndGetStatus(T keySuffix) { - ByteBuf keySuffixBuf = serializeSuffix(keySuffix); - ByteBuf keyBuf = toKey(keySuffixBuf.retain()); - return dictionary - .remove(keyBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) - .map(LLUtils::responseToBoolean) - .doFinally(s -> { - keyBuf.release(); - keySuffixBuf.release(); - }); + return Mono + .using( + () -> toKey(serializeSuffix(keySuffix)), + keyBuf -> dictionary + .remove(keyBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) + .map(LLUtils::responseToBoolean), + ReferenceCounted::release + ); } @Override public Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { - return dictionary - .getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> { - ByteBuf keySuffixBuf = serializeSuffix(keySuffix); - try { - return toKey(keySuffixBuf.retain()); - } finally { - keySuffixBuf.release(); - } - })), existsAlmostCertainly) + return Flux + .defer(() -> dictionary + .getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> { + ByteBuf keySuffixBuf = serializeSuffix(keySuffix); + try { + return toKey(keySuffixBuf.retain()); + } finally { + keySuffixBuf.release(); + } + })), existsAlmostCertainly) + ) .flatMap(entry -> Mono .fromCallable(() -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey(), false)), deserialize(entry.getValue()))) ); @@ -352,28 +351,33 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot) { - return Flux.defer(() -> dictionary.getRangeKeys(resolveSnapshot(snapshot), range.retain())) + return Flux + .defer(() -> dictionary.getRangeKeys(resolveSnapshot(snapshot), range.retain())) .>>map(key -> { - ByteBuf keySuffixWithExt = stripPrefix(key, false); - // Don't use "key" under this point --- + ByteBuf keySuffixWithExt = stripPrefix(key.retain(), false); try { - return Map.entry(deserializeSuffix(keySuffixWithExt.retainedSlice()), - new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, - toKey(keySuffixWithExt.retainedSlice()), - Serializer.noop() - ), valueSerializer) - ); + try { + return Map.entry(deserializeSuffix(keySuffixWithExt.retainedSlice()), + new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, + toKey(keySuffixWithExt.retainedSlice()), + Serializer.noop() + ), valueSerializer) + ); + } finally { + keySuffixWithExt.release(); + } } finally { - keySuffixWithExt.release(); + key.release(); } }) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override public Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { - return Flux.defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain())) + return Flux + .defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain())) .map(serializedEntry -> Map.entry( deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)), valueSerializer.deserialize(serializedEntry.getValue()) @@ -384,8 +388,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override @@ -401,28 +405,27 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override public Mono clear() { - if (range.isAll()) { - return dictionary - .clear(); - } else if (range.isSingle()) { - return Mono - .defer(() -> dictionary.remove(range.getSingle().retain(), LLDictionaryResultType.VOID)) - .doOnNext(ReferenceCounted::release) - .then() - .doFirst(() -> range.getSingle().retain()) - .doFinally(s -> range.getSingle().release()); - } else { - return Mono - .defer(() -> dictionary.setRange(range.retain(), Flux.empty())) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); - } + return Mono + .defer(() -> { + if (range.isAll()) { + return dictionary.clear(); + } else if (range.isSingle()) { + return dictionary + .remove(range.getSingle().retain(), LLDictionaryResultType.VOID) + .doOnNext(ReferenceCounted::release) + .then(); + } else { + return dictionary.setRange(range.retain(), Flux.empty()); + } + }) + .doFirst(range::retain) + .doAfterTerminate(range::release); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index de7c1f4..a7da26d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.disk.LLLocalDictionary; +import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Collection; import java.util.List; @@ -367,15 +368,16 @@ public class DatabaseMapDictionaryDeep> implem public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return Mono .defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { - return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + return Mono + .defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())) + .doFirst(range::retain) + .doAfterTerminate(range::release); } @Override @@ -430,72 +432,76 @@ public class DatabaseMapDictionaryDeep> implem @Override public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { - if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) { - return Flux - .defer(() -> dictionary - .getRangeKeysGrouped(resolveSnapshot(snapshot), range.retain(), - keyPrefixLength + keySuffixLength) - ) - .flatMapSequential(rangeKeys -> Flux - .using( - () -> { - assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1; - ByteBuf groupKeyWithExt = rangeKeys.get(0).retainedSlice(); - ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain(), true); - ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true); - return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix); - }, - buffers -> Mono - .fromCallable(() -> { - assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes()); - return null; - }) - .then(this.subStageGetter - .subStage(dictionary, - snapshot, - buffers.groupKeyWithoutExt.retain(), - rangeKeys.stream().map(ByteBuf::retain).collect(Collectors.toList()) - ) - .map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us)) - ), - buffers -> { - buffers.groupSuffix.release(); - buffers.groupKeyWithoutExt.release(); - buffers.groupKeyWithExt.release(); - } - ) - .doFinally(s -> { - for (ByteBuf rangeKey : rangeKeys) { - rangeKey.release(); - } - }) - ) - .doOnDiscard(Collection.class, discardedCollection -> { - //noinspection unchecked - var rangeKeys = (Collection) discardedCollection; - for (ByteBuf rangeKey : rangeKeys) { - rangeKey.release(); - } - }); - } else { - return Flux - .defer(() -> dictionary - .getRangeKeyPrefixes(resolveSnapshot(snapshot), range.retain(), - keyPrefixLength + keySuffixLength) - ) - .flatMapSequential(groupKeyWithoutExt -> { - ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true); - assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); - return this.subStageGetter - .subStage(dictionary, - snapshot, - groupKeyWithoutExt.retain(), - List.of() + return Flux + .defer(() -> { + if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) { + return Flux + .defer(() -> dictionary.getRangeKeysGrouped(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength)) + .flatMapSequential(rangeKeys -> Flux + .using( + () -> { + assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1; + ByteBuf groupKeyWithExt = rangeKeys.get(0).retainedSlice(); + ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain(), true); + ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true); + return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix); + }, + buffers -> Mono + .fromCallable(() -> { + assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes()); + return null; + }) + .then(this.subStageGetter + .subStage(dictionary, + snapshot, + buffers.groupKeyWithoutExt.retain(), + rangeKeys.stream().map(ByteBuf::retain).collect(Collectors.toList()) + ) + .map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us)) + ), + buffers -> { + buffers.groupSuffix.release(); + buffers.groupKeyWithoutExt.release(); + buffers.groupKeyWithExt.release(); + } + ) + .doAfterTerminate(() -> { + for (ByteBuf rangeKey : rangeKeys) { + rangeKey.release(); + } + }) ) - .map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us)) - .doFinally(s -> groupSuffix.release()); - }); - } + .doOnDiscard(Collection.class, discardedCollection -> { + //noinspection unchecked + var rangeKeys = (Collection) discardedCollection; + for (ByteBuf rangeKey : rangeKeys) { + rangeKey.release(); + } + }); + } else { + return Flux + .defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength)) + .flatMapSequential(groupKeyWithoutExt -> Mono + .using( + () -> { + var groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true); + assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); + return groupSuffix; + }, + groupSuffix -> this.subStageGetter + .subStage(dictionary, + snapshot, + groupKeyWithoutExt.retain(), + List.of() + ) + .map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us)), + ReferenceCounted::release + ) + ); + } + }) + .doFirst(range::retain) + .doAfterTerminate(range::release); } private boolean subStageKeysConsistency(int totalKeyLength) { @@ -521,7 +527,7 @@ public class DatabaseMapDictionaryDeep> implem .at(null, entry.getKey()) .flatMap(us -> us .set(entry.getValue()) - .doFinally(s -> us.release()) + .doAfterTerminate(us::release) ) ) .doOnDiscard(DatabaseStage.class, DatabaseStage::release) @@ -532,22 +538,19 @@ public class DatabaseMapDictionaryDeep> implem @Override public Mono clear() { - if (range.isAll()) { - return dictionary - .clear(); - } else if (range.isSingle()) { - return Mono - .defer(() -> dictionary - .remove(range.getSingle().retain(), LLDictionaryResultType.VOID) - .doOnNext(ReferenceCounted::release) - ) - .then(); - } else { - return Mono - .defer(() -> dictionary - .setRange(range.retain(), Flux.empty()) - ); - } + return Mono + .defer(() -> { + if (range.isAll()) { + return dictionary.clear(); + } else if (range.isSingle()) { + return dictionary + .remove(range.getSingle().retain(), LLDictionaryResultType.VOID) + .doOnNext(ReferenceCounted::release) + .then(); + } else { + return dictionary.setRange(range.retain(), Flux.empty()); + } + }); } //todo: temporary wrapper. convert the whole class to buffers diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index f24bd84..fa9d507 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -176,7 +176,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap this .at(snapshot, key) - .flatMap(stage -> Mono.just(Map.entry(key, stage)).doFinally(s -> stage.release())) + .flatMap(stage -> Mono.just(Map.entry(key, stage)).doAfterTerminate(stage::release)) ) ); } @@ -194,7 +194,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap stage .setAndGetPrevious(entry.getValue()) .map(prev -> Map.entry(entry.getKey(), prev)) - .doFinally(s -> stage.release())) + .doAfterTerminate(stage::release)) ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 88d7f76..3c13b28 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -46,8 +46,8 @@ public class DatabaseSingle implements DatabaseStageEntry { return Mono .defer(() -> dictionary.get(resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly)) .map(this::deserialize) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } @Override @@ -60,8 +60,8 @@ public class DatabaseSingle implements DatabaseStageEntry { .map(this::deserialize), ReferenceCounted::release ) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } @Override @@ -78,8 +78,8 @@ public class DatabaseSingle implements DatabaseStageEntry { } }, updateReturnMode, existsAlmostCertainly)) .map(this::deserialize) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } @Override @@ -94,8 +94,8 @@ public class DatabaseSingle implements DatabaseStageEntry { return this.serialize(result); } }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize))) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } @Override @@ -105,8 +105,8 @@ public class DatabaseSingle implements DatabaseStageEntry { .remove(key.retain(), LLDictionaryResultType.PREVIOUS_VALUE) ) .map(this::deserialize) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } @Override @@ -116,8 +116,8 @@ public class DatabaseSingle implements DatabaseStageEntry { .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())) ) .map(empty -> empty ? 0L : 1L) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } @Override @@ -126,8 +126,8 @@ public class DatabaseSingle implements DatabaseStageEntry { .defer(() -> dictionary .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())) ) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } //todo: temporary wrapper. convert the whole class to buffers diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index abe1659..476ffda 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -27,7 +27,7 @@ public interface DatabaseStageMap> extends Dat Mono at(@Nullable CompositeSnapshot snapshot, T key); default Mono getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) { - return this.at(snapshot, key).flatMap(v -> v.get(snapshot, existsAlmostCertainly).doFinally(s -> v.release())); + return this.at(snapshot, key).flatMap(v -> v.get(snapshot, existsAlmostCertainly).doAfterTerminate(v::release)); } default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { @@ -39,7 +39,7 @@ public interface DatabaseStageMap> extends Dat } default Mono putValue(T key, U value) { - return at(null, key).single().flatMap(v -> v.set(value).doFinally(s -> v.release())); + return at(null, key).single().flatMap(v -> v.set(value).doAfterTerminate(v::release)); } Mono getUpdateMode(); @@ -50,7 +50,7 @@ public interface DatabaseStageMap> extends Dat .single() .flatMap(v -> v .update(updater, updateReturnMode, existsAlmostCertainly) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ); } @@ -72,7 +72,7 @@ public interface DatabaseStageMap> extends Dat .single() .flatMap(v -> v .updateAndGetDelta(updater, existsAlmostCertainly) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ); } @@ -81,7 +81,7 @@ public interface DatabaseStageMap> extends Dat } default Mono putValueAndGetPrevious(T key, U value) { - return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value).doFinally(s -> v.release())); + return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value).doAfterTerminate(v::release)); } /** @@ -91,7 +91,7 @@ public interface DatabaseStageMap> extends Dat * @return true if the key was associated with any value, false if the key didn't exist. */ default Mono putValueAndGetChanged(T key, U value) { - return at(null, key).single().flatMap(v -> v.setAndGetChanged(value).doFinally(s -> v.release())).single(); + return at(null, key).single().flatMap(v -> v.setAndGetChanged(value).doAfterTerminate(v::release)).single(); } default Mono remove(T key) { @@ -99,7 +99,7 @@ public interface DatabaseStageMap> extends Dat } default Mono removeAndGetPrevious(T key) { - return at(null, key).flatMap(v -> v.clearAndGetPrevious().doFinally(s -> v.release())); + return at(null, key).flatMap(v -> v.clearAndGetPrevious().doAfterTerminate(v::release)); } default Mono removeAndGetStatus(T key) { @@ -129,7 +129,7 @@ public interface DatabaseStageMap> extends Dat .getValue() .get(snapshot, true) .map(value -> Map.entry(entry.getKey(), value)) - .doFinally(s -> entry.getValue().release()) + .doAfterTerminate(() -> entry.getValue().release()) ); } @@ -152,7 +152,7 @@ public interface DatabaseStageMap> extends Dat .flatMap(entriesReplacer) .flatMap(replacedEntry -> this .at(null, replacedEntry.getKey()) - .flatMap(v -> v.set(replacedEntry.getValue()).doFinally(s -> v.release()))) + .flatMap(v -> v.set(replacedEntry.getValue()).doAfterTerminate(v::release))) .then(); } } @@ -162,7 +162,7 @@ public interface DatabaseStageMap> extends Dat .getAllStages(null) .flatMap(stage -> Mono .defer(() -> entriesReplacer.apply(stage)) - .doFinally(s -> stage.getValue().release()) + .doAfterTerminate(() -> stage.getValue().release()) ) .then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index 58bd100..18fd68b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -46,32 +46,37 @@ public class SubStageGetterHashMap implements @Nullable CompositeSnapshot snapshot, ByteBuf prefixKey, List debuggingKeys) { - return Mono - .defer(() -> { - if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); - } else { - return Mono - .fromCallable(() -> { - for (ByteBuf key : debuggingKeys) { - key.release(); - } - return null; - }); - } - }) - .then(Mono - .fromSupplier(() -> DatabaseMapDictionaryHashed - .tail(dictionary, - prefixKey.retain(), - keySerializer, - valueSerializer, - keyHashFunction, - keyHashSerializer - ) - ) - ) - .doFinally(s -> prefixKey.release()); + try { + return Mono + .defer(() -> { + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); + } else { + return Mono + .fromCallable(() -> { + for (ByteBuf key : debuggingKeys) { + key.release(); + } + return null; + }); + } + }) + .then(Mono + .fromSupplier(() -> DatabaseMapDictionaryHashed + .tail(dictionary, + prefixKey.retain(), + keySerializer, + valueSerializer, + keyHashFunction, + keyHashSerializer + ) + ) + ) + .doFirst(prefixKey::retain) + .doAfterTerminate(prefixKey::release); + } finally { + prefixKey.release(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index f5ea9f1..b241c71 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -44,31 +44,36 @@ public class SubStageGetterHashSet implements @Nullable CompositeSnapshot snapshot, ByteBuf prefixKey, List debuggingKeys) { - return Mono - .defer(() -> { - if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); - } else { - return Mono - .fromCallable(() -> { - for (ByteBuf key : debuggingKeys) { - key.release(); - } - return null; - }); - } - }) - .then(Mono - .fromSupplier(() -> DatabaseSetDictionaryHashed - .tail(dictionary, - prefixKey.retain(), - keySerializer, - keyHashFunction, - keyHashSerializer - ) - ) - ) - .doFinally(s -> prefixKey.release()); + try { + return Mono + .defer(() -> { + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); + } else { + return Mono + .fromCallable(() -> { + for (ByteBuf key : debuggingKeys) { + key.release(); + } + return null; + }); + } + }) + .then(Mono + .fromSupplier(() -> DatabaseSetDictionaryHashed + .tail(dictionary, + prefixKey.retain(), + keySerializer, + keyHashFunction, + keyHashSerializer + ) + ) + ) + .doFirst(prefixKey::retain) + .doAfterTerminate(prefixKey::release); + } finally { + prefixKey.release(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index 2dc09ca..cf1ecfe 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -37,30 +37,35 @@ public class SubStageGetterMap implements SubStageGetter, Databa @Nullable CompositeSnapshot snapshot, ByteBuf prefixKey, List debuggingKeys) { - return Mono - .defer(() -> { - if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); - } else { - return Mono - .fromCallable(() -> { - for (ByteBuf key : debuggingKeys) { - key.release(); - } - return null; - }); - } - }) - .then(Mono - .fromSupplier(() -> DatabaseMapDictionary - .tail(dictionary, - prefixKey.retain(), - keySerializer, - valueSerializer - ) - ) - ) - .doFinally(s -> prefixKey.release()); + try { + return Mono + .defer(() -> { + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); + } else { + return Mono + .fromCallable(() -> { + for (ByteBuf key : debuggingKeys) { + key.release(); + } + return null; + }); + } + }) + .then(Mono + .fromSupplier(() -> DatabaseMapDictionary + .tail(dictionary, + prefixKey.retain(), + keySerializer, + valueSerializer + ) + ) + ) + .doFirst(prefixKey::retain) + .doAfterTerminate(prefixKey::release); + } finally { + prefixKey.release(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 3906eb8..2e5b0f0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -51,31 +51,36 @@ public class SubStageGetterMapDeep> implements @Nullable CompositeSnapshot snapshot, ByteBuf prefixKey, List debuggingKeys) { - return Mono - .defer(() -> { - if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); - } else { - return Mono - .fromCallable(() -> { - for (ByteBuf key : debuggingKeys) { - key.release(); - } - return null; - }); - } - }) - .then(Mono - .fromSupplier(() -> DatabaseMapDictionaryDeep - .deepIntermediate(dictionary, - prefixKey.retain(), - keySerializer, - subStageGetter, - keyExtLength - ) - ) - ) - .doFinally(s -> prefixKey.release()); + try { + return Mono + .defer(() -> { + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); + } else { + return Mono + .fromCallable(() -> { + for (ByteBuf key : debuggingKeys) { + key.release(); + } + return null; + }); + } + }) + .then(Mono + .fromSupplier(() -> DatabaseMapDictionaryDeep + .deepIntermediate(dictionary, + prefixKey.retain(), + keySerializer, + subStageGetter, + keyExtLength + ) + ) + ) + .doFirst(prefixKey::retain) + .doAfterTerminate(prefixKey::release); + } finally { + prefixKey.release(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index 15a252e..7ee13fb 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -38,9 +38,7 @@ public class SubStageGetterSet implements SubStageGetter, Dat return Mono .defer(() -> { if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys) - .doFirst(() -> prefixKey.retain()) - .doFinally(s -> prefixKey.release()); + return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); } else { return Mono .fromCallable(() -> { @@ -60,8 +58,8 @@ public class SubStageGetterSet implements SubStageGetter, Dat ) ) ) - .doFirst(() -> prefixKey.retain()) - .doFinally(s -> prefixKey.release()); + .doFirst(prefixKey::retain) + .doAfterTerminate(prefixKey::release); } finally { prefixKey.release(); } @@ -78,20 +76,26 @@ public class SubStageGetterSet implements SubStageGetter, Dat } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { - return Mono - .fromCallable(() -> { - try { - for (ByteBuf key : keys) { - assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength(); + try { + return Mono + .fromCallable(() -> { + try { + for (ByteBuf key : keys) { + assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength(); + } + } finally { + prefixKey.release(); + for (ByteBuf key : keys) { + key.release(); + } } - } finally { - prefixKey.release(); - for (ByteBuf key : keys) { - key.release(); - } - } - return null; - }); + return null; + }) + .doFirst(prefixKey::retain) + .doAfterTerminate(prefixKey::release); + } finally { + prefixKey.release(); + } } public int getKeyBinaryLength() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index f30c00b..9fee2a9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -52,8 +52,8 @@ public class SubStageGetterSingle implements SubStageGetter>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer)) ) - .doFirst(() -> keyPrefix.retain()) - .doFinally(s -> keyPrefix.release()); + .doFirst(keyPrefix::retain) + .doAfterTerminate(keyPrefix::release); } finally { keyPrefix.release(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 6b342de..c9bca88 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -208,7 +208,7 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toString(key)); + logger.trace("Reading {}", LLUtils.toStringSafe(key)); } return dbGet(cfh, resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly); } finally { @@ -217,10 +217,10 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .subscribeOn(dbScheduler) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -368,16 +368,14 @@ public class LLLocalDictionary implements LLDictionary { return Mono .defer(() -> { if (range.isSingle()) { - return this - .containsKey(snapshot, range.getSingle().retain()); + return this.containsKey(snapshot, range.getSingle().retain()); } else { - return this - .containsRange(snapshot, range.retain()); + return this.containsRange(snapshot, range.retain()); } }) .map(isContained -> !isContained) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -425,8 +423,8 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -464,10 +462,10 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) + .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) .subscribeOn(dbScheduler) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -492,7 +490,7 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(value)); + logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); } dbPut(cfh, null, key.retain(), value.retain()); return null; @@ -503,14 +501,14 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause)) + .onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause)) ) .singleOrEmpty() .doFirst(() -> { key.retain(); value.retain(); }) - .doFinally(s -> { + .doAfterTerminate(() -> { key.release(); value.release(); }); @@ -550,7 +548,7 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toString(key)); + logger.trace("Reading {}", LLUtils.toStringSafe(key)); } while (true) { @Nullable ByteBuf prevData; @@ -597,7 +595,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace("Deleting {}", LLUtils.toString(key)); + logger.trace("Deleting {}", LLUtils.toStringSafe(key)); } dbDelete(cfh, null, key.retain()); } else if (newData != null @@ -615,7 +613,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(newData)); + logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); } dbPut(cfh, null, key.retain(), newData.retain()); } @@ -646,10 +644,10 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) + .onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)) .subscribeOn(dbScheduler) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -677,7 +675,7 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toString(key)); + logger.trace("Reading {}", LLUtils.toStringSafe(key)); } while (true) { @Nullable ByteBuf prevData; @@ -724,7 +722,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace("Deleting {}", LLUtils.toString(key)); + logger.trace("Deleting {}", LLUtils.toStringSafe(key)); } dbDelete(cfh, null, key.retain()); } else if (newData != null @@ -742,7 +740,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(newData)); + logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); } dbPut(cfh, null, key.retain(), newData.retain()); } @@ -767,10 +765,10 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) + .onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)) .subscribeOn(dbScheduler) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -812,7 +810,7 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace("Deleting {}", LLUtils.toString(key)); + logger.trace("Deleting {}", LLUtils.toStringSafe(key)); } dbDelete(cfh, null, key.retain()); return null; @@ -822,13 +820,13 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toString(key), cause)) + .onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toStringSafe(key), cause)) .subscribeOn(dbScheduler) .then(Mono.empty()) ) .singleOrEmpty() - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -844,7 +842,7 @@ public class LLLocalDictionary implements LLDictionary { .containsKey(null, key.retain()) .single() .map(LLUtils::booleanToResponseByteBuffer) - .doFinally(s -> { + .doAfterTerminate(() -> { assert key.refCnt() > 0; }); case PREVIOUS_VALUE: @@ -884,7 +882,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) + .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) .subscribeOn(dbScheduler); case VOID: return Mono.empty(); @@ -892,8 +890,8 @@ public class LLLocalDictionary implements LLDictionary { return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); } }) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -956,7 +954,7 @@ public class LLLocalDictionary implements LLDictionary { .flatMapMany(Flux::fromIterable) .onErrorMap(cause -> new IOException("Failed to read keys " + Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause)) - .doFinally(s -> keysWindow.forEach(ReferenceCounted::release)) + .doAfterTerminate(() -> keysWindow.forEach(ReferenceCounted::release)) ) ) .doOnDiscard(Entry.class, discardedEntry -> { @@ -1081,8 +1079,8 @@ public class LLLocalDictionary implements LLDictionary { return getRangeMulti(snapshot, range.retain()); } }) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1101,8 +1099,8 @@ public class LLLocalDictionary implements LLDictionary { return getRangeMultiGrouped(snapshot, range.retain(), prefixLength); } }) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1114,8 +1112,8 @@ public class LLLocalDictionary implements LLDictionary { .defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly)) .map(value -> Map.entry(key.retain(), value)) .flux() - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -1136,8 +1134,8 @@ public class LLLocalDictionary implements LLDictionary { castedEntry.getValue().release(); }) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1159,8 +1157,8 @@ public class LLLocalDictionary implements LLDictionary { LLLocalGroupedReactiveRocksIterator::release ) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1177,8 +1175,8 @@ public class LLLocalDictionary implements LLDictionary { return this.getRangeKeysMulti(snapshot, range.retain()); } }) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1201,8 +1199,8 @@ public class LLLocalDictionary implements LLDictionary { LLLocalGroupedReactiveRocksIterator::release ) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1226,8 +1224,8 @@ public class LLLocalDictionary implements LLDictionary { LLLocalKeyPrefixReactiveRocksIterator::release ) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1246,8 +1244,8 @@ public class LLLocalDictionary implements LLDictionary { } }) .doOnDiscard(ByteBuf.class, ReferenceCounted::release) - .doFirst(() -> key.retain()) - .doFinally(s -> key.release()); + .doFirst(key::retain) + .doAfterTerminate(key::release); } finally { key.release(); } @@ -1263,8 +1261,8 @@ public class LLLocalDictionary implements LLDictionary { ) .doOnDiscard(ByteBuf.class, ReferenceCounted::release) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1387,8 +1385,8 @@ public class LLLocalDictionary implements LLDictionary { ) .then() .onErrorMap(cause -> new IOException("Failed to write range", cause)) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } else { if (USE_WRITE_BATCHES_IN_SET_RANGE) { return Mono.fromCallable(() -> { @@ -1415,8 +1413,8 @@ public class LLLocalDictionary implements LLDictionary { .then(Mono.empty()) ) .onErrorMap(cause -> new IOException("Failed to write range", cause)) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } } finally { range.release(); @@ -1630,61 +1628,64 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) { try { - Mono result; - if (range.isAll()) { - result = Mono - .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) - .onErrorMap(IOException::new) - .subscribeOn(dbScheduler); - } else { - result = Mono - .fromCallable(() -> { - var readOpts = resolveSnapshot(snapshot); - readOpts.setFillCache(false); - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); - } else { - maxBound = emptyReleasableSlice(); - } - try { - if (fast) { - readOpts.setIgnoreRangeDeletions(true); - - } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + return Mono + .defer(() -> { + if (range.isAll()) { + return Mono + .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) + .onErrorMap(IOException::new) + .subscribeOn(dbScheduler); + } else { + return Mono + .fromCallable(() -> { + var readOpts = resolveSnapshot(snapshot); + readOpts.setFillCache(false); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); } else { - rocksIterator.seekToFirst(); + minBound = emptyReleasableSlice(); } - long i = 0; - while (rocksIterator.isValid()) { - rocksIterator.next(); - i++; + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + } else { + maxBound = emptyReleasableSlice(); + } + try { + if (fast) { + readOpts.setIgnoreRangeDeletions(true); + + } + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(rocksIterator, range.getMin().retain()); + } else { + rocksIterator.seekToFirst(); + } + long i = 0; + while (rocksIterator.isValid()) { + rocksIterator.next(); + i++; + } + return i; + } + } finally { + maxBound.release(); + } + } finally { + minBound.release(); } - return i; - } - } finally { - maxBound.release(); - } - } finally { - minBound.release(); - } - }) - .onErrorMap(cause -> new IOException("Failed to get size of range " - + range.toString(), cause)) - .subscribeOn(dbScheduler); - } - return result.doFirst(() -> range.retain()).doFinally(s -> range.release()); + }) + .onErrorMap(cause -> new IOException("Failed to get size of range " + + range.toString(), cause)) + .subscribeOn(dbScheduler); + } + }) + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1738,8 +1739,8 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1785,8 +1786,8 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } @@ -1934,8 +1935,8 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .subscribeOn(dbScheduler) - .doFirst(() -> range.retain()) - .doFinally(s -> range.release()); + .doFirst(range::retain) + .doAfterTerminate(range::release); } finally { range.release(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 0d9045e..ccb36bd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice; +import org.jetbrains.annotations.NotNull; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; @@ -43,7 +44,7 @@ public abstract class LLLocalReactiveRocksIterator { public Flux flux() { return Flux - .generate(() -> { + .>generate(() -> { var readOptions = new ReadOptions(this.readOptions); if (!range.hasMin() || !range.hasMax()) { readOptions.setReadaheadSize(2 * 1024 * 1024); @@ -84,7 +85,9 @@ public abstract class LLLocalReactiveRocksIterator { rocksIterator.close(); tuple.getT2().release(); tuple.getT3().release(); - }); + }) + .doFirst(range::retain) + .doAfterTerminate(range::release); } public abstract T getEntry(ByteBuf key, ByteBuf value); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 4543658..f5afe72 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -283,6 +283,6 @@ public class LuceneUtils { DatabaseMapDictionaryDeep, DatabaseMapDictionary> dictionaryDeep) { return entry -> dictionaryDeep .at(snapshot, entry.getKey()) - .flatMap(sub -> sub.getValue(snapshot, entry.getValue()).doFinally(s -> sub.release())); + .flatMap(sub -> sub.getValue(snapshot, entry.getValue()).doAfterTerminate(sub::release)); } } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 6851e27..174b5a4 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -88,7 +88,7 @@ public class TestDictionaryMap { .flatMap(map -> map .putValue(key, value) .then(map.getValue(null, key)) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -105,9 +105,9 @@ public class TestDictionaryMap { .create(tempDb(db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .flatMap(map -> map - .at(null, key).flatMap(v -> v.set(value).doFinally(s -> v.release())) - .then(map.at(null, key).flatMap(v -> v.get(null).doFinally(s -> v.release()))) - .doFinally(s -> map.release()) + .at(null, key).flatMap(v -> v.set(value).doAfterTerminate(v::release)) + .then(map.at(null, key).flatMap(v -> v.get(null).doAfterTerminate(v::release))) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -129,7 +129,7 @@ public class TestDictionaryMap { map.putValueAndGetPrevious(key, value), map.putValueAndGetPrevious(key, value) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -151,7 +151,7 @@ public class TestDictionaryMap { map.putValue(key, value).then(map.removeAndGetPrevious(key)), map.removeAndGetPrevious(key) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -173,7 +173,7 @@ public class TestDictionaryMap { map.putValue(key, value).then(map.removeAndGetStatus(key)), map.removeAndGetStatus(key) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -215,7 +215,7 @@ public class TestDictionaryMap { return value; }) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { @@ -257,7 +257,7 @@ public class TestDictionaryMap { return value; }).then(map.getValue(null, key)) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { @@ -281,7 +281,7 @@ public class TestDictionaryMap { map.remove(key), map.putValueAndGetChanged(key, "error?").single() ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -339,7 +339,7 @@ public class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -363,7 +363,7 @@ public class TestDictionaryMap { .flatMapMany(map -> map .setAllValues(Flux.fromIterable(entries.entrySet())) .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -389,7 +389,7 @@ public class TestDictionaryMap { map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -415,7 +415,7 @@ public class TestDictionaryMap { map.set(entries).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -450,7 +450,7 @@ public class TestDictionaryMap { removalMono.then(Mono.empty()), map.setAndGetChanged(entries).single() ) - .doFinally(s -> map.release()); + .doAfterTerminate(map::release); }) )); if (shouldFail) { @@ -471,7 +471,7 @@ public class TestDictionaryMap { .concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries)) .map(Map::entrySet) .flatMap(Flux::fromIterable) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -496,7 +496,7 @@ public class TestDictionaryMap { .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) .map(Map::entrySet) .flatMap(Flux::fromIterable) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -522,7 +522,7 @@ public class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getAllValues(null) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -550,7 +550,7 @@ public class TestDictionaryMap { .map(Map::entrySet) .flatMapMany(Flux::fromIterable) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -580,10 +580,10 @@ public class TestDictionaryMap { .getValue() .get(null) .map(val -> Map.entry(stage.getKey(), val)) - .doFinally(s -> stage.getValue().release()) + .doAfterTerminate(() -> stage.getValue().release()) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -610,7 +610,7 @@ public class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.isEmpty(null) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) )); @@ -636,7 +636,7 @@ public class TestDictionaryMap { map.clear().then(Mono.empty()), map.isEmpty(null) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) )); diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index ae446b0..c8e85ab 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -155,7 +155,7 @@ public class TestDictionaryMapDeep { .flatMap(map -> map .putValue(key, value) .then(map.getValue(null, key)) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -177,7 +177,7 @@ public class TestDictionaryMapDeep { .flatMapMany(map -> map .putValue(key, value) .thenMany(map.getAllValues(null)) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -198,13 +198,13 @@ public class TestDictionaryMapDeep { .at(null, key) .flatMap(v -> v .set(value) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) .then(map .at(null, "capra") .flatMap(v -> v .set(Map.of("normal", "123", "ormaln", "456")) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) ) .thenMany(map @@ -212,10 +212,10 @@ public class TestDictionaryMapDeep { .flatMap(v -> v.getValue() .getAllValues(null) .map(result -> Tuples.of(v.getKey(), result.getKey(), result.getValue())) - .doFinally(s -> v.getValue().release()) + .doAfterTerminate(() -> v.getValue().release()) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -239,9 +239,9 @@ public class TestDictionaryMapDeep { .create(tempDb(db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMap(map -> map - .at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.release())) - .then(map.at(null, key1).flatMap(v -> v.getValue(null, key2).doFinally(s -> v.release()))) - .doFinally(s -> map.release()) + .at(null, key1).flatMap(v -> v.putValue(key2, value).doAfterTerminate(v::release)) + .then(map.at(null, key1).flatMap(v -> v.getValue(null, key2).doAfterTerminate(v::release))) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -265,7 +265,7 @@ public class TestDictionaryMapDeep { map.putValueAndGetPrevious(key, value), map.putValueAndGetPrevious(key, value) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -287,22 +287,22 @@ public class TestDictionaryMapDeep { .at(null, key1) .flatMap(v -> v .putValueAndGetPrevious(key2, "error?") - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .putValueAndGetPrevious(key2, value) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .putValueAndGetPrevious(key2, value) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -324,7 +324,7 @@ public class TestDictionaryMapDeep { map.putValue(key, value).then(map.removeAndGetPrevious(key)), map.removeAndGetPrevious(key) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -347,22 +347,22 @@ public class TestDictionaryMapDeep { .flatMap(v -> v .putValue(key2, "error?") .then(v.removeAndGetPrevious(key2)) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .putValue(key2, value) .then(v.removeAndGetPrevious(key2)) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v.removeAndGetPrevious(key2) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -384,7 +384,7 @@ public class TestDictionaryMapDeep { map.putValue(key, value).then(map.removeAndGetStatus(key)), map.removeAndGetStatus(key) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -407,22 +407,22 @@ public class TestDictionaryMapDeep { .flatMap(v -> v .putValue(key2, "error?") .then(v.removeAndGetStatus(key2)) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .putValue(key2, value) .then(v.removeAndGetStatus(key2)) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v.removeAndGetStatus(key2) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -464,7 +464,7 @@ public class TestDictionaryMapDeep { return value; }) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { @@ -489,28 +489,28 @@ public class TestDictionaryMapDeep { .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> prev) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> value) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> value) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) .flatMap(v -> v .updateValue(key2, prev -> null) - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { @@ -552,7 +552,7 @@ public class TestDictionaryMapDeep { return value; }).then(map.getValue(null, key)) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { @@ -579,7 +579,7 @@ public class TestDictionaryMapDeep { .updateValue(key2, prev -> prev) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) @@ -587,7 +587,7 @@ public class TestDictionaryMapDeep { .updateValue(key2, prev -> value) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) @@ -595,7 +595,7 @@ public class TestDictionaryMapDeep { .updateValue(key2, prev -> value) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ), map .at(null, key1) @@ -603,10 +603,10 @@ public class TestDictionaryMapDeep { .updateValue(key2, prev -> null) .then(v.getValue(null, key2)) .defaultIfEmpty("empty") - .doFinally(s -> v.release()) + .doAfterTerminate(v::release) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (updateMode == UpdateMode.DISALLOW || shouldFail) { @@ -630,7 +630,7 @@ public class TestDictionaryMapDeep { map.remove(key), map.putValueAndGetChanged(key, Map.of("error?", "error.")).single() ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -683,7 +683,7 @@ public class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -707,7 +707,7 @@ public class TestDictionaryMapDeep { .flatMapMany(map -> map .setAllValues(Flux.fromIterable(entries.entrySet())) .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -733,7 +733,7 @@ public class TestDictionaryMapDeep { map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())), map.setAllValuesAndGetPrevious(Flux.fromIterable(entries.entrySet())) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -759,7 +759,7 @@ public class TestDictionaryMapDeep { map.set(entries).then(Mono.empty()), map.getMulti(null, Flux.fromIterable(entries.keySet())) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -793,7 +793,7 @@ public class TestDictionaryMapDeep { removalMono.then(Mono.empty()), map.setAndGetChanged(entries).single() ) - .doFinally(s -> map.release()); + .doAfterTerminate(map::release); }) )); if (shouldFail) { @@ -817,7 +817,7 @@ public class TestDictionaryMapDeep { ) .map(Map::entrySet) .flatMap(Flux::fromIterable) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -842,7 +842,7 @@ public class TestDictionaryMapDeep { .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) .map(Map::entrySet) .flatMap(Flux::fromIterable) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -868,7 +868,7 @@ public class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.getAllValues(null) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -896,7 +896,7 @@ public class TestDictionaryMapDeep { .map(Map::entrySet) .flatMapMany(Flux::fromIterable) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -926,10 +926,10 @@ public class TestDictionaryMapDeep { .getValue() .get(null) .map(val -> Map.entry(stage.getKey(), val)) - .doFinally(s -> stage.getValue().release()) + .doAfterTerminate(() -> stage.getValue().release()) ) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -955,7 +955,7 @@ public class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.isEmpty(null) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { @@ -979,7 +979,7 @@ public class TestDictionaryMapDeep { map.clear().then(Mono.empty()), map.isEmpty(null) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) { diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java index 053b10e..36b38bb 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java @@ -101,14 +101,14 @@ public class TestDictionaryMapDeepHashMap { .create(tempDb(db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMapHashMap(dict, 5)) .flatMapMany(map -> map - .at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.release())) + .at(null, key1).flatMap(v -> v.putValue(key2, value).doAfterTerminate(v::release)) .thenMany(map .getAllValues(null) .map(Entry::getValue) .flatMap(maps -> Flux.fromIterable(maps.entrySet())) .map(Entry::getValue) ) - .doFinally(s -> map.release()) + .doAfterTerminate(map::release) ) )); if (shouldFail) {