Fix some memory leaks
This commit is contained in:
parent
82f8e91e99
commit
d253111233
@ -538,23 +538,6 @@ public class LLUtils {
|
||||
}
|
||||
}
|
||||
|
||||
// todo: remove this ugly method
|
||||
/**
|
||||
* cleanup resource
|
||||
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
|
||||
*/
|
||||
public static <U, T extends Resource<T>> Mono<U> usingSend(Mono<Send<T>> resourceSupplier,
|
||||
Function<Send<T>, Mono<U>> resourceClosure,
|
||||
boolean cleanupOnSuccess) {
|
||||
return Mono.usingWhen(resourceSupplier, resourceClosure, r -> {
|
||||
if (cleanupOnSuccess) {
|
||||
return Mono.fromRunnable(() -> r.close());
|
||||
} else {
|
||||
return Mono.empty();
|
||||
}
|
||||
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close()));
|
||||
}
|
||||
|
||||
// todo: remove this ugly method
|
||||
/**
|
||||
* cleanup resource
|
||||
@ -613,36 +596,6 @@ public class LLUtils {
|
||||
}));
|
||||
}
|
||||
|
||||
// todo: remove this ugly method
|
||||
/**
|
||||
* cleanup resource
|
||||
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
|
||||
*/
|
||||
public static <U, T extends Resource<T>, V extends T> Flux<U> usingEachResource(Flux<V> resourceSupplier,
|
||||
Function<V, Mono<U>> resourceClosure,
|
||||
boolean cleanupOnSuccess) {
|
||||
return resourceSupplier
|
||||
.concatMap(resource -> Mono.usingWhen(Mono.just(resource), resourceClosure, r -> {
|
||||
if (cleanupOnSuccess) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
if (r.isAccessible()) {
|
||||
r.close();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return Mono.empty();
|
||||
}
|
||||
}, (r, ex) -> Mono.fromRunnable(() -> {
|
||||
if (r.isAccessible()) {
|
||||
r.close();
|
||||
}
|
||||
}), r -> Mono.fromRunnable(() -> {
|
||||
if (r.isAccessible()) {
|
||||
r.close();
|
||||
}
|
||||
})));
|
||||
}
|
||||
|
||||
// todo: remove this ugly method
|
||||
/**
|
||||
* cleanup resource
|
||||
@ -668,35 +621,6 @@ public class LLUtils {
|
||||
}));
|
||||
}
|
||||
|
||||
// todo: remove this ugly method
|
||||
/**
|
||||
* cleanup resource
|
||||
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
|
||||
*/
|
||||
public static <U, T extends Resource<T>> Flux<U> usingSendResources(Mono<Send<T>> resourceSupplier,
|
||||
Function<T, Flux<U>> resourceClosure,
|
||||
boolean cleanupOnSuccess) {
|
||||
return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> {
|
||||
if (cleanupOnSuccess) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
if (r.isAccessible()) {
|
||||
r.close();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return Mono.empty();
|
||||
}
|
||||
}, (r, ex) -> Mono.fromRunnable(() -> {
|
||||
if (r.isAccessible()) {
|
||||
r.close();
|
||||
}
|
||||
}), r -> Mono.fromRunnable(() -> {
|
||||
if (r.isAccessible()) {
|
||||
r.close();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public static boolean isSet(ScoreDoc[] scoreDocs) {
|
||||
for (ScoreDoc scoreDoc : scoreDocs) {
|
||||
if (scoreDoc == null) {
|
||||
@ -873,28 +797,40 @@ public class LLUtils {
|
||||
}
|
||||
|
||||
public static Mono<Buffer> resolveLLDelta(Mono<LLDelta> prev, UpdateReturnMode updateReturnMode) {
|
||||
return prev.handle((deltaToReceive, sink) -> {
|
||||
try (var delta = deltaToReceive) {
|
||||
switch (updateReturnMode) {
|
||||
case GET_NEW_VALUE -> {
|
||||
var current = delta.currentUnsafe();
|
||||
if (current != null) {
|
||||
sink.next(current.copy());
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
return prev.handle((delta, sink) -> {
|
||||
final Buffer previous = delta.previousUnsafe();
|
||||
final Buffer current = delta.currentUnsafe();
|
||||
switch (updateReturnMode) {
|
||||
case GET_NEW_VALUE -> {
|
||||
if (previous != null && previous.isAccessible()) {
|
||||
previous.close();
|
||||
}
|
||||
case GET_OLD_VALUE -> {
|
||||
var previous = delta.previousUnsafe();
|
||||
if (previous != null) {
|
||||
sink.next(previous.copy());
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
if (current != null) {
|
||||
sink.next(current);
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
case NOTHING -> sink.complete();
|
||||
default -> sink.error(new IllegalStateException());
|
||||
}
|
||||
case GET_OLD_VALUE -> {
|
||||
if (current != null && current.isAccessible()) {
|
||||
current.close();
|
||||
}
|
||||
if (previous != null) {
|
||||
sink.next(previous);
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
}
|
||||
case NOTHING -> {
|
||||
if (previous != null && previous.isAccessible()) {
|
||||
previous.close();
|
||||
}
|
||||
if (current != null && current.isAccessible()) {
|
||||
current.close();
|
||||
}
|
||||
sink.complete();
|
||||
}
|
||||
default -> sink.error(new IllegalStateException());
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -985,7 +921,7 @@ public class LLUtils {
|
||||
private static void onNextDropped(Object next) {
|
||||
if (next instanceof Send<?> send) {
|
||||
send.close();
|
||||
} else if (next instanceof Resource<?> resource) {
|
||||
} else if (next instanceof Resource<?> resource && resource.isAccessible()) {
|
||||
resource.close();
|
||||
} else if (next instanceof Iterable<?> iterable) {
|
||||
iterable.forEach(LLUtils::onNextDropped);
|
||||
@ -995,12 +931,6 @@ public class LLUtils {
|
||||
if (rocksObj.isOwningHandle()) {
|
||||
rocksObj.close();
|
||||
}
|
||||
} else if (next instanceof UpdateAtomicResultDelta delta) {
|
||||
delta.delta().close();
|
||||
} else if (next instanceof UpdateAtomicResultCurrent cur) {
|
||||
cur.current().close();
|
||||
} else if (next instanceof UpdateAtomicResultPrevious cur) {
|
||||
cur.previous().close();
|
||||
} else if (next instanceof Optional<?> optional) {
|
||||
optional.ifPresent(LLUtils::onNextDropped);
|
||||
} else if (next instanceof Map.Entry<?, ?> entry) {
|
||||
|
@ -63,8 +63,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
SerializerFixedBinaryLength<T> keySerializer,
|
||||
Serializer<U> valueSerializer,
|
||||
Runnable onClose) {
|
||||
return new DatabaseMapDictionary<>(dictionary, null, keySerializer,
|
||||
valueSerializer, onClose);
|
||||
return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer, onClose);
|
||||
}
|
||||
|
||||
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
|
||||
@ -128,33 +127,31 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
private void deserializeValue(T keySuffix, Buffer value, SynchronousSink<U> sink) {
|
||||
try (value) {
|
||||
try {
|
||||
sink.next(valueSerializer.deserialize(value));
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
|
||||
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
|
||||
try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) {
|
||||
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
|
||||
+ ":" + dictionary.getColumnName()
|
||||
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
|
||||
+ ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
|
||||
} catch (SerializationException e) {
|
||||
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
|
||||
+ ":" + dictionary.getColumnName()
|
||||
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
|
||||
+ ":" + keySuffix + "(?) total=" + totalZeroBytesErrors);
|
||||
}
|
||||
try {
|
||||
sink.next(valueSerializer.deserialize(value));
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
|
||||
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
|
||||
try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) {
|
||||
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
|
||||
+ ":" + dictionary.getColumnName()
|
||||
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
|
||||
+ ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
|
||||
} catch (SerializationException e) {
|
||||
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
|
||||
+ ":" + dictionary.getColumnName()
|
||||
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
|
||||
+ ":" + keySuffix + "(?) total=" + totalZeroBytesErrors);
|
||||
}
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.error(ex);
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.error(ex);
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ -282,10 +279,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
@Override
|
||||
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
|
||||
return dictionary
|
||||
.get(resolveSnapshot(snapshot),
|
||||
Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))
|
||||
)
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)))
|
||||
.handle((valueToReceive, sink) -> {
|
||||
try (valueToReceive) {
|
||||
deserializeValue(keySuffix, valueToReceive, sink);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -304,20 +303,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> updateValue(T keySuffix,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
public Mono<U> updateValue(T keySuffix, UpdateReturnMode updateReturnMode,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
|
||||
return dictionary
|
||||
.update(keyMono, getSerializedUpdater(updater), updateReturnMode)
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
.handle((valueToReceive, sink) -> {
|
||||
try (valueToReceive) {
|
||||
deserializeValue(keySuffix, valueToReceive, sink);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix, SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
|
||||
return dictionary
|
||||
return dictionary
|
||||
.updateAndGetDelta(keyMono, getSerializedUpdater(updater))
|
||||
.transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
|
||||
try (serialized) {
|
||||
@ -326,21 +327,18 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}));
|
||||
}
|
||||
|
||||
public BinarySerializationFunction getSerializedUpdater(
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return oldSerialized -> {
|
||||
try (oldSerialized) {
|
||||
U result;
|
||||
if (oldSerialized == null) {
|
||||
result = updater.apply(null);
|
||||
} else {
|
||||
result = updater.apply(valueSerializer.deserialize(oldSerialized));
|
||||
}
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializeValue(result);
|
||||
}
|
||||
U result;
|
||||
if (oldSerialized == null) {
|
||||
result = updater.apply(null);
|
||||
} else {
|
||||
result = updater.apply(valueSerializer.deserialize(oldSerialized));
|
||||
}
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializeValue(result);
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -372,7 +370,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
var valueMono = Mono.fromCallable(() -> serializeValue(value));
|
||||
return dictionary
|
||||
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
.handle((valueToReceive, sink) -> {
|
||||
try (valueToReceive) {
|
||||
deserializeValue(keySuffix, valueToReceive, sink);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -381,7 +383,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
var valueMono = Mono.fromCallable(() -> serializeValue(value));
|
||||
return dictionary
|
||||
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.handle((Buffer valueBuf, SynchronousSink<U> sink) -> deserializeValue(keySuffix, valueBuf, sink))
|
||||
.handle((Buffer valueBuf, SynchronousSink<U> sink) -> {
|
||||
try (valueBuf) {
|
||||
deserializeValue(keySuffix, valueBuf, sink);
|
||||
}
|
||||
})
|
||||
.map(oldValue -> !Objects.equals(oldValue, value))
|
||||
.defaultIfEmpty(value != null);
|
||||
}
|
||||
@ -400,7 +406,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
|
||||
return dictionary
|
||||
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
.handle((valueToReceive, sink) -> {
|
||||
try (valueToReceive) {
|
||||
deserializeValue(keySuffix, valueToReceive, sink);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -925,21 +925,6 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
}
|
||||
}
|
||||
|
||||
protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater,
|
||||
@Nullable Buffer prevDataToSendToUpdater)
|
||||
throws SerializationException {
|
||||
@Nullable Buffer newData = null;
|
||||
try {
|
||||
newData = updater.apply(prevDataToSendToUpdater);
|
||||
} finally {
|
||||
if (prevDataToSendToUpdater != newData && prevDataToSendToUpdater != null
|
||||
&& prevDataToSendToUpdater.isAccessible()) {
|
||||
prevDataToSendToUpdater.close();
|
||||
}
|
||||
}
|
||||
return newData;
|
||||
}
|
||||
|
||||
protected int getLevels() {
|
||||
var closeReadLock = closeLock.readLock();
|
||||
try {
|
||||
|
@ -257,87 +257,73 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<Buffer> get(@Nullable LLSnapshot snapshot, Mono<Buffer> keyMono) {
|
||||
return keyMono
|
||||
.publishOn(dbRScheduler)
|
||||
.<Buffer>handle((key, sink) -> {
|
||||
try (key) {
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key));
|
||||
try {
|
||||
var readOptions = generateReadOptionsOrStatic(snapshot);
|
||||
Buffer result;
|
||||
startedGet.increment();
|
||||
try {
|
||||
result = getTime.recordCallable(() -> db.get(readOptions, key));
|
||||
} finally {
|
||||
endedGet.increment();
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
|
||||
if (result != null) {
|
||||
sink.next(result);
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage()));
|
||||
} catch (Exception ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
return Mono.usingWhen(keyMono, key -> runOnDb(false, () -> {
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key));
|
||||
try {
|
||||
var readOptions = generateReadOptionsOrStatic(snapshot);
|
||||
Buffer result;
|
||||
startedGet.increment();
|
||||
try {
|
||||
result = getTime.recordCallable(() -> db.get(readOptions, key));
|
||||
} finally {
|
||||
endedGet.increment();
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
|
||||
return result;
|
||||
} catch (RocksDBException ex) {
|
||||
throw new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage());
|
||||
}
|
||||
}), key -> Mono.fromRunnable(key::close));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, boolean fillCache) {
|
||||
return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> {
|
||||
try (range) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread";
|
||||
startedContains.increment();
|
||||
try {
|
||||
Boolean isRangeEmpty = containsTime.recordCallable(() -> {
|
||||
if (range.isSingle()) {
|
||||
return !containsKey(snapshot, range.getSingleUnsafe());
|
||||
} else {
|
||||
// Temporary resources to release after finished
|
||||
return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread";
|
||||
startedContains.increment();
|
||||
try {
|
||||
Boolean isRangeEmpty = containsTime.recordCallable(() -> {
|
||||
if (range.isSingle()) {
|
||||
return !containsKey(snapshot, range.getSingleUnsafe());
|
||||
} else {
|
||||
// Temporary resources to release after finished
|
||||
|
||||
try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot),
|
||||
true,
|
||||
isBoundedRange(range),
|
||||
true
|
||||
)) {
|
||||
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||
readOpts.setFillCache(fillCache);
|
||||
try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
|
||||
var seekBuf = ((ReadableComponent) range.getMinUnsafe()).readableBuffer();
|
||||
rocksIterator.seek(seekBuf);
|
||||
} else {
|
||||
var seekArray = LLUtils.toArray(range.getMinUnsafe());
|
||||
rocksIterator.seek(seekArray);
|
||||
}
|
||||
try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot),
|
||||
true,
|
||||
isBoundedRange(range),
|
||||
true
|
||||
)) {
|
||||
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||
readOpts.setFillCache(fillCache);
|
||||
try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
|
||||
var seekBuf = ((ReadableComponent) range.getMinUnsafe()).readableBuffer();
|
||||
rocksIterator.seek(seekBuf);
|
||||
} else {
|
||||
rocksIterator.seekToFirst();
|
||||
var seekArray = LLUtils.toArray(range.getMinUnsafe());
|
||||
rocksIterator.seek(seekArray);
|
||||
}
|
||||
return !rocksIterator.isValid();
|
||||
} else {
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
return !rocksIterator.isValid();
|
||||
}
|
||||
}
|
||||
});
|
||||
assert isRangeEmpty != null;
|
||||
sink.next(isRangeEmpty);
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range)
|
||||
+ ": " + ex.getMessage()));
|
||||
} finally {
|
||||
endedContains.increment();
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
assert isRangeEmpty != null;
|
||||
return isRangeEmpty;
|
||||
} catch (RocksDBException ex) {
|
||||
throw new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range)
|
||||
+ ": " + ex.getMessage());
|
||||
} finally {
|
||||
endedContains.increment();
|
||||
}
|
||||
});
|
||||
}), range -> Mono.fromRunnable(range::close));
|
||||
}
|
||||
|
||||
private boolean containsKey(@Nullable LLSnapshot snapshot, Buffer key) throws RocksDBException {
|
||||
@ -375,24 +361,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
v.touch("put entry value")
|
||||
));
|
||||
// Write the new entry to the database
|
||||
Mono<Buffer> putMono = entryMono
|
||||
.publishOn(dbWScheduler)
|
||||
.handle((entry, sink) -> {
|
||||
try {
|
||||
try (entry) {
|
||||
var key = entry.getKeyUnsafe();
|
||||
var value = entry.getValueUnsafe();
|
||||
assert key != null : "Key is null";
|
||||
assert value != null : "Value is null";
|
||||
key.touch("Dictionary put key");
|
||||
value.touch("Dictionary put value");
|
||||
put(key, value);
|
||||
}
|
||||
sink.complete();
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
Mono<Buffer> putMono = Mono.usingWhen(entryMono, entry -> runOnDb(true, () -> {
|
||||
var key = entry.getKeyUnsafe();
|
||||
var value = entry.getValueUnsafe();
|
||||
assert key != null : "Key is null";
|
||||
assert value != null : "Value is null";
|
||||
key.touch("Dictionary put key");
|
||||
value.touch("Dictionary put value");
|
||||
put(key, value);
|
||||
return null;
|
||||
}), entry -> Mono.fromRunnable(entry::close));
|
||||
// Read the previous data, then write the new data, then return the previous data
|
||||
return Flux.concatDelayError(Flux.just(previousDataMono, putMono), true, 1).singleOrEmpty();
|
||||
}
|
||||
@ -433,87 +411,67 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
public Mono<Buffer> update(Mono<Buffer> keyMono,
|
||||
BinarySerializationFunction updater,
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return keyMono
|
||||
.publishOn(dbWScheduler)
|
||||
.handle((key, sink) -> {
|
||||
try (key) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
sink.error(new UnsupportedOperationException("update() is disallowed"));
|
||||
return;
|
||||
}
|
||||
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
|
||||
case NOTHING -> UpdateAtomicResultMode.NOTHING;
|
||||
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
|
||||
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
|
||||
};
|
||||
UpdateAtomicResult result;
|
||||
var readOptions = generateReadOptionsOrStatic(null);
|
||||
startedUpdates.increment();
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
result = updateTime.recordCallable(() ->
|
||||
db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
|
||||
} finally {
|
||||
endedUpdates.increment();
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
assert result != null;
|
||||
var previous = switch (updateReturnMode) {
|
||||
case NOTHING -> null;
|
||||
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
|
||||
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
|
||||
};
|
||||
if (previous != null) {
|
||||
sink.next(previous);
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
return Mono.usingWhen(keyMono, key -> runOnDb(true, () -> {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
throw new UnsupportedOperationException("update() is disallowed");
|
||||
}
|
||||
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
|
||||
case NOTHING -> UpdateAtomicResultMode.NOTHING;
|
||||
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
|
||||
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
|
||||
};
|
||||
UpdateAtomicResult result;
|
||||
var readOptions = generateReadOptionsOrStatic(null);
|
||||
startedUpdates.increment();
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
|
||||
} finally {
|
||||
endedUpdates.increment();
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
assert result != null;
|
||||
return switch (updateReturnMode) {
|
||||
case NOTHING -> {
|
||||
result.close();
|
||||
yield null;
|
||||
}
|
||||
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
|
||||
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
|
||||
};
|
||||
}), key -> Mono.fromRunnable(key::close));
|
||||
}
|
||||
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
@Override
|
||||
public Mono<LLDelta> updateAndGetDelta(Mono<Buffer> keyMono,
|
||||
BinarySerializationFunction updater) {
|
||||
return keyMono
|
||||
.publishOn(dbWScheduler)
|
||||
.handle((key, sink) -> {
|
||||
try (key) {
|
||||
key.touch("low-level dictionary update");
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
sink.error(new UnsupportedOperationException("update() is disallowed"));
|
||||
return;
|
||||
}
|
||||
if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) {
|
||||
sink.error(new UnsupportedOperationException("update() is disallowed because the database doesn't support"
|
||||
+ "safe atomic operations"));
|
||||
return;
|
||||
}
|
||||
public Mono<LLDelta> updateAndGetDelta(Mono<Buffer> keyMono, BinarySerializationFunction updater) {
|
||||
return Mono.usingWhen(keyMono, key -> runOnDb(true, () -> {
|
||||
key.touch("low-level dictionary update");
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
throw new UnsupportedOperationException("update() is disallowed");
|
||||
}
|
||||
if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) {
|
||||
throw new UnsupportedOperationException("update() is disallowed because the database doesn't support"
|
||||
+ "safe atomic operations");
|
||||
}
|
||||
|
||||
UpdateAtomicResult result;
|
||||
var readOptions = generateReadOptionsOrStatic(null);
|
||||
startedUpdates.increment();
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
result = updateTime.recordCallable(() ->
|
||||
db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
|
||||
} finally {
|
||||
endedUpdates.increment();
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
assert result != null;
|
||||
sink.next(((UpdateAtomicResultDelta) result).delta());
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
UpdateAtomicResult result;
|
||||
var readOptions = generateReadOptionsOrStatic(null);
|
||||
startedUpdates.increment();
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
|
||||
} finally {
|
||||
endedUpdates.increment();
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
assert result != null;
|
||||
return ((UpdateAtomicResultDelta) result).delta();
|
||||
}), key -> Mono.fromRunnable(key::close));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -521,67 +479,47 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
// Obtain the previous value from the database
|
||||
Mono<Buffer> previousDataMono = this.getPreviousData(keyMono, resultType);
|
||||
// Delete the value from the database
|
||||
Mono<Buffer> removeMono = keyMono
|
||||
.publishOn(dbWScheduler)
|
||||
.handle((key, sink) -> {
|
||||
try (key) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key));
|
||||
startedRemove.increment();
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
removeTime.recordCallable(() -> {
|
||||
db.delete(writeOptions, key);
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
endedRemove.increment();
|
||||
}
|
||||
sink.complete();
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to delete: " + ex.getMessage()));
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
Mono<Buffer> removeMono = Mono.usingWhen(keyMono, key -> runOnDb(true, () -> {
|
||||
try {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key));
|
||||
startedRemove.increment();
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
removeTime.recordCallable(() -> {
|
||||
db.delete(writeOptions, key);
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
endedRemove.increment();
|
||||
}
|
||||
return null;
|
||||
} catch (RocksDBException ex) {
|
||||
throw new RocksDBException("Failed to delete: " + ex.getMessage());
|
||||
}
|
||||
}), key -> Mono.fromRunnable(key::close));
|
||||
// Read the previous data, then delete the data, then return the previous data
|
||||
return Flux.concat(previousDataMono, removeMono).singleOrEmpty();
|
||||
}
|
||||
|
||||
private Mono<Buffer> getPreviousData(Mono<Buffer> keyMono, LLDictionaryResultType resultType) {
|
||||
return switch (resultType) {
|
||||
case PREVIOUS_VALUE_EXISTENCE -> keyMono
|
||||
.publishOn(dbRScheduler)
|
||||
.handle((key, sink) -> {
|
||||
try (key) {
|
||||
var contained = containsKey(null, key);
|
||||
sink.next(LLUtils.booleanToResponseByteBuffer(alloc, contained));
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
case PREVIOUS_VALUE -> keyMono
|
||||
.publishOn(dbRScheduler)
|
||||
.handle((key, sink) -> {
|
||||
try (key) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread";
|
||||
Buffer result;
|
||||
var readOptions = generateReadOptionsOrStatic(null);
|
||||
try {
|
||||
result = db.get(readOptions, key);
|
||||
} finally {
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
|
||||
if (result == null) {
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.next(result);
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
case PREVIOUS_VALUE_EXISTENCE -> Mono.usingWhen(keyMono, key -> runOnDb(false, () -> {
|
||||
var contained = containsKey(null, key);
|
||||
return LLUtils.booleanToResponseByteBuffer(alloc, contained);
|
||||
}), key -> Mono.fromRunnable(key::close));
|
||||
case PREVIOUS_VALUE -> Mono.usingWhen(keyMono, key -> runOnDb(false, () -> {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread";
|
||||
Buffer result;
|
||||
var readOptions = generateReadOptionsOrStatic(null);
|
||||
try {
|
||||
result = db.get(readOptions, key);
|
||||
} finally {
|
||||
if (readOptions != EMPTY_READ_OPTIONS) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
|
||||
return result;
|
||||
}), key -> Mono.fromRunnable(key::close));
|
||||
case VOID -> Mono.empty();
|
||||
};
|
||||
}
|
||||
@ -818,7 +756,6 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
private Flux<LLEntry> getRangeSingle(LLSnapshot snapshot, Mono<Buffer> keyMono) {
|
||||
return Mono
|
||||
.zip(keyMono, this.get(snapshot, keyMono))
|
||||
@ -963,20 +900,13 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
private Flux<Buffer> getRangeKeysSingle(LLSnapshot snapshot, Mono<Buffer> keyMono) {
|
||||
return keyMono
|
||||
.publishOn(dbRScheduler)
|
||||
.<Buffer>handle((key, sink) -> {
|
||||
try (key) {
|
||||
if (containsKey(snapshot, key)) {
|
||||
sink.next(key);
|
||||
} else {
|
||||
sink.complete();
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
})
|
||||
.flux();
|
||||
return Mono.usingWhen(keyMono, key -> runOnDb(false, () -> {
|
||||
if (containsKey(snapshot, key)) {
|
||||
return key;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}), key -> Mono.fromRunnable(key::close)).flux();
|
||||
}
|
||||
|
||||
private record RocksObjTuple<T extends AbstractNativeReference, U extends Resource<?>>(T t1, U t2) implements SafeCloseable {
|
||||
@ -1006,30 +936,23 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<LLEntry> entries, boolean smallRange) {
|
||||
if (USE_WINDOW_IN_SET_RANGE) {
|
||||
return rangeMono
|
||||
.publishOn(dbWScheduler)
|
||||
.<Boolean>handle((range, sink) -> {
|
||||
return Mono
|
||||
.usingWhen(rangeMono, range -> runOnDb(true, () -> {
|
||||
try (var writeOptions = new WriteOptions(); range) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
|
||||
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
|
||||
try (var opts = LLUtils.generateCustomReadOptions(null,
|
||||
true,
|
||||
isBoundedRange(range),
|
||||
smallRange
|
||||
)) {
|
||||
SafeCloseable seekTo;
|
||||
try (var it = db.newIterator(opts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
it.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
it.seekToFirst();
|
||||
}
|
||||
while (it.isValid()) {
|
||||
db.delete(writeOptions, it.key());
|
||||
it.next();
|
||||
}
|
||||
try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange)) {
|
||||
try (var it = db.newIterator(opts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
it.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
it.seekToFirst();
|
||||
}
|
||||
while (it.isValid()) {
|
||||
db.delete(writeOptions, it.key());
|
||||
it.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
|
||||
try (var batch = new CappedWriteBatch(db,
|
||||
@ -1057,60 +980,56 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
sink.next(true);
|
||||
return true;
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to set a range: " + ex.getMessage()));
|
||||
throw new RocksDBException("Failed to set a range: " + ex.getMessage());
|
||||
}
|
||||
})
|
||||
}), range -> Mono.fromRunnable(range::close))
|
||||
.thenMany(entries.window(MULTI_GET_WINDOW))
|
||||
.flatMap(keysWindowFlux -> keysWindowFlux
|
||||
.collectList()
|
||||
.flatMap(entriesList -> this
|
||||
.<Void>runOnDb(true, () -> {
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
|
||||
for (LLEntry entry : entriesList) {
|
||||
db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe());
|
||||
}
|
||||
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
|
||||
try (var batch = new CappedWriteBatch(db,
|
||||
alloc,
|
||||
CAPPED_WRITE_BATCH_CAP,
|
||||
RESERVED_WRITE_BATCH_SIZE,
|
||||
MAX_WRITE_BATCH_SIZE,
|
||||
writeOptions
|
||||
)) {
|
||||
for (LLEntry entry : entriesList) {
|
||||
if (nettyDirect) {
|
||||
batch.put(cfh, entry.getKeyUnsafe().send(), entry.getValueUnsafe().send());
|
||||
} else {
|
||||
batch.put(cfh,
|
||||
LLUtils.toArray(entry.getKeyUnsafe()),
|
||||
LLUtils.toArray(entry.getValueUnsafe())
|
||||
);
|
||||
}
|
||||
}
|
||||
batch.flush();
|
||||
}
|
||||
} else {
|
||||
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
|
||||
for (LLEntry entry : entriesList) {
|
||||
batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()),
|
||||
LLUtils.toArray(entry.getValueUnsafe()));
|
||||
}
|
||||
db.write(writeOptions, batch);
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
for (LLEntry entry : entriesList) {
|
||||
entry.close();
|
||||
}
|
||||
.flatMap(entriesList -> this.<Void>runOnDb(true, () -> {
|
||||
try (var writeOptions = new WriteOptions()) {
|
||||
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
|
||||
for (LLEntry entry : entriesList) {
|
||||
db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe());
|
||||
}
|
||||
})
|
||||
)
|
||||
)
|
||||
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
|
||||
try (var batch = new CappedWriteBatch(db,
|
||||
alloc,
|
||||
CAPPED_WRITE_BATCH_CAP,
|
||||
RESERVED_WRITE_BATCH_SIZE,
|
||||
MAX_WRITE_BATCH_SIZE,
|
||||
writeOptions
|
||||
)) {
|
||||
for (LLEntry entry : entriesList) {
|
||||
if (nettyDirect) {
|
||||
batch.put(cfh, entry.getKeyUnsafe().send(), entry.getValueUnsafe().send());
|
||||
} else {
|
||||
batch.put(cfh,
|
||||
LLUtils.toArray(entry.getKeyUnsafe()),
|
||||
LLUtils.toArray(entry.getValueUnsafe())
|
||||
);
|
||||
}
|
||||
}
|
||||
batch.flush();
|
||||
}
|
||||
} else {
|
||||
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
|
||||
for (LLEntry entry : entriesList) {
|
||||
batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()), LLUtils.toArray(entry.getValueUnsafe()));
|
||||
}
|
||||
db.write(writeOptions, batch);
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
for (LLEntry entry : entriesList) {
|
||||
entry.close();
|
||||
}
|
||||
}
|
||||
})))
|
||||
.then()
|
||||
.onErrorMap(cause -> new IOException("Failed to write range", cause));
|
||||
} else {
|
||||
@ -1131,19 +1050,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
})
|
||||
.then(Mono.<Void>empty());
|
||||
|
||||
var putMono = entries
|
||||
.publishOn(dbWScheduler)
|
||||
.handle((entry, sink) -> {
|
||||
try (entry) {
|
||||
if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) {
|
||||
this.put(entry.getKeyUnsafe(), entry.getValueUnsafe());
|
||||
}
|
||||
sink.next(true);
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
|
||||
}
|
||||
})
|
||||
.then(Mono.<Void>empty());
|
||||
var putMono = entries.publishOn(dbWScheduler).handle((entry, sink) -> {
|
||||
try (entry) {
|
||||
if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) {
|
||||
this.put(entry.getKeyUnsafe(), entry.getValueUnsafe());
|
||||
}
|
||||
sink.next(true);
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
|
||||
}
|
||||
}).then(Mono.<Void>empty());
|
||||
|
||||
return deleteMono.then(putMono);
|
||||
}
|
||||
@ -1263,11 +1179,11 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, boolean fast) {
|
||||
return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> {
|
||||
try (range) {
|
||||
return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> {
|
||||
try {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread";
|
||||
if (range.isAll()) {
|
||||
sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot));
|
||||
return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot);
|
||||
} else {
|
||||
try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot),
|
||||
false,
|
||||
@ -1291,20 +1207,20 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
rocksIterator.next();
|
||||
i++;
|
||||
}
|
||||
sink.next(i);
|
||||
return i;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to get size of range: " + ex.getMessage()));
|
||||
throw new RocksDBException("Failed to get size of range: " + ex.getMessage());
|
||||
}
|
||||
});
|
||||
}), range -> Mono.fromRunnable(range::close));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLEntry> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
|
||||
return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> {
|
||||
try (range) {
|
||||
return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> {
|
||||
try {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread";
|
||||
try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) {
|
||||
try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
@ -1316,24 +1232,24 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
if (rocksIterator.isValid()) {
|
||||
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
|
||||
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
|
||||
sink.next(LLEntry.of(key.touch("get-one key"), value.touch("get-one value")));
|
||||
return LLEntry.of(key.touch("get-one key"), value.touch("get-one value"));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sink.complete();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to get one entry: " + ex.getMessage()));
|
||||
throw new RocksDBException("Failed to get one entry: " + ex.getMessage());
|
||||
}
|
||||
});
|
||||
}), range -> Mono.fromRunnable(range::close));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Buffer> getOneKey(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
|
||||
return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> {
|
||||
try (range) {
|
||||
return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> {
|
||||
try {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread";
|
||||
try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) {
|
||||
try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
@ -1343,16 +1259,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
if (rocksIterator.isValid()) {
|
||||
sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
|
||||
return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
|
||||
} else {
|
||||
sink.complete();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(new RocksDBException("Failed to get one key: " + ex.getMessage()));
|
||||
throw new RocksDBException("Failed to get one key: " + ex.getMessage());
|
||||
}
|
||||
});
|
||||
}), range -> Mono.fromRunnable(range::close));
|
||||
}
|
||||
|
||||
private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException {
|
||||
@ -1467,31 +1383,26 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<LLEntry> removeOne(Mono<LLRange> rangeMono) {
|
||||
return rangeMono.publishOn(dbWScheduler).handle((range, sink) -> {
|
||||
try (range) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread";
|
||||
try (var readOpts = new ReadOptions();
|
||||
var writeOpts = new WriteOptions()) {
|
||||
try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
if (!rocksIterator.isValid()) {
|
||||
sink.complete();
|
||||
return;
|
||||
}
|
||||
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
|
||||
Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
|
||||
db.delete(writeOpts, key);
|
||||
sink.next(LLEntry.of(key, value));
|
||||
return Mono.usingWhen(rangeMono, range -> runOnDb(true, () -> {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread";
|
||||
try (var readOpts = new ReadOptions();
|
||||
var writeOpts = new WriteOptions()) {
|
||||
try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
if (!rocksIterator.isValid()) {
|
||||
return null;
|
||||
}
|
||||
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
|
||||
Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
|
||||
db.delete(writeOpts, key);
|
||||
return LLEntry.of(key, value);
|
||||
}
|
||||
} catch (RocksDBException ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
});
|
||||
}), range -> Mono.fromRunnable(range::close));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -139,13 +139,14 @@ public class LLLocalSingleton implements LLSingleton {
|
||||
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
|
||||
};
|
||||
UpdateAtomicResult result;
|
||||
try (key;
|
||||
var readOptions = new ReadOptions();
|
||||
var writeOptions = new WriteOptions()) {
|
||||
try (var readOptions = new ReadOptions(); var writeOptions = new WriteOptions()) {
|
||||
result = db.updateAtomic(readOptions, writeOptions, key, updater, returnMode);
|
||||
}
|
||||
return switch (updateReturnMode) {
|
||||
case NOTHING -> null;
|
||||
case NOTHING -> {
|
||||
result.close();
|
||||
yield null;
|
||||
}
|
||||
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
|
||||
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
|
||||
};
|
||||
@ -160,9 +161,7 @@ public class LLLocalSingleton implements LLSingleton {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
}
|
||||
UpdateAtomicResult result;
|
||||
try (key;
|
||||
var readOptions = new ReadOptions();
|
||||
var writeOptions = new WriteOptions()) {
|
||||
try (var readOptions = new ReadOptions(); var writeOptions = new WriteOptions()) {
|
||||
result = db.updateAtomic(readOptions, writeOptions, key, updater, DELTA);
|
||||
}
|
||||
return ((UpdateAtomicResultDelta) result).delta();
|
||||
|
@ -96,8 +96,8 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
boolean committedSuccessfully;
|
||||
int retries = 0;
|
||||
ExponentialPageLimits retryTime = null;
|
||||
Buffer sentPrevData;
|
||||
Buffer sentCurData;
|
||||
Buffer sentPrevData = null;
|
||||
Buffer sentCurData = null;
|
||||
boolean changed;
|
||||
do {
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
@ -122,8 +122,14 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
}
|
||||
|
||||
@Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater);
|
||||
@Nullable Buffer newData;
|
||||
try {
|
||||
newData = updater.apply(prevDataToSendToUpdater);
|
||||
} finally {
|
||||
if (prevDataToSendToUpdater != null && prevDataToSendToUpdater.isAccessible()) {
|
||||
prevDataToSendToUpdater.close();
|
||||
}
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
@ -157,15 +163,21 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
committedSuccessfully = true;
|
||||
tx.rollback();
|
||||
}
|
||||
if (sentPrevData != null && sentPrevData.isAccessible()) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null && sentCurData.isAccessible()) {
|
||||
sentCurData.close();
|
||||
}
|
||||
sentPrevData = prevData == null ? null : prevData.copy();
|
||||
sentCurData = newData == null ? null : newData.copy();
|
||||
if (!committedSuccessfully) {
|
||||
tx.undoGetForUpdate(cfh, keyArray);
|
||||
tx.rollback();
|
||||
if (sentPrevData != null) {
|
||||
if (sentPrevData != null && sentPrevData.isAccessible()) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
if (sentCurData != null && sentCurData.isAccessible()) {
|
||||
sentCurData.close();
|
||||
}
|
||||
retries++;
|
||||
@ -220,7 +232,15 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(sentPrevData);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case BINARY_CHANGED -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultBinaryChanged(changed);
|
||||
}
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData));
|
||||
};
|
||||
}
|
||||
|
@ -95,7 +95,14 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
prevDataToSendToUpdater = null;
|
||||
}
|
||||
|
||||
@Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater);
|
||||
@Nullable Buffer newData;
|
||||
try {
|
||||
newData = updater.apply(prevDataToSendToUpdater);
|
||||
} finally {
|
||||
if (prevDataToSendToUpdater != null && prevDataToSendToUpdater.isAccessible()) {
|
||||
prevDataToSendToUpdater.close();
|
||||
}
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
@ -160,7 +167,15 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(sentPrevData);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case BINARY_CHANGED -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultBinaryChanged(changed);
|
||||
}
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData));
|
||||
};
|
||||
}
|
||||
|
@ -67,7 +67,14 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
prevDataToSendToUpdater = null;
|
||||
}
|
||||
|
||||
@Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater);
|
||||
@Nullable Buffer newData;
|
||||
try {
|
||||
newData = updater.apply(prevDataToSendToUpdater);
|
||||
} finally {
|
||||
if (prevDataToSendToUpdater != null && prevDataToSendToUpdater.isAccessible()) {
|
||||
prevDataToSendToUpdater.close();
|
||||
}
|
||||
}
|
||||
try (newData) {
|
||||
boolean changed;
|
||||
assert newData == null || newData.isAccessible();
|
||||
@ -112,18 +119,13 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
}
|
||||
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
yield new UpdateAtomicResultCurrent(newData != null ? newData.copy() : null);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.copy() : null);
|
||||
}
|
||||
case NOTHING -> RESULT_NOTHING;
|
||||
case CURRENT -> new UpdateAtomicResultCurrent(newData != null ? newData.copy() : null);
|
||||
case PREVIOUS -> new UpdateAtomicResultPrevious(prevData != null ? prevData.copy() : null);
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta
|
||||
.of(prevData != null ? prevData.copy() : null, newData != null ? newData.copy() : null));
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(
|
||||
prevData != null ? prevData.copy() : null,
|
||||
newData != null ? newData.copy() : null));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,6 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
public sealed interface UpdateAtomicResult permits UpdateAtomicResultBinaryChanged, UpdateAtomicResultDelta,
|
||||
UpdateAtomicResultNothing, UpdateAtomicResultPrevious, UpdateAtomicResultCurrent {}
|
||||
import it.cavallium.dbengine.database.SafeCloseable;
|
||||
|
||||
public sealed interface UpdateAtomicResult extends SafeCloseable permits UpdateAtomicResultBinaryChanged,
|
||||
UpdateAtomicResultDelta, UpdateAtomicResultNothing, UpdateAtomicResultPrevious, UpdateAtomicResultCurrent {}
|
||||
|
@ -1,3 +1,9 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
public final record UpdateAtomicResultBinaryChanged(boolean changed) implements UpdateAtomicResult {}
|
||||
public record UpdateAtomicResultBinaryChanged(boolean changed) implements UpdateAtomicResult {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -3,4 +3,12 @@ package it.cavallium.dbengine.database.disk;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.Send;
|
||||
|
||||
public final record UpdateAtomicResultCurrent(Buffer current) implements UpdateAtomicResult {}
|
||||
public record UpdateAtomicResultCurrent(Buffer current) implements UpdateAtomicResult {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (current != null && current.isAccessible()) {
|
||||
current.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,4 +3,12 @@ package it.cavallium.dbengine.database.disk;
|
||||
import io.netty5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.database.LLDelta;
|
||||
|
||||
public final record UpdateAtomicResultDelta(LLDelta delta) implements UpdateAtomicResult {}
|
||||
public record UpdateAtomicResultDelta(LLDelta delta) implements UpdateAtomicResult {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (delta != null && delta.isAccessible()) {
|
||||
delta.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,9 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
public final class UpdateAtomicResultNothing implements UpdateAtomicResult {}
|
||||
public record UpdateAtomicResultNothing() implements UpdateAtomicResult {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -3,4 +3,12 @@ package it.cavallium.dbengine.database.disk;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.Send;
|
||||
|
||||
public final record UpdateAtomicResultPrevious(Buffer previous) implements UpdateAtomicResult {}
|
||||
public record UpdateAtomicResultPrevious(Buffer previous) implements UpdateAtomicResult {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (previous != null && previous.isAccessible()) {
|
||||
previous.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -77,7 +77,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
|
||||
var keyBuf = serializeKey(element);
|
||||
try (keyBuf) {
|
||||
try (var readOptions = newReadOptions(); var writeOptions = newWriteOptions()) {
|
||||
rocksDB.updateAtomic(readOptions, writeOptions, keyBuf, this::incrementOrAdd, UpdateAtomicResultMode.NOTHING);
|
||||
rocksDB
|
||||
.updateAtomic(readOptions, writeOptions, keyBuf, this::incrementOrAdd, UpdateAtomicResultMode.NOTHING)
|
||||
.close();
|
||||
}
|
||||
++size;
|
||||
} catch (IOException e) {
|
||||
@ -136,7 +138,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
|
||||
if (it.isValid()) {
|
||||
var key = it.key();
|
||||
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
|
||||
rocksDB.updateAtomic(readOptions, writeOptions, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING);
|
||||
rocksDB
|
||||
.updateAtomic(readOptions, writeOptions, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING)
|
||||
.close();
|
||||
--size;
|
||||
return deserializeKey(keyBuf);
|
||||
}
|
||||
@ -210,22 +214,17 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
|
||||
try (var readOptions = newReadOptions();
|
||||
var writeOptions = newWriteOptions();
|
||||
var keyBuf = serializeKey(element)) {
|
||||
UpdateAtomicResultPrevious prev = (UpdateAtomicResultPrevious) rocksDB.updateAtomic(readOptions, writeOptions,
|
||||
try (var prev = (UpdateAtomicResultPrevious) rocksDB.updateAtomic(readOptions, writeOptions,
|
||||
keyBuf,
|
||||
this::reduceOrRemove,
|
||||
UpdateAtomicResultMode.PREVIOUS
|
||||
);
|
||||
try {
|
||||
)) {
|
||||
if (prev.previous() != null) {
|
||||
--size;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
if (prev.previous() != null) {
|
||||
prev.previous().close();
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
|
Loading…
Reference in New Issue
Block a user