This commit is contained in:
Andrea Cavalli 2021-05-12 19:02:51 +02:00
parent f4242218da
commit d29ac00c24
8 changed files with 253 additions and 157 deletions

View File

@ -37,7 +37,11 @@ public class LLRange {
} }
public static LLRange single(ByteBuf single) { public static LLRange single(ByteBuf single) {
return new LLRange(single, single); try {
return new LLRange(single.retain(), single.retain());
} finally {
single.release();
}
} }
public static LLRange of(ByteBuf min, ByteBuf max) { public static LLRange of(ByteBuf min, ByteBuf max) {

View File

@ -61,13 +61,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override @Override
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary return Flux
.getRange(resolveSnapshot(snapshot), range.retain(), existsAlmostCertainly) .defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain(), existsAlmostCertainly))
.collectMap( .collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey(), false)), entry -> deserializeSuffix(stripPrefix(entry.getKey(), false)),
entry -> deserialize(entry.getValue()), entry -> deserialize(entry.getValue()),
HashMap::new) HashMap::new)
.filter(map -> !map.isEmpty()); .filter(map -> !map.isEmpty())
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
@ -84,7 +86,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.entry(this.toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())) .entry(this.toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))
) )
) )
); )
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
@ -95,19 +99,28 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override @Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast); return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast))
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) { public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()); return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()))
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono return Mono
.fromSupplier(() -> new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop())) .using(
.<DatabaseStageEntry<U>>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); () -> toKey(serializeSuffix(keySuffix)),
keyBuf -> Mono
.fromSupplier(() -> new DatabaseSingle<>(dictionary, keyBuf.retain(), Serializer.noop()))
.<DatabaseStageEntry<U>>map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)),
ReferenceCounted::release
);
} }
@Override @Override
@ -124,17 +137,24 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override @Override
public Mono<Void> putValue(T keySuffix, U value) { public Mono<Void> putValue(T keySuffix, U value) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix); return Mono
ByteBuf keyBuf = toKey(keySuffixBuf.retain()); .using(
ByteBuf valueBuf = serialize(value); () -> serializeSuffix(keySuffix),
return dictionary keySuffixBuf -> Mono
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID) .using(
.doOnNext(ReferenceCounted::release) () -> toKey(keySuffixBuf.retain()),
.doFinally(s -> { keyBuf -> Mono
keyBuf.release(); .using(
keySuffixBuf.release(); () -> serialize(value),
valueBuf.release(); valueBuf -> dictionary
}) .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release),
ReferenceCounted::release
),
ReferenceCounted::release
),
ReferenceCounted::release
)
.then(); .then();
} }
@ -200,34 +220,48 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override @Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) { public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix); return Mono
ByteBuf keyBuf = toKey(keySuffixBuf.retain()); .using(
ByteBuf valueBuf = serialize(value); () -> serializeSuffix(keySuffix),
return dictionary keySuffixBuf -> Mono
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) .using(
.map(this::deserialize) () -> toKey(keySuffixBuf.retain()),
.doFinally(s -> { keyBuf -> Mono
keyBuf.release(); .using(
keySuffixBuf.release(); () -> serialize(value),
valueBuf.release(); valueBuf -> dictionary
}); .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize),
ReferenceCounted::release
),
ReferenceCounted::release
),
ReferenceCounted::release
);
} }
@Override @Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) { public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix); return Mono
ByteBuf keyBuf = toKey(keySuffixBuf.retain()); .using(
ByteBuf valueBuf = serialize(value); () -> serializeSuffix(keySuffix),
return dictionary keySuffixBuf -> Mono
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) .using(
.map(this::deserialize) () -> toKey(keySuffixBuf.retain()),
.map(oldValue -> !Objects.equals(oldValue, value)) keyBuf -> Mono
.defaultIfEmpty(value != null) .using(
.doFinally(s -> { () -> serialize(value),
keyBuf.release(); valueBuf -> dictionary
keySuffixBuf.release(); .put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
valueBuf.release(); .map(this::deserialize)
}); .map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null),
ReferenceCounted::release
),
ReferenceCounted::release
),
ReferenceCounted::release
);
} }
@Override @Override
@ -318,9 +352,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override @Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) { public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary return Flux.defer(() -> dictionary.getRangeKeys(resolveSnapshot(snapshot), range.retain()))
.getRangeKeys(resolveSnapshot(snapshot), range.retain()) .<Entry<T, DatabaseStageEntry<U>>>map(key -> {
.map(key -> {
ByteBuf keySuffixWithExt = stripPrefix(key, false); ByteBuf keySuffixWithExt = stripPrefix(key, false);
// Don't use "key" under this point --- // Don't use "key" under this point ---
try { try {
@ -333,13 +366,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} finally { } finally {
keySuffixWithExt.release(); keySuffixWithExt.release();
} }
}); })
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) { public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return dictionary return Flux.defer(() -> dictionary.getRange(resolveSnapshot(snapshot), range.retain()))
.getRange(resolveSnapshot(snapshot), range.retain())
.map(serializedEntry -> Map.entry( .map(serializedEntry -> Map.entry(
deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)), deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)),
valueSerializer.deserialize(serializedEntry.getValue()) valueSerializer.deserialize(serializedEntry.getValue())
@ -349,7 +383,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry; var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release(); castedEntry.getKey().release();
castedEntry.getValue().release(); castedEntry.getValue().release();
}); })
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
@ -364,7 +400,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())) Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))
) )
) )
); )
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
@ -373,13 +411,17 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary return dictionary
.clear(); .clear();
} else if (range.isSingle()) { } else if (range.isSingle()) {
return dictionary return Mono
.remove(range.getSingle().retain(), LLDictionaryResultType.VOID) .defer(() -> dictionary.remove(range.getSingle().retain(), LLDictionaryResultType.VOID))
.doOnNext(ReferenceCounted::release) .doOnNext(ReferenceCounted::release)
.then(); .then()
.doFirst(() -> range.getSingle().retain())
.doFinally(s -> range.getSingle().release());
} else { } else {
return dictionary return Mono
.setRange(range.retain(), Flux.empty()); .defer(() -> dictionary.setRange(range.retain(), Flux.empty()))
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
} }

