Use a new approach to completely avoid memory leaks

This commit is contained in:
Andrea Cavalli 2021-08-16 10:27:47 +02:00
parent 9d326f5a8b
commit 435e7d4886
2 changed files with 983 additions and 1099 deletions

View File

@ -23,17 +23,17 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
ByteBufAllocator getAllocator();
Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly);
Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, Mono<ByteBuf> key, boolean existsAlmostCertainly);
default Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key) {
default Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, Mono<ByteBuf> key) {
return get(snapshot, key, false);
}
Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType);
Mono<ByteBuf> put(Mono<ByteBuf> key, Mono<ByteBuf> value, LLDictionaryResultType resultType);
Mono<UpdateMode> getUpdateMode();
default Mono<ByteBuf> update(ByteBuf key,
default Mono<ByteBuf> update(Mono<ByteBuf> key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
@ -42,24 +42,24 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
.transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode));
}
default Mono<ByteBuf> update(ByteBuf key,
default Mono<ByteBuf> update(Mono<ByteBuf> key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
UpdateReturnMode returnMode) {
return update(key, updater, returnMode, false);
}
Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
Mono<Delta<ByteBuf>> updateAndGetDelta(Mono<ByteBuf> key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
boolean existsAlmostCertainly);
default Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
default Mono<Delta<ByteBuf>> updateAndGetDelta(Mono<ByteBuf> key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater) {
return updateAndGetDelta(key, updater, false);
}
Mono<Void> clear();
Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType);
Mono<ByteBuf> remove(Mono<ByteBuf> key, LLDictionaryResultType resultType);
<K> Flux<Tuple3<K, ByteBuf, Optional<ByteBuf>>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Tuple2<K, ByteBuf>> keys,
@ -74,34 +74,34 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
<X> Flux<ExtraKeyOperationResult<ByteBuf, X>> updateMulti(Flux<Tuple2<ByteBuf, X>> entries,
BiFunction<ByteBuf, X, ByteBuf> updateFunction);
Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, LLRange range, boolean existsAlmostCertainly);
Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range, boolean existsAlmostCertainly);
default Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
default Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range) {
return getRange(snapshot, range, false);
}
Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
Mono<LLRange> range,
int prefixLength,
boolean existsAlmostCertainly);
default Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
Mono<LLRange> range,
int prefixLength) {
return getRangeGrouped(snapshot, range, prefixLength, false);
}
Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range);
Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono<LLRange> range, int prefixLength);
Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<LLRange> range, int prefixLength);
Flux<BadBlock> badBlocks(LLRange range);
Flux<BadBlock> badBlocks(Mono<LLRange> range);
Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries);
Mono<Void> setRange(Mono<LLRange> range, Flux<Entry<ByteBuf, ByteBuf>> entries);
default Mono<Void> replaceRange(LLRange range,
default Mono<Void> replaceRange(Mono<LLRange> range,
boolean canKeysChange,
Function<Entry<ByteBuf, ByteBuf>, Mono<Entry<ByteBuf, ByteBuf>>> entriesReplacer,
boolean existsAlmostCertainly) {
@ -122,19 +122,19 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
});
}
default Mono<Void> replaceRange(LLRange range,
default Mono<Void> replaceRange(Mono<LLRange> range,
boolean canKeysChange,
Function<Entry<ByteBuf, ByteBuf>, Mono<Entry<ByteBuf, ByteBuf>>> entriesReplacer) {
return replaceRange(range, canKeysChange, entriesReplacer, false);
}
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range);
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast);
Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range, boolean fast);
Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range);
Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range);
Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range);
Mono<Entry<ByteBuf, ByteBuf>> removeOne(Mono<LLRange> range);
}

View File

