diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java index 682ed64..be9b569 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLRange.java +++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java @@ -37,7 +37,11 @@ public class LLRange { } public static LLRange single(ByteBuf single) { - return new LLRange(single, single); + try { + return new LLRange(single.retain(), single.retain()); + } finally { + single.release(); + } } public static LLRange of(ByteBuf min, ByteBuf max) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 2dbc7ec..5bf17fd 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -61,13 +61,15 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return dictionary - .getRange(resolveSnapshot(snapshot), range.retain(), existsAlmostCertainly) + return Flux + .defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain(), existsAlmostCertainly)) .collectMap( entry -> deserializeSuffix(stripPrefix(entry.getKey(), false)), entry -> deserialize(entry.getValue()), HashMap::new) - .filter(map -> !map.isEmpty()); + .filter(map -> !map.isEmpty()) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } @Override @@ -84,7 +86,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep range.retain()) + .doFinally(s -> range.release()); } @Override @@ -95,19 +99,28 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { - return dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast); + return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { - return dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()); + return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } @Override public Mono> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono - .fromSupplier(() -> new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop())) - .>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); + .using( + () -> toKey(serializeSuffix(keySuffix)), + keyBuf -> Mono + .fromSupplier(() -> new DatabaseSingle<>(dictionary, keyBuf.retain(), Serializer.noop())) + .>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)), + ReferenceCounted::release + ); } @Override @@ -124,17 +137,24 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValue(T keySuffix, U value) { - ByteBuf keySuffixBuf = serializeSuffix(keySuffix); - ByteBuf keyBuf = toKey(keySuffixBuf.retain()); - ByteBuf valueBuf = serialize(value); - return dictionary - .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID) - .doOnNext(ReferenceCounted::release) - .doFinally(s -> { - keyBuf.release(); - keySuffixBuf.release(); - valueBuf.release(); - }) + return Mono + .using( + () -> serializeSuffix(keySuffix), + keySuffixBuf -> Mono + .using( + () -> toKey(keySuffixBuf.retain()), + keyBuf -> Mono + .using( + () -> serialize(value), + valueBuf -> dictionary + .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID) + .doOnNext(ReferenceCounted::release), + ReferenceCounted::release + ), + ReferenceCounted::release + ), + ReferenceCounted::release + ) .then(); } @@ -200,34 +220,48 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) { - ByteBuf keySuffixBuf = serializeSuffix(keySuffix); - ByteBuf keyBuf = toKey(keySuffixBuf.retain()); - ByteBuf valueBuf = serialize(value); - return dictionary - .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) - .map(this::deserialize) - .doFinally(s -> { - keyBuf.release(); - keySuffixBuf.release(); - valueBuf.release(); - }); + return Mono + .using( + () -> serializeSuffix(keySuffix), + keySuffixBuf -> Mono + .using( + () -> toKey(keySuffixBuf.retain()), + keyBuf -> Mono + .using( + () -> serialize(value), + valueBuf -> dictionary + .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) + .map(this::deserialize), + ReferenceCounted::release + ), + ReferenceCounted::release + ), + ReferenceCounted::release + ); } @Override public Mono putValueAndGetChanged(T keySuffix, U value) { - ByteBuf keySuffixBuf = serializeSuffix(keySuffix); - ByteBuf keyBuf = toKey(keySuffixBuf.retain()); - ByteBuf valueBuf = serialize(value); - return dictionary - .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) - .map(this::deserialize) - .map(oldValue -> !Objects.equals(oldValue, value)) - .defaultIfEmpty(value != null) - .doFinally(s -> { - keyBuf.release(); - keySuffixBuf.release(); - valueBuf.release(); - }); + return Mono + .using( + () -> serializeSuffix(keySuffix), + keySuffixBuf -> Mono + .using( + () -> toKey(keySuffixBuf.retain()), + keyBuf -> Mono + .using( + () -> serialize(value), + valueBuf -> dictionary + .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) + .map(this::deserialize) + .map(oldValue -> !Objects.equals(oldValue, value)) + .defaultIfEmpty(value != null), + ReferenceCounted::release + ), + ReferenceCounted::release + ), + ReferenceCounted::release + ); } @Override @@ -318,9 +352,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot) { - return dictionary - .getRangeKeys(resolveSnapshot(snapshot), range.retain()) - .map(key -> { + return Flux.defer(() -> dictionary.getRangeKeys(resolveSnapshot(snapshot), range.retain())) + .>>map(key -> { ByteBuf keySuffixWithExt = stripPrefix(key, false); // Don't use "key" under this point --- try { @@ -333,13 +366,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep range.retain()) + .doFinally(s -> range.release()); } @Override public Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { - return dictionary - .getRange(resolveSnapshot(snapshot), range.retain()) + return Flux.defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain())) .map(serializedEntry -> Map.entry( deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)), valueSerializer.deserialize(serializedEntry.getValue()) @@ -349,7 +383,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep) entry; castedEntry.getKey().release(); castedEntry.getValue().release(); - }); + }) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } @Override @@ -364,7 +400,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep range.retain()) + .doFinally(s -> range.release()); } @Override @@ -373,13 +411,17 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep dictionary.remove(range.getSingle().retain(), LLDictionaryResultType.VOID)) .doOnNext(ReferenceCounted::release) - .then(); + .then() + .doFirst(() -> range.getSingle().retain()) + .doFinally(s -> range.getSingle().release()); } else { - return dictionary - .setRange(range.retain(), Flux.empty()); + return Mono + .defer(() -> dictionary.setRange(range.retain(), Flux.empty())) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 4461cb5..de7c1f4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -365,12 +365,17 @@ public class DatabaseMapDictionaryDeep> implem @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { - return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)); + return Mono + .defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { - return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())); + return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())) + .doFirst(() -> range.retain()) + .doFinally(s -> range.release()); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 1d54089..88d7f76 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; @@ -42,61 +43,91 @@ public class DatabaseSingle implements DatabaseStageEntry { @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return dictionary.get(resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly).map(this::deserialize); + return Mono + .defer(() -> dictionary.get(resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly)) + .map(this::deserialize) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } @Override public Mono setAndGetPrevious(U value) { - ByteBuf valueByteBuf = serialize(value); - return dictionary - .put(key.retain(), valueByteBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) - .map(this::deserialize) - .doFinally(s -> valueByteBuf.release()); + return Mono + .using( + () -> serialize(value), + valueByteBuf -> dictionary + .put(key.retain(), valueByteBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) + .map(this::deserialize), + ReferenceCounted::release + ) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } @Override public Mono update(Function<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { - return dictionary.update(key.retain(), (oldValueSer) -> { - var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); - if (result == null) { - return null; - } else { - return this.serialize(result); - } - }, updateReturnMode, existsAlmostCertainly).map(this::deserialize); + return Mono + .defer(() -> dictionary.update(key.retain(), (oldValueSer) -> { + var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + }, updateReturnMode, existsAlmostCertainly)) + .map(this::deserialize) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } @Override public Mono> updateAndGetDelta(Function<@Nullable U, @Nullable U> updater, boolean existsAlmostCertainly) { - return dictionary.updateAndGetDelta(key.retain(), (oldValueSer) -> { - var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); - if (result == null) { - return null; - } else { - return this.serialize(result); - } - }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)); + return Mono + .defer(() -> dictionary.updateAndGetDelta(key.retain(), (oldValueSer) -> { + var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize))) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } @Override public Mono clearAndGetPrevious() { - return dictionary.remove(key.retain(), LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize); + return Mono + .defer(() -> dictionary + .remove(key.retain(), LLDictionaryResultType.PREVIOUS_VALUE) + ) + .map(this::deserialize) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { - return dictionary - .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())) - .map(empty -> empty ? 0L : 1L); + return Mono + .defer(() -> dictionary + .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())) + ) + .map(empty -> empty ? 0L : 1L) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { - return dictionary - .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())); + return Mono + .defer(() -> dictionary + .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())) + ) + .doFirst(() -> key.retain()) + .doFinally(s -> key.release()); } //todo: temporary wrapper. convert the whole class to buffers diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index b1eb87e..15a252e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -34,30 +34,37 @@ public class SubStageGetterSet implements SubStageGetter, Dat @Nullable CompositeSnapshot snapshot, ByteBuf prefixKey, List debuggingKeys) { - return Mono - .defer(() -> { - if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); - } else { - return Mono - .fromCallable(() -> { - for (ByteBuf key : debuggingKeys) { - key.release(); - } - return null; - }); - } - }) - .then(Mono - .fromSupplier(() -> DatabaseSetDictionary - .tail( - dictionary, - prefixKey.retain(), - keySerializer - ) - ) - ) - .doFinally(s -> prefixKey.release()); + try { + return Mono + .defer(() -> { + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys) + .doFirst(() -> prefixKey.retain()) + .doFinally(s -> prefixKey.release()); + } else { + return Mono + .fromCallable(() -> { + for (ByteBuf key : debuggingKeys) { + key.release(); + } + return null; + }); + } + }) + .then(Mono + .fromSupplier(() -> DatabaseSetDictionary + .tail( + dictionary, + prefixKey.retain(), + keySerializer + ) + ) + ) + .doFirst(() -> prefixKey.retain()) + .doFinally(s -> prefixKey.release()); + } finally { + prefixKey.release(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 33b81c2..f30c00b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -33,25 +33,30 @@ public class SubStageGetterSingle implements SubStageGetter debuggingKeys) { - return Mono - .fromCallable(() -> { - try { - for (ByteBuf key : debuggingKeys) { - if (!LLUtils.equals(keyPrefix, key)) { - throw new IndexOutOfBoundsException("Found more than one element!"); + try { + return Mono + .fromCallable(() -> { + try { + for (ByteBuf key : debuggingKeys) { + if (!LLUtils.equals(keyPrefix, key)) { + throw new IndexOutOfBoundsException("Found more than one element!"); + } + } + return null; + } finally { + for (ByteBuf key : debuggingKeys) { + key.release(); } } - return null; - } finally { - for (ByteBuf key : debuggingKeys) { - key.release(); - } - } - }) - .then(Mono - .>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer)) - ) - .doFinally(s -> keyPrefix.release()); + }) + .then(Mono + .>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer)) + ) + .doFirst(() -> keyPrefix.retain()) + .doFinally(s -> keyPrefix.release()); + } finally { + keyPrefix.release(); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 15e1530..6b342de 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -219,7 +219,7 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -368,13 +368,15 @@ public class LLLocalDictionary implements LLDictionary { return Mono .defer(() -> { if (range.isSingle()) { - return containsKey(snapshot, range.getSingle().retain()); + return this + .containsKey(snapshot, range.getSingle().retain()); } else { - return containsRange(snapshot, range.retain()); + return this + .containsRange(snapshot, range.retain()); } }) .map(isContained -> !isContained) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -423,7 +425,7 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -464,7 +466,7 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -504,7 +506,7 @@ public class LLLocalDictionary implements LLDictionary { .onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause)) ) .singleOrEmpty() - .doOnSubscribe(s -> { + .doFirst(() -> { key.retain(); value.retain(); }) @@ -646,7 +648,7 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -767,7 +769,7 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -825,7 +827,7 @@ public class LLLocalDictionary implements LLDictionary { .then(Mono.empty()) ) .singleOrEmpty() - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -890,7 +892,7 @@ public class LLLocalDictionary implements LLDictionary { return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); } }) - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -1079,7 +1081,7 @@ public class LLLocalDictionary implements LLDictionary { return getRangeMulti(snapshot, range.retain()); } }) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1099,7 +1101,7 @@ public class LLLocalDictionary implements LLDictionary { return getRangeMultiGrouped(snapshot, range.retain(), prefixLength); } }) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1112,7 +1114,7 @@ public class LLLocalDictionary implements LLDictionary { .defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly)) .map(value -> Map.entry(key.retain(), value)) .flux() - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -1134,7 +1136,7 @@ public class LLLocalDictionary implements LLDictionary { castedEntry.getValue().release(); }) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1157,7 +1159,7 @@ public class LLLocalDictionary implements LLDictionary { LLLocalGroupedReactiveRocksIterator::release ) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1175,7 +1177,7 @@ public class LLLocalDictionary implements LLDictionary { return this.getRangeKeysMulti(snapshot, range.retain()); } }) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1199,7 +1201,7 @@ public class LLLocalDictionary implements LLDictionary { LLLocalGroupedReactiveRocksIterator::release ) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1224,7 +1226,7 @@ public class LLLocalDictionary implements LLDictionary { LLLocalKeyPrefixReactiveRocksIterator::release ) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1244,7 +1246,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .doOnDiscard(ByteBuf.class, ReferenceCounted::release) - .doOnSubscribe(s -> key.retain()) + .doFirst(() -> key.retain()) .doFinally(s -> key.release()); } finally { key.release(); @@ -1261,7 +1263,7 @@ public class LLLocalDictionary implements LLDictionary { ) .doOnDiscard(ByteBuf.class, ReferenceCounted::release) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1385,7 +1387,7 @@ public class LLLocalDictionary implements LLDictionary { ) .then() .onErrorMap(cause -> new IOException("Failed to write range", cause)) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } else { if (USE_WRITE_BATCHES_IN_SET_RANGE) { @@ -1413,7 +1415,7 @@ public class LLLocalDictionary implements LLDictionary { .then(Mono.empty()) ) .onErrorMap(cause -> new IOException("Failed to write range", cause)) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } } finally { @@ -1682,7 +1684,7 @@ public class LLLocalDictionary implements LLDictionary { + range.toString(), cause)) .subscribeOn(dbScheduler); } - return result.doOnSubscribe(s -> range.retain()).doFinally(s -> range.release()); + return result.doFirst(() -> range.retain()).doFinally(s -> range.release()); } finally { range.release(); } @@ -1736,7 +1738,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1783,7 +1785,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); @@ -1932,7 +1934,7 @@ public class LLLocalDictionary implements LLDictionary { }) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .subscribeOn(dbScheduler) - .doOnSubscribe(s -> range.retain()) + .doFirst(() -> range.retain()) .doFinally(s -> range.release()); } finally { range.release(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index bf25d67..c3a07b2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -224,9 +224,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setAllowFAllocate(true); options.setRateLimiter(new RateLimiter(10L * 1024L * 1024L)); // 10MiB/s max compaction write speed options.setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), - 50L * 1024L * 1024L * 1024L), // 50GiB + 10L * 1024L * 1024L * 1024L), // 10GiB new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), - 400L * 1024L * 1024L * 1024L), // 400GiB + 100L * 1024L * 1024L * 1024L), // 100GiB new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"), 600L * 1024L * 1024L * 1024L))); // 600GiB // Direct I/O parameters. Removed because they use too much disk.