View File

@ -365,12 +365,17 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override @Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast)); return Mono
.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast))
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) { public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain())); return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()))
.doFirst(() -> range.retain())
.doFinally(s -> range.release());
} }
@Override @Override

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections; package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
@ -42,61 +43,91 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
@Override @Override
public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary.get(resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly).map(this::deserialize); return Mono
.defer(() -> dictionary.get(resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly))
.map(this::deserialize)
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
@Override @Override
public Mono<U> setAndGetPrevious(U value) { public Mono<U> setAndGetPrevious(U value) {
ByteBuf valueByteBuf = serialize(value); return Mono
return dictionary .using(
.put(key.retain(), valueByteBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE) () -> serialize(value),
.map(this::deserialize) valueByteBuf -> dictionary
.doFinally(s -> valueByteBuf.release()); .put(key.retain(), valueByteBuf.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
.map(this::deserialize),
ReferenceCounted::release
)
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
@Override @Override
public Mono<U> update(Function<@Nullable U, @Nullable U> updater, public Mono<U> update(Function<@Nullable U, @Nullable U> updater,
UpdateReturnMode updateReturnMode, UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return dictionary.update(key.retain(), (oldValueSer) -> { return Mono
var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); .defer(() -> dictionary.update(key.retain(), (oldValueSer) -> {
if (result == null) { var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer));
return null; if (result == null) {
} else { return null;
return this.serialize(result); } else {
} return this.serialize(result);
}, updateReturnMode, existsAlmostCertainly).map(this::deserialize); }
}, updateReturnMode, existsAlmostCertainly))
.map(this::deserialize)
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
@Override @Override
public Mono<Delta<U>> updateAndGetDelta(Function<@Nullable U, @Nullable U> updater, public Mono<Delta<U>> updateAndGetDelta(Function<@Nullable U, @Nullable U> updater,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return dictionary.updateAndGetDelta(key.retain(), (oldValueSer) -> { return Mono
var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); .defer(() -> dictionary.updateAndGetDelta(key.retain(), (oldValueSer) -> {
if (result == null) { var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer));
return null; if (result == null) {
} else { return null;
return this.serialize(result); } else {
} return this.serialize(result);
}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)); }
}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)))
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
@Override @Override
public Mono<U> clearAndGetPrevious() { public Mono<U> clearAndGetPrevious() {
return dictionary.remove(key.retain(), LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize); return Mono
.defer(() -> dictionary
.remove(key.retain(), LLDictionaryResultType.PREVIOUS_VALUE)
)
.map(this::deserialize)
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
@Override @Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary return Mono
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())) .defer(() -> dictionary
.map(empty -> empty ? 0L : 1L); .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain()))
)
.map(empty -> empty ? 0L : 1L)
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
@Override @Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) { public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary return Mono
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain())); .defer(() -> dictionary
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key.retain()))
)
.doFirst(() -> key.retain())
.doFinally(s -> key.release());
} }
//todo: temporary wrapper. convert the whole class to buffers //todo: temporary wrapper. convert the whole class to buffers