@ -1,14 +1,15 @@
package it.cavallium.dbengine.database.disk;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.KeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@ -17,16 +18,13 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -68,7 +66,6 @@ import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import static io.netty.buffer.Unpooled.*;
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
@ -237,11 +234,14 @@ public class LLLocalDictionary implements LLDictionary {
return alloc;
}
private <T> Mono<T> runOnDb(Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(dbScheduler);
}
@Override
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
try {
return Mono
.fromCallable(() -> {
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, Mono<ByteBuf> keyMono, boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
key -> runOnDb(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@ -262,14 +262,9 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlockRead(stamp);
}
}
})
.subscribeOn(dbScheduler)
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release)
);
}
private ByteBuf dbGet(ColumnFamilyHandle cfh,
@ -287,9 +282,7 @@ public class LLLocalDictionary implements LLDictionary {
throw new RocksDBException("Key buffer must be direct");
}
ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert keyNioBuffer.isDirect();
}
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect();
// Create a direct result buffer because RocksDB works only with direct buffers
ByteBuf resultBuf = alloc.directBuffer(LLLocalDictionary.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
try {
@ -388,6 +381,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
@SuppressWarnings("SameParameterValue")
private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value)
throws RocksDBException {
try {
@ -399,15 +393,11 @@ public class LLLocalDictionary implements LLDictionary {
throw new RocksDBException("Value buffer must be direct");
}
var keyNioBuffer = LLUtils.toDirect(key);
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert keyNioBuffer.isDirect();
}
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect();
var valueNioBuffer = LLUtils.toDirect(value);
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert valueNioBuffer.isDirect();
}
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || valueNioBuffer.isDirect();
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer);
} else {
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value));
@ -419,28 +409,22 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono
.defer(() -> {
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono,
range -> {
if (range.isSingle()) {
return this.containsKey(snapshot, range.getSingle().retain());
return this.containsKey(snapshot, Mono.just(range.getSingle()).map(ByteBuf::retain));
} else {
return this.containsRange(snapshot, range.retain());
}
})
.map(isContained -> !isContained)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
return this.containsRange(snapshot, Mono.just(range).map(LLRange::retain));
}
},
range -> Mono.fromRunnable(range::release)
).map(isContained -> !isContained);
}
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono
.fromCallable(() -> {
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono,
range -> runOnDb(() -> {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
@ -478,20 +462,14 @@ public class LLLocalDictionary implements LLDictionary {
return rocksIterator.isValid();
}
}
})
.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
}).onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)),
range -> Mono.fromRunnable(range::release));
}
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, ByteBuf key) {
try {
return Mono
.fromCallable(() -> {
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, Mono<ByteBuf> keyMono) {
return Mono.usingWhen(keyMono,
key -> runOnDb(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@ -520,23 +498,18 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlockRead(stamp);
}
}
})
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler)
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release)
);
}
@Override
public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) {
try {
return Mono
.defer(() -> getPreviousData(key.retain(), resultType))
.concatWith(Mono
.<ByteBuf>fromCallable(() -> {
public Mono<ByteBuf> put(Mono<ByteBuf> keyMono, Mono<ByteBuf> valueMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
key -> this
.getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType)
.concatWith(Mono.usingWhen(valueMono,
value -> this.<ByteBuf>runOnDb(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@ -558,23 +531,12 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlockWrite(stamp);
}
}
})
.subscribeOn(dbScheduler)
.onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause))
)
.singleOrEmpty()
.doFirst(() -> {
key.retain();
value.retain();
})
.doAfterTerminate(() -> {
key.release();
value.release();
});
} finally {
key.release();
value.release();
}
}),
value -> Mono.fromRunnable(value::release)
).onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause)))
.singleOrEmpty(),
key -> Mono.fromRunnable(key::release)
);
}
@Override
@ -585,13 +547,12 @@ public class LLLocalDictionary implements LLDictionary {
// Remember to change also updateAndGetDelta() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<ByteBuf> update(ByteBuf key,
public Mono<ByteBuf> update(Mono<ByteBuf> keyMono,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
try {
return Mono
.fromCallable(() -> {
return Mono.usingWhen(keyMono,
key -> runOnDb(() -> {
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
@ -704,25 +665,19 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlock(stamp);
}
}
})
.onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler)
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release)
);
}
// Remember to change also update() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
public Mono<Delta<ByteBuf>> updateAndGetDelta(Mono<ByteBuf> keyMono,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
boolean existsAlmostCertainly) {
try {
return Mono
.fromCallable(() -> {
return Mono.usingWhen(keyMono,
key -> this.runOnDb(() -> {
if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed");
StampedLock lock;
long stamp;
@ -760,11 +715,10 @@ public class LLLocalDictionary implements LLDictionary {
ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice();
try {
newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain());
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert prevDataToSendToUpdater == null
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions()
|| prevDataToSendToUpdater == null
|| prevDataToSendToUpdater.readerIndex() == 0
|| !prevDataToSendToUpdater.isReadable();
}
} finally {
if (prevDataToSendToUpdater != null) {
prevDataToSendToUpdater.release();
@ -827,14 +781,9 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlock(stamp);
}
}
})
.onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler)
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release)
);
}
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key)
@ -855,12 +804,12 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) {
try {
return Mono
.defer(() -> getPreviousData(key.retain(), resultType))
.concatWith(Mono
.fromCallable(() -> {
public Mono<ByteBuf> remove(Mono<ByteBuf> keyMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
key -> this
.getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType)
.concatWith(this
.<ByteBuf>runOnDb(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@ -884,29 +833,21 @@ public class LLLocalDictionary implements LLDictionary {
}
})
.onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler)
.then(Mono.empty())
)
.singleOrEmpty()
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
.singleOrEmpty(),
key -> Mono.fromCallable(key::release));
}
private Mono<ByteBuf> getPreviousData(ByteBuf key, LLDictionaryResultType resultType) {
try {
private Mono<ByteBuf> getPreviousData(Mono<ByteBuf> keyMono, LLDictionaryResultType resultType) {
return Mono
.defer(() -> switch (resultType) {
.usingWhen(keyMono,
key -> switch (resultType) {
case PREVIOUS_VALUE_EXISTENCE -> this
.containsKey(null, key.retain())
.containsKey(null, Mono.just(key).map(ByteBuf::retain))
.single()
.map(LLUtils::booleanToResponseByteBuffer)
.doAfterTerminate(() -> {
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert key.refCnt() > 0;
}
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || key.refCnt() > 0;
});
case PREVIOUS_VALUE -> Mono
.fromCallable(() -> {
@ -932,9 +873,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
return dbGet(cfh, null, key.retain(), true);
} finally {
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert key.refCnt() > 0;
}
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || key.refCnt() > 0;
}
}
} else {
@ -949,12 +888,9 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler);
case VOID -> Mono.empty();
})
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
},
key -> Mono.fromRunnable(key::release)
);
}
@Override
@ -1284,61 +1220,51 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range,
Mono<LLRange> rangeMono,
boolean existsAlmostCertainly) {
try {
return Flux
.defer(() -> {
return Flux.usingWhen(rangeMono,
range -> {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly);
return getRangeSingle(snapshot, Mono.just(range.getMin()).map(ByteBuf::retain), existsAlmostCertainly);
} else {
return getRangeMulti(snapshot, range.retain());
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
return getRangeMulti(snapshot, Mono.just(range).map(LLRange::retain));
}
},
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
Mono<LLRange> rangeMono,
int prefixLength, boolean existsAlmostCertainly) {
try {
return Flux
.defer(() -> {
return Flux.usingWhen(rangeMono,
range -> {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly).map(List::of);
var rangeSingleMono = Mono.just(range.getMin()).map(ByteBuf::retain);
return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of);
} else {
return getRangeMultiGrouped(snapshot, range.retain(), prefixLength);
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
return getRangeMultiGrouped(snapshot, Mono.just(range).map(LLRange::retain), prefixLength);
}
},
range -> Mono.fromRunnable(range::release)
);
}
private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
try {
return Mono
.defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly))
.map(value -> Map.entry(key.retain(), value))
.flux()
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot,
Mono<ByteBuf> keyMono,
boolean existsAlmostCertainly) {
return Flux.usingWhen(keyMono,
key -> this
.get(snapshot, Mono.just(key).map(ByteBuf::retain), existsAlmostCertainly)
.map(value -> Map.entry(key.retain(), value)),
key -> Mono.fromRunnable(key::release)
);
}
@SuppressWarnings("Convert2MethodRef")
private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
try {
return Flux
private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Flux.usingWhen(rangeMono,
range -> Flux
.using(
() -> new LLLocalEntryReactiveRocksIterator(db,
alloc,
@ -1348,27 +1274,18 @@ public class LLLocalDictionary implements LLDictionary {
resolveSnapshot(snapshot),
getRangeMultiDebugName
),
llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux(),
llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator
.flux()
.subscribeOn(dbScheduler),
LLLocalReactiveRocksIterator::release
)
.doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
),
range -> Mono.fromRunnable(range::release)
);
}
@SuppressWarnings("Convert2MethodRef")
private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
return Flux
private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) {
return Flux.usingWhen(rangeMono,
range -> Flux
.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db,
alloc,
@ -1379,39 +1296,35 @@ public class LLLocalDictionary implements LLDictionary {
resolveSnapshot(snapshot),
"getRangeMultiGrouped"
),
llLocalGroupedEntryReactiveRocksIterator -> llLocalGroupedEntryReactiveRocksIterator.flux(),
reactiveRocksIterator -> reactiveRocksIterator
.flux()
.subscribeOn(dbScheduler),
LLLocalGroupedReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
),
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Flux
.defer(() -> {
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Flux.usingWhen(rangeMono,
range -> {
if (range.isSingle()) {
return this.getRangeKeysSingle(snapshot, range.getMin().retain());
return this.getRangeKeysSingle(snapshot, Mono.just(range.getMin()).map(ByteBuf::retain));
} else {
return this.getRangeKeysMulti(snapshot, range.retain());
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
return this.getRangeKeysMulti(snapshot, Mono.just(range).map(LLRange::retain));
}
},
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
return Flux
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
Mono<LLRange> rangeMono,
int prefixLength) {
return Flux.usingWhen(rangeMono,
range -> Flux
.using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db,
alloc,
@ -1421,21 +1334,18 @@ public class LLLocalDictionary implements LLDictionary {
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
"getRangeKeysGrouped"
),
LLLocalGroupedReactiveRocksIterator::flux,
), reactiveRocksIterator -> reactiveRocksIterator.flux()
.subscribeOn(dbScheduler),
LLLocalGroupedReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
),
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<BadBlock> badBlocks(LLRange range) {
return Flux
public Flux<BadBlock> badBlocks(Mono<LLRange> rangeMono) {
return Flux.usingWhen(rangeMono,
range -> Flux
.<BadBlock>create(sink -> {
try (var ro = new ReadOptions(getReadOptions(null))) {
ro.setFillCache(false);
@ -1470,15 +1380,15 @@ public class LLLocalDictionary implements LLDictionary {
sink.error(ex);
}
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
.subscribeOn(dbScheduler),
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
return Flux
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) {
return Flux.usingWhen(rangeMono,
range -> Flux
.using(
() -> new LLLocalKeyPrefixReactiveRocksIterator(db,
alloc,
@ -1493,18 +1403,15 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
.subscribeOn(dbScheduler),
range -> Mono.fromRunnable(range::release)
);
}
private Flux<ByteBuf> getRangeKeysSingle(LLSnapshot snapshot, ByteBuf key) {
try {
return Mono
.defer(() -> this.containsKey(snapshot, key.retain()))
private Flux<ByteBuf> getRangeKeysSingle(LLSnapshot snapshot, Mono<ByteBuf> keyMono) {
return Flux.usingWhen(keyMono,
key -> this
.containsKey(snapshot, Mono.just(key).map(ByteBuf::retain))
.flux()
.<ByteBuf>handle((contains, sink) -> {
if (contains) {
@ -1513,18 +1420,15 @@ public class LLLocalDictionary implements LLDictionary {
sink.complete();
}
})
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
.doOnDiscard(ByteBuf.class, ReferenceCounted::release),
key -> Mono.fromRunnable(key::release)
);
}
@SuppressWarnings("Convert2MethodRef")
private Flux<ByteBuf> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
try {
return Flux
private Flux<ByteBuf> getRangeKeysMulti(LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Flux.usingWhen(rangeMono,
range -> Flux
.using(
() -> new LLLocalKeyReactiveRocksIterator(db,
alloc,
@ -1538,17 +1442,15 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalReactiveRocksIterator::release
)
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
.subscribeOn(dbScheduler),
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
try {
public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<Entry<ByteBuf, ByteBuf>> entries) {
return Mono.usingWhen(rangeMono,
range -> {
if (USE_WINDOW_IN_SET_RANGE) {
return Mono
.<Void>fromCallable(() -> {
@ -1673,17 +1575,15 @@ public class LLLocalDictionary implements LLDictionary {
)
)
.then()
.onErrorMap(cause -> new IOException("Failed to write range", cause))
.doFirst(range::retain)
.doAfterTerminate(range::release);
.onErrorMap(cause -> new IOException("Failed to write range", cause));
} else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params");
});
}
return Flux
.defer(() -> this.getRange(null, range.retain(), false))
return this
.getRange(null, Mono.just(range).map(LLRange::retain), false)
.flatMap(oldValue -> Mono
.<Void>fromCallable(() -> {
try {
@ -1697,20 +1597,29 @@ public class LLLocalDictionary implements LLDictionary {
.subscribeOn(dbScheduler)
)
.then(entries
.flatMap(entry -> this.put(entry.getKey(), entry.getValue(), LLDictionaryResultType.VOID))
.doOnNext(ReferenceCounted::release)
.flatMap(entry -> Mono.using(
() -> entry,
releasableEntry -> this
.put(Mono.just(entry.getKey()).map(ByteBuf::retain),
Mono.just(entry.getValue()).map(ByteBuf::retain),
LLDictionaryResultType.VOID
)
.doOnNext(ReferenceCounted::release),
releasableEntry -> {
releasableEntry.getKey().release();
releasableEntry.getValue().release();
})
)
.then(Mono.<Void>empty())
)
.onErrorMap(cause -> new IOException("Failed to write range", cause))
.doFirst(range::retain)
.doAfterTerminate(range::release);
}
} finally {
range.release();
.onErrorMap(cause -> new IOException("Failed to write range", cause));
}
},
range -> Mono.fromRunnable(range::release)
);
}
//todo: this is broken, check why
//todo: this is broken, check why. (is this still true?)
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
throws RocksDBException {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
@ -1932,18 +1841,15 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
try {
return Mono
.defer(() -> {
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, boolean fast) {
return Mono.usingWhen(rangeMono,
range -> {
if (range.isAll()) {
return Mono
.fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
.onErrorMap(IOException::new)
.subscribeOn(dbScheduler);
return this
.runOnDb(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
.onErrorMap(IOException::new);
} else {
return Mono
.fromCallable(() -> {
return runOnDb(() -> {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
@ -1998,24 +1904,17 @@ public class LLLocalDictionary implements LLDictionary {
minBound.release();
}
}
})
.onErrorMap(cause -> new IOException("Failed to get size of range "
+ range, cause))
.subscribeOn(dbScheduler);
}
})
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}).onErrorMap(cause -> new IOException("Failed to get size of range " + range, cause));
}
},
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono
.fromCallable(() -> {
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono,
range -> runOnDb(() -> {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -2067,20 +1966,15 @@ public class LLLocalDictionary implements LLDictionary {
minBound.release();
}
}
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
}),
range -> Mono.fromRunnable(range::release)
);
}
@Override
public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono
.fromCallable(() -> {
public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono,
range -> runOnDb(() -> {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -2124,13 +2018,9 @@ public class LLLocalDictionary implements LLDictionary {
minBound.release();
}
}
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
}),
range -> Mono.fromRunnable(range::release)
);
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException {
@ -2242,10 +2132,9 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) {
try {
return Mono
.fromCallable(() -> {
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono,
range -> runOnDb(() -> {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
@ -2289,14 +2178,9 @@ public class LLLocalDictionary implements LLDictionary {
minBound.release();
}
}
})
.onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
} finally {
range.release();
}
}).onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)),
range -> Mono.fromRunnable(range::release)
);
}
@NotNull