View File

@ -34,30 +34,37 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
ByteBuf prefixKey, ByteBuf prefixKey,
List<ByteBuf> debuggingKeys) { List<ByteBuf> debuggingKeys) {
return Mono try {
.defer(() -> { return Mono
if (assertsEnabled) { .defer(() -> {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys); if (assertsEnabled) {
} else { return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys)
return Mono .doFirst(() -> prefixKey.retain())
.fromCallable(() -> { .doFinally(s -> prefixKey.release());
for (ByteBuf key : debuggingKeys) { } else {
key.release(); return Mono
} .fromCallable(() -> {
return null; for (ByteBuf key : debuggingKeys) {
}); key.release();
} }
}) return null;
.then(Mono });
.fromSupplier(() -> DatabaseSetDictionary }
.tail( })
dictionary, .then(Mono
prefixKey.retain(), .fromSupplier(() -> DatabaseSetDictionary
keySerializer .tail(
) dictionary,
) prefixKey.retain(),
) keySerializer
.doFinally(s -> prefixKey.release()); )
)
)
.doFirst(() -> prefixKey.retain())
.doFinally(s -> prefixKey.release());
} finally {
prefixKey.release();
}
} }
@Override @Override

View File

@ -33,25 +33,30 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
ByteBuf keyPrefix, ByteBuf keyPrefix,
List<ByteBuf> debuggingKeys) { List<ByteBuf> debuggingKeys) {
return Mono try {
.fromCallable(() -> { return Mono
try { .fromCallable(() -> {
for (ByteBuf key : debuggingKeys) { try {
if (!LLUtils.equals(keyPrefix, key)) { for (ByteBuf key : debuggingKeys) {
throw new IndexOutOfBoundsException("Found more than one element!"); if (!LLUtils.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
}
return null;
} finally {
for (ByteBuf key : debuggingKeys) {
key.release();
} }
} }
return null; })
} finally { .then(Mono
for (ByteBuf key : debuggingKeys) { .<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer))
key.release(); )
} .doFirst(() -> keyPrefix.retain())
} .doFinally(s -> keyPrefix.release());
}) } finally {
.then(Mono keyPrefix.release();
.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer)) }
)
.doFinally(s -> keyPrefix.release());
} }
@Override @Override

View File

@ -219,7 +219,7 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -368,13 +368,15 @@ public class LLLocalDictionary implements LLDictionary {
return Mono return Mono
.defer(() -> { .defer(() -> {
if (range.isSingle()) { if (range.isSingle()) {
return containsKey(snapshot, range.getSingle().retain()); return this
.containsKey(snapshot, range.getSingle().retain());
} else { } else {
return containsRange(snapshot, range.retain()); return this
.containsRange(snapshot, range.retain());
} }
}) })
.map(isContained -> !isContained) .map(isContained -> !isContained)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -423,7 +425,7 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -464,7 +466,7 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -504,7 +506,7 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause))
) )
.singleOrEmpty() .singleOrEmpty()
.doOnSubscribe(s -> { .doFirst(() -> {
key.retain(); key.retain();
value.retain(); value.retain();
}) })
@ -646,7 +648,7 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -767,7 +769,7 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -825,7 +827,7 @@ public class LLLocalDictionary implements LLDictionary {
.then(Mono.empty()) .then(Mono.empty())
) )
.singleOrEmpty() .singleOrEmpty()
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -890,7 +892,7 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
} }
}) })
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -1079,7 +1081,7 @@ public class LLLocalDictionary implements LLDictionary {
return getRangeMulti(snapshot, range.retain()); return getRangeMulti(snapshot, range.retain());
} }
}) })
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1099,7 +1101,7 @@ public class LLLocalDictionary implements LLDictionary {
return getRangeMultiGrouped(snapshot, range.retain(), prefixLength); return getRangeMultiGrouped(snapshot, range.retain(), prefixLength);
} }
}) })
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1112,7 +1114,7 @@ public class LLLocalDictionary implements LLDictionary {
.defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly)) .defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly))
.map(value -> Map.entry(key.retain(), value)) .map(value -> Map.entry(key.retain(), value))
.flux() .flux()
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -1134,7 +1136,7 @@ public class LLLocalDictionary implements LLDictionary {
castedEntry.getValue().release(); castedEntry.getValue().release();
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1157,7 +1159,7 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalGroupedReactiveRocksIterator::release LLLocalGroupedReactiveRocksIterator::release
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1175,7 +1177,7 @@ public class LLLocalDictionary implements LLDictionary {
return this.getRangeKeysMulti(snapshot, range.retain()); return this.getRangeKeysMulti(snapshot, range.retain());
} }
}) })
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1199,7 +1201,7 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalGroupedReactiveRocksIterator::release LLLocalGroupedReactiveRocksIterator::release
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1224,7 +1226,7 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalKeyPrefixReactiveRocksIterator::release LLLocalKeyPrefixReactiveRocksIterator::release
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1244,7 +1246,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.doOnDiscard(ByteBuf.class, ReferenceCounted::release) .doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.doOnSubscribe(s -> key.retain()) .doFirst(() -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally { } finally {
key.release(); key.release();
@ -1261,7 +1263,7 @@ public class LLLocalDictionary implements LLDictionary {
) )
.doOnDiscard(ByteBuf.class, ReferenceCounted::release) .doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1385,7 +1387,7 @@ public class LLLocalDictionary implements LLDictionary {
) )
.then() .then()
.onErrorMap(cause -> new IOException("Failed to write range", cause)) .onErrorMap(cause -> new IOException("Failed to write range", cause))
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} else { } else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) { if (USE_WRITE_BATCHES_IN_SET_RANGE) {
@ -1413,7 +1415,7 @@ public class LLLocalDictionary implements LLDictionary {
.then(Mono.<Void>empty()) .then(Mono.<Void>empty())
) )
.onErrorMap(cause -> new IOException("Failed to write range", cause)) .onErrorMap(cause -> new IOException("Failed to write range", cause))
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} }
} finally { } finally {
@ -1682,7 +1684,7 @@ public class LLLocalDictionary implements LLDictionary {
+ range.toString(), cause)) + range.toString(), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
return result.doOnSubscribe(s -> range.retain()).doFinally(s -> range.release()); return result.doFirst(() -> range.retain()).doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
} }
@ -1736,7 +1738,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1783,7 +1785,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();
@ -1932,7 +1934,7 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain()) .doFirst(() -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally { } finally {
range.release(); range.release();

View File

@ -224,9 +224,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setAllowFAllocate(true); options.setAllowFAllocate(true);
options.setRateLimiter(new RateLimiter(10L * 1024L * 1024L)); // 10MiB/s max compaction write speed options.setRateLimiter(new RateLimiter(10L * 1024L * 1024L)); // 10MiB/s max compaction write speed
options.setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), options.setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
50L * 1024L * 1024L * 1024L), // 50GiB 10L * 1024L * 1024L * 1024L), // 10GiB
new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
400L * 1024L * 1024L * 1024L), // 400GiB 100L * 1024L * 1024L * 1024L), // 100GiB
new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"), new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"),
600L * 1024L * 1024L * 1024L))); // 600GiB 600L * 1024L * 1024L * 1024L))); // 600GiB
// Direct I/O parameters. Removed because they use too much disk. // Direct I/O parameters. Removed because they use too much disk.