From d25311123358e8041004fa08d16aff6dd7a559f4 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 20 May 2022 18:31:05 +0200 Subject: [PATCH] Fix some memory leaks --- .../cavallium/dbengine/database/LLUtils.java | 134 +--- .../collections/DatabaseMapDictionary.java | 114 ++-- .../database/disk/AbstractRocksDBColumn.java | 15 - .../database/disk/LLLocalDictionary.java | 619 ++++++++---------- .../database/disk/LLLocalSingleton.java | 13 +- .../disk/OptimisticRocksDBColumn.java | 34 +- .../disk/PessimisticRocksDBColumn.java | 19 +- .../database/disk/StandardRocksDBColumn.java | 26 +- .../database/disk/UpdateAtomicResult.java | 6 +- .../disk/UpdateAtomicResultBinaryChanged.java | 8 +- .../disk/UpdateAtomicResultCurrent.java | 10 +- .../disk/UpdateAtomicResultDelta.java | 10 +- .../disk/UpdateAtomicResultNothing.java | 8 +- .../disk/UpdateAtomicResultPrevious.java | 10 +- .../dbengine/lucene/HugePqPriorityQueue.java | 17 +- 15 files changed, 476 insertions(+), 567 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index f81e88a..ccdaf7d 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -538,23 +538,6 @@ public class LLUtils { } } - // todo: remove this ugly method - /** - * cleanup resource - * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful - */ - public static > Mono usingSend(Mono> resourceSupplier, - Function, Mono> resourceClosure, - boolean cleanupOnSuccess) { - return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> r.close()); - } else { - return Mono.empty(); - } - }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())); - } - // todo: remove this ugly method /** * cleanup resource @@ -613,36 +596,6 @@ public class LLUtils { })); } - // todo: remove this ugly method - /** - * cleanup resource - * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful - */ - public static , V extends T> Flux usingEachResource(Flux resourceSupplier, - Function> resourceClosure, - boolean cleanupOnSuccess) { - return resourceSupplier - .concatMap(resource -> Mono.usingWhen(Mono.just(resource), resourceClosure, r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }); - } else { - return Mono.empty(); - } - }, (r, ex) -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }), r -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }))); - } - // todo: remove this ugly method /** * cleanup resource @@ -668,35 +621,6 @@ public class LLUtils { })); } - // todo: remove this ugly method - /** - * cleanup resource - * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful - */ - public static > Flux usingSendResources(Mono> resourceSupplier, - Function> resourceClosure, - boolean cleanupOnSuccess) { - return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }); - } else { - return Mono.empty(); - } - }, (r, ex) -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }), r -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - })); - } - public static boolean isSet(ScoreDoc[] scoreDocs) { for (ScoreDoc scoreDoc : scoreDocs) { if (scoreDoc == null) { @@ -873,28 +797,40 @@ public class LLUtils { } public static Mono resolveLLDelta(Mono prev, UpdateReturnMode updateReturnMode) { - return prev.handle((deltaToReceive, sink) -> { - try (var delta = deltaToReceive) { - switch (updateReturnMode) { - case GET_NEW_VALUE -> { - var current = delta.currentUnsafe(); - if (current != null) { - sink.next(current.copy()); - } else { - sink.complete(); - } + return prev.handle((delta, sink) -> { + final Buffer previous = delta.previousUnsafe(); + final Buffer current = delta.currentUnsafe(); + switch (updateReturnMode) { + case GET_NEW_VALUE -> { + if (previous != null && previous.isAccessible()) { + previous.close(); } - case GET_OLD_VALUE -> { - var previous = delta.previousUnsafe(); - if (previous != null) { - sink.next(previous.copy()); - } else { - sink.complete(); - } + if (current != null) { + sink.next(current); + } else { + sink.complete(); } - case NOTHING -> sink.complete(); - default -> sink.error(new IllegalStateException()); } + case GET_OLD_VALUE -> { + if (current != null && current.isAccessible()) { + current.close(); + } + if (previous != null) { + sink.next(previous); + } else { + sink.complete(); + } + } + case NOTHING -> { + if (previous != null && previous.isAccessible()) { + previous.close(); + } + if (current != null && current.isAccessible()) { + current.close(); + } + sink.complete(); + } + default -> sink.error(new IllegalStateException()); } }); } @@ -985,7 +921,7 @@ public class LLUtils { private static void onNextDropped(Object next) { if (next instanceof Send send) { send.close(); - } else if (next instanceof Resource resource) { + } else if (next instanceof Resource resource && resource.isAccessible()) { resource.close(); } else if (next instanceof Iterable iterable) { iterable.forEach(LLUtils::onNextDropped); @@ -995,12 +931,6 @@ public class LLUtils { if (rocksObj.isOwningHandle()) { rocksObj.close(); } - } else if (next instanceof UpdateAtomicResultDelta delta) { - delta.delta().close(); - } else if (next instanceof UpdateAtomicResultCurrent cur) { - cur.current().close(); - } else if (next instanceof UpdateAtomicResultPrevious cur) { - cur.previous().close(); } else if (next instanceof Optional optional) { optional.ifPresent(LLUtils::onNextDropped); } else if (next instanceof Map.Entry entry) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index c819e45..fe6f594 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -63,8 +63,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep keySerializer, Serializer valueSerializer, Runnable onClose) { - return new DatabaseMapDictionary<>(dictionary, null, keySerializer, - valueSerializer, onClose); + return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer, onClose); } public static DatabaseMapDictionary tail(LLDictionary dictionary, @@ -128,33 +127,31 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep sink) { - try (value) { - try { - sink.next(valueSerializer.deserialize(value)); - } catch (IndexOutOfBoundsException ex) { - var exMessage = ex.getMessage(); - if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { - var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet(); - if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { - try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) { - LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() - + ":" + dictionary.getColumnName() - + ":" + LLUtils.toStringSafe(this.keyPrefix) - + ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors); - } catch (SerializationException e) { - LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() - + ":" + dictionary.getColumnName() - + ":" + LLUtils.toStringSafe(this.keyPrefix) - + ":" + keySuffix + "(?) total=" + totalZeroBytesErrors); - } + try { + sink.next(valueSerializer.deserialize(value)); + } catch (IndexOutOfBoundsException ex) { + var exMessage = ex.getMessage(); + if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { + var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet(); + if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { + try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) { + LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() + + ":" + dictionary.getColumnName() + + ":" + LLUtils.toStringSafe(this.keyPrefix) + + ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors); + } catch (SerializationException e) { + LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() + + ":" + dictionary.getColumnName() + + ":" + LLUtils.toStringSafe(this.keyPrefix) + + ":" + keySuffix + "(?) total=" + totalZeroBytesErrors); } - sink.complete(); - } else { - sink.error(ex); } - } catch (Throwable ex) { + sink.complete(); + } else { sink.error(ex); } + } catch (Throwable ex) { + sink.error(ex); } } @@ -282,10 +279,12 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { return dictionary - .get(resolveSnapshot(snapshot), - Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)) - ) - .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); + .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))) + .handle((valueToReceive, sink) -> { + try (valueToReceive) { + deserializeValue(keySuffix, valueToReceive, sink); + } + }); } @Override @@ -304,20 +303,22 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updateValue(T keySuffix, - UpdateReturnMode updateReturnMode, + public Mono updateValue(T keySuffix, UpdateReturnMode updateReturnMode, SerializationFunction<@Nullable U, @Nullable U> updater) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); return dictionary .update(keyMono, getSerializedUpdater(updater), updateReturnMode) - .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); + .handle((valueToReceive, sink) -> { + try (valueToReceive) { + deserializeValue(keySuffix, valueToReceive, sink); + } + }); } @Override - public Mono> updateValueAndGetDelta(T keySuffix, - SerializationFunction<@Nullable U, @Nullable U> updater) { + public Mono> updateValueAndGetDelta(T keySuffix, SerializationFunction<@Nullable U, @Nullable U> updater) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); - return dictionary + return dictionary .updateAndGetDelta(keyMono, getSerializedUpdater(updater)) .transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { try (serialized) { @@ -326,21 +327,18 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updater) { + public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) { return oldSerialized -> { - try (oldSerialized) { - U result; - if (oldSerialized == null) { - result = updater.apply(null); - } else { - result = updater.apply(valueSerializer.deserialize(oldSerialized)); - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } + U result; + if (oldSerialized == null) { + result = updater.apply(null); + } else { + result = updater.apply(valueSerializer.deserialize(oldSerialized)); + } + if (result == null) { + return null; + } else { + return serializeValue(result); } }; } @@ -372,7 +370,11 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeValue(value)); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); + .handle((valueToReceive, sink) -> { + try (valueToReceive) { + deserializeValue(keySuffix, valueToReceive, sink); + } + }); } @Override @@ -381,7 +383,11 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeValue(value)); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle((Buffer valueBuf, SynchronousSink sink) -> deserializeValue(keySuffix, valueBuf, sink)) + .handle((Buffer valueBuf, SynchronousSink sink) -> { + try (valueBuf) { + deserializeValue(keySuffix, valueBuf, sink); + } + }) .map(oldValue -> !Objects.equals(oldValue, value)) .defaultIfEmpty(value != null); } @@ -400,7 +406,11 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeKeySuffixToKey(keySuffix)); return dictionary .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); + .handle((valueToReceive, sink) -> { + try (valueToReceive) { + deserializeValue(keySuffix, valueToReceive, sink); + } + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 353b111..599e1c0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -925,21 +925,6 @@ public sealed abstract class AbstractRocksDBColumn implements } } - protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater, - @Nullable Buffer prevDataToSendToUpdater) - throws SerializationException { - @Nullable Buffer newData = null; - try { - newData = updater.apply(prevDataToSendToUpdater); - } finally { - if (prevDataToSendToUpdater != newData && prevDataToSendToUpdater != null - && prevDataToSendToUpdater.isAccessible()) { - prevDataToSendToUpdater.close(); - } - } - return newData; - } - protected int getLevels() { var closeReadLock = closeLock.readLock(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 285c3fc..881ca2d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -257,87 +257,73 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono get(@Nullable LLSnapshot snapshot, Mono keyMono) { - return keyMono - .publishOn(dbRScheduler) - .handle((key, sink) -> { - try (key) { - logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); - try { - var readOptions = generateReadOptionsOrStatic(snapshot); - Buffer result; - startedGet.increment(); - try { - result = getTime.recordCallable(() -> db.get(readOptions, key)); - } finally { - endedGet.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } - } - logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); - if (result != null) { - sink.next(result); - } else { - sink.complete(); - } - } catch (RocksDBException ex) { - sink.error(new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage())); - } catch (Exception ex) { - sink.error(ex); - } + return Mono.usingWhen(keyMono, key -> runOnDb(false, () -> { + logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); + try { + var readOptions = generateReadOptionsOrStatic(snapshot); + Buffer result; + startedGet.increment(); + try { + result = getTime.recordCallable(() -> db.get(readOptions, key)); + } finally { + endedGet.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); } - }); + } + logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); + return result; + } catch (RocksDBException ex) { + throw new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage()); + } + }), key -> Mono.fromRunnable(key::close)); } @Override public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono rangeMono, boolean fillCache) { - return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> { - try (range) { - assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread"; - startedContains.increment(); - try { - Boolean isRangeEmpty = containsTime.recordCallable(() -> { - if (range.isSingle()) { - return !containsKey(snapshot, range.getSingleUnsafe()); - } else { - // Temporary resources to release after finished + return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> { + assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread"; + startedContains.increment(); + try { + Boolean isRangeEmpty = containsTime.recordCallable(() -> { + if (range.isSingle()) { + return !containsKey(snapshot, range.getSingleUnsafe()); + } else { + // Temporary resources to release after finished - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), - true, - isBoundedRange(range), - true - )) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(fillCache); - try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) { - var seekBuf = ((ReadableComponent) range.getMinUnsafe()).readableBuffer(); - rocksIterator.seek(seekBuf); - } else { - var seekArray = LLUtils.toArray(range.getMinUnsafe()); - rocksIterator.seek(seekArray); - } + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), + true, + isBoundedRange(range), + true + )) { + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.setFillCache(fillCache); + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) { + var seekBuf = ((ReadableComponent) range.getMinUnsafe()).readableBuffer(); + rocksIterator.seek(seekBuf); } else { - rocksIterator.seekToFirst(); + var seekArray = LLUtils.toArray(range.getMinUnsafe()); + rocksIterator.seek(seekArray); } - return !rocksIterator.isValid(); + } else { + rocksIterator.seekToFirst(); } + return !rocksIterator.isValid(); } } - }); - assert isRangeEmpty != null; - sink.next(isRangeEmpty); - } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range) - + ": " + ex.getMessage())); - } finally { - endedContains.increment(); - } - } catch (Throwable ex) { - sink.error(ex); + } + }); + assert isRangeEmpty != null; + return isRangeEmpty; + } catch (RocksDBException ex) { + throw new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range) + + ": " + ex.getMessage()); + } finally { + endedContains.increment(); } - }); + }), range -> Mono.fromRunnable(range::close)); } private boolean containsKey(@Nullable LLSnapshot snapshot, Buffer key) throws RocksDBException { @@ -375,24 +361,16 @@ public class LLLocalDictionary implements LLDictionary { v.touch("put entry value") )); // Write the new entry to the database - Mono putMono = entryMono - .publishOn(dbWScheduler) - .handle((entry, sink) -> { - try { - try (entry) { - var key = entry.getKeyUnsafe(); - var value = entry.getValueUnsafe(); - assert key != null : "Key is null"; - assert value != null : "Value is null"; - key.touch("Dictionary put key"); - value.touch("Dictionary put value"); - put(key, value); - } - sink.complete(); - } catch (Throwable ex) { - sink.error(ex); - } - }); + Mono putMono = Mono.usingWhen(entryMono, entry -> runOnDb(true, () -> { + var key = entry.getKeyUnsafe(); + var value = entry.getValueUnsafe(); + assert key != null : "Key is null"; + assert value != null : "Value is null"; + key.touch("Dictionary put key"); + value.touch("Dictionary put value"); + put(key, value); + return null; + }), entry -> Mono.fromRunnable(entry::close)); // Read the previous data, then write the new data, then return the previous data return Flux.concatDelayError(Flux.just(previousDataMono, putMono), true, 1).singleOrEmpty(); } @@ -433,87 +411,67 @@ public class LLLocalDictionary implements LLDictionary { public Mono update(Mono keyMono, BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { - return keyMono - .publishOn(dbWScheduler) - .handle((key, sink) -> { - try (key) { - assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; - if (updateMode == UpdateMode.DISALLOW) { - sink.error(new UnsupportedOperationException("update() is disallowed")); - return; - } - UpdateAtomicResultMode returnMode = switch (updateReturnMode) { - case NOTHING -> UpdateAtomicResultMode.NOTHING; - case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; - case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; - }; - UpdateAtomicResult result; - var readOptions = generateReadOptionsOrStatic(null); - startedUpdates.increment(); - try (var writeOptions = new WriteOptions()) { - result = updateTime.recordCallable(() -> - db.updateAtomic(readOptions, writeOptions, key, updater, returnMode)); - } finally { - endedUpdates.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } - } - assert result != null; - var previous = switch (updateReturnMode) { - case NOTHING -> null; - case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); - case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); - }; - if (previous != null) { - sink.next(previous); - } else { - sink.complete(); - } - } catch (Throwable ex) { - sink.error(ex); - } - }); + return Mono.usingWhen(keyMono, key -> runOnDb(true, () -> { + assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; + if (updateMode == UpdateMode.DISALLOW) { + throw new UnsupportedOperationException("update() is disallowed"); + } + UpdateAtomicResultMode returnMode = switch (updateReturnMode) { + case NOTHING -> UpdateAtomicResultMode.NOTHING; + case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; + case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; + }; + UpdateAtomicResult result; + var readOptions = generateReadOptionsOrStatic(null); + startedUpdates.increment(); + try (var writeOptions = new WriteOptions()) { + result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode)); + } finally { + endedUpdates.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } + } + assert result != null; + return switch (updateReturnMode) { + case NOTHING -> { + result.close(); + yield null; + } + case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); + case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); + }; + }), key -> Mono.fromRunnable(key::close)); } @SuppressWarnings("DuplicatedCode") @Override - public Mono updateAndGetDelta(Mono keyMono, - BinarySerializationFunction updater) { - return keyMono - .publishOn(dbWScheduler) - .handle((key, sink) -> { - try (key) { - key.touch("low-level dictionary update"); - assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; - if (updateMode == UpdateMode.DISALLOW) { - sink.error(new UnsupportedOperationException("update() is disallowed")); - return; - } - if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) { - sink.error(new UnsupportedOperationException("update() is disallowed because the database doesn't support" - + "safe atomic operations")); - return; - } + public Mono updateAndGetDelta(Mono keyMono, BinarySerializationFunction updater) { + return Mono.usingWhen(keyMono, key -> runOnDb(true, () -> { + key.touch("low-level dictionary update"); + assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; + if (updateMode == UpdateMode.DISALLOW) { + throw new UnsupportedOperationException("update() is disallowed"); + } + if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) { + throw new UnsupportedOperationException("update() is disallowed because the database doesn't support" + + "safe atomic operations"); + } - UpdateAtomicResult result; - var readOptions = generateReadOptionsOrStatic(null); - startedUpdates.increment(); - try (var writeOptions = new WriteOptions()) { - result = updateTime.recordCallable(() -> - db.updateAtomic(readOptions, writeOptions, key, updater, DELTA)); - } finally { - endedUpdates.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } - } - assert result != null; - sink.next(((UpdateAtomicResultDelta) result).delta()); - } catch (Throwable ex) { - sink.error(ex); - } - }); + UpdateAtomicResult result; + var readOptions = generateReadOptionsOrStatic(null); + startedUpdates.increment(); + try (var writeOptions = new WriteOptions()) { + result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, DELTA)); + } finally { + endedUpdates.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } + } + assert result != null; + return ((UpdateAtomicResultDelta) result).delta(); + }), key -> Mono.fromRunnable(key::close)); } @Override @@ -521,67 +479,47 @@ public class LLLocalDictionary implements LLDictionary { // Obtain the previous value from the database Mono previousDataMono = this.getPreviousData(keyMono, resultType); // Delete the value from the database - Mono removeMono = keyMono - .publishOn(dbWScheduler) - .handle((key, sink) -> { - try (key) { - logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key)); - startedRemove.increment(); - try (var writeOptions = new WriteOptions()) { - removeTime.recordCallable(() -> { - db.delete(writeOptions, key); - return null; - }); - } finally { - endedRemove.increment(); - } - sink.complete(); - } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to delete: " + ex.getMessage())); - } catch (Throwable ex) { - sink.error(ex); - } - }); + Mono removeMono = Mono.usingWhen(keyMono, key -> runOnDb(true, () -> { + try { + logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key)); + startedRemove.increment(); + try (var writeOptions = new WriteOptions()) { + removeTime.recordCallable(() -> { + db.delete(writeOptions, key); + return null; + }); + } finally { + endedRemove.increment(); + } + return null; + } catch (RocksDBException ex) { + throw new RocksDBException("Failed to delete: " + ex.getMessage()); + } + }), key -> Mono.fromRunnable(key::close)); // Read the previous data, then delete the data, then return the previous data return Flux.concat(previousDataMono, removeMono).singleOrEmpty(); } private Mono getPreviousData(Mono keyMono, LLDictionaryResultType resultType) { return switch (resultType) { - case PREVIOUS_VALUE_EXISTENCE -> keyMono - .publishOn(dbRScheduler) - .handle((key, sink) -> { - try (key) { - var contained = containsKey(null, key); - sink.next(LLUtils.booleanToResponseByteBuffer(alloc, contained)); - } catch (Throwable ex) { - sink.error(ex); - } - }); - case PREVIOUS_VALUE -> keyMono - .publishOn(dbRScheduler) - .handle((key, sink) -> { - try (key) { - assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread"; - Buffer result; - var readOptions = generateReadOptionsOrStatic(null); - try { - result = db.get(readOptions, key); - } finally { - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } - } - logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); - if (result == null) { - sink.complete(); - } else { - sink.next(result); - } - } catch (Throwable ex) { - sink.error(ex); - } - }); + case PREVIOUS_VALUE_EXISTENCE -> Mono.usingWhen(keyMono, key -> runOnDb(false, () -> { + var contained = containsKey(null, key); + return LLUtils.booleanToResponseByteBuffer(alloc, contained); + }), key -> Mono.fromRunnable(key::close)); + case PREVIOUS_VALUE -> Mono.usingWhen(keyMono, key -> runOnDb(false, () -> { + assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread"; + Buffer result; + var readOptions = generateReadOptionsOrStatic(null); + try { + result = db.get(readOptions, key); + } finally { + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } + } + logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); + return result; + }), key -> Mono.fromRunnable(key::close)); case VOID -> Mono.empty(); }; } @@ -818,7 +756,6 @@ public class LLLocalDictionary implements LLDictionary { }); } - @SuppressWarnings("resource") private Flux getRangeSingle(LLSnapshot snapshot, Mono keyMono) { return Mono .zip(keyMono, this.get(snapshot, keyMono)) @@ -963,20 +900,13 @@ public class LLLocalDictionary implements LLDictionary { } private Flux getRangeKeysSingle(LLSnapshot snapshot, Mono keyMono) { - return keyMono - .publishOn(dbRScheduler) - .handle((key, sink) -> { - try (key) { - if (containsKey(snapshot, key)) { - sink.next(key); - } else { - sink.complete(); - } - } catch (Throwable ex) { - sink.error(ex); - } - }) - .flux(); + return Mono.usingWhen(keyMono, key -> runOnDb(false, () -> { + if (containsKey(snapshot, key)) { + return key; + } else { + return null; + } + }), key -> Mono.fromRunnable(key::close)).flux(); } private record RocksObjTuple>(T t1, U t2) implements SafeCloseable { @@ -1006,30 +936,23 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono setRange(Mono rangeMono, Flux entries, boolean smallRange) { if (USE_WINDOW_IN_SET_RANGE) { - return rangeMono - .publishOn(dbWScheduler) - .handle((range, sink) -> { + return Mono + .usingWhen(rangeMono, range -> runOnDb(true, () -> { try (var writeOptions = new WriteOptions(); range) { assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { - try (var opts = LLUtils.generateCustomReadOptions(null, - true, - isBoundedRange(range), - smallRange - )) { - SafeCloseable seekTo; - try (var it = db.newIterator(opts, range.getMinUnsafe(), range.getMaxUnsafe())) { - if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - it.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - it.seekToFirst(); - } - while (it.isValid()) { - db.delete(writeOptions, it.key()); - it.next(); - } + try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange)) { + try (var it = db.newIterator(opts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + it.seekTo(range.getMinUnsafe()); + } else { + it.seekToFirst(); } + while (it.isValid()) { + db.delete(writeOptions, it.key()); + it.next(); + } + } } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { try (var batch = new CappedWriteBatch(db, @@ -1057,60 +980,56 @@ public class LLLocalDictionary implements LLDictionary { batch.clear(); } } - sink.next(true); + return true; } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to set a range: " + ex.getMessage())); + throw new RocksDBException("Failed to set a range: " + ex.getMessage()); } - }) + }), range -> Mono.fromRunnable(range::close)) .thenMany(entries.window(MULTI_GET_WINDOW)) .flatMap(keysWindowFlux -> keysWindowFlux .collectList() - .flatMap(entriesList -> this - .runOnDb(true, () -> { - try (var writeOptions = new WriteOptions()) { - if (!USE_WRITE_BATCHES_IN_SET_RANGE) { - for (LLEntry entry : entriesList) { - db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe()); - } - } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - writeOptions - )) { - for (LLEntry entry : entriesList) { - if (nettyDirect) { - batch.put(cfh, entry.getKeyUnsafe().send(), entry.getValueUnsafe().send()); - } else { - batch.put(cfh, - LLUtils.toArray(entry.getKeyUnsafe()), - LLUtils.toArray(entry.getValueUnsafe()) - ); - } - } - batch.flush(); - } - } else { - try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { - for (LLEntry entry : entriesList) { - batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()), - LLUtils.toArray(entry.getValueUnsafe())); - } - db.write(writeOptions, batch); - batch.clear(); - } - } - return null; - } finally { - for (LLEntry entry : entriesList) { - entry.close(); - } + .flatMap(entriesList -> this.runOnDb(true, () -> { + try (var writeOptions = new WriteOptions()) { + if (!USE_WRITE_BATCHES_IN_SET_RANGE) { + for (LLEntry entry : entriesList) { + db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe()); } - }) - ) - ) + } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { + try (var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + writeOptions + )) { + for (LLEntry entry : entriesList) { + if (nettyDirect) { + batch.put(cfh, entry.getKeyUnsafe().send(), entry.getValueUnsafe().send()); + } else { + batch.put(cfh, + LLUtils.toArray(entry.getKeyUnsafe()), + LLUtils.toArray(entry.getValueUnsafe()) + ); + } + } + batch.flush(); + } + } else { + try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { + for (LLEntry entry : entriesList) { + batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()), LLUtils.toArray(entry.getValueUnsafe())); + } + db.write(writeOptions, batch); + batch.clear(); + } + } + return null; + } finally { + for (LLEntry entry : entriesList) { + entry.close(); + } + } + }))) .then() .onErrorMap(cause -> new IOException("Failed to write range", cause)); } else { @@ -1131,19 +1050,16 @@ public class LLLocalDictionary implements LLDictionary { }) .then(Mono.empty()); - var putMono = entries - .publishOn(dbWScheduler) - .handle((entry, sink) -> { - try (entry) { - if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) { - this.put(entry.getKeyUnsafe(), entry.getValueUnsafe()); - } - sink.next(true); - } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to write range: " + ex.getMessage())); - } - }) - .then(Mono.empty()); + var putMono = entries.publishOn(dbWScheduler).handle((entry, sink) -> { + try (entry) { + if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) { + this.put(entry.getKeyUnsafe(), entry.getValueUnsafe()); + } + sink.next(true); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to write range: " + ex.getMessage())); + } + }).then(Mono.empty()); return deleteMono.then(putMono); } @@ -1263,11 +1179,11 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono rangeMono, boolean fast) { - return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> { - try (range) { + return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> { + try { assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread"; if (range.isAll()) { - sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)); + return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot); } else { try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), false, @@ -1291,20 +1207,20 @@ public class LLLocalDictionary implements LLDictionary { rocksIterator.next(); i++; } - sink.next(i); + return i; } } } } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to get size of range: " + ex.getMessage())); + throw new RocksDBException("Failed to get size of range: " + ex.getMessage()); } - }); + }), range -> Mono.fromRunnable(range::close)); } @Override public Mono getOne(@Nullable LLSnapshot snapshot, Mono rangeMono) { - return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> { - try (range) { + return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> { + try { assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { @@ -1316,24 +1232,24 @@ public class LLLocalDictionary implements LLDictionary { if (rocksIterator.isValid()) { try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { - sink.next(LLEntry.of(key.touch("get-one key"), value.touch("get-one value"))); + return LLEntry.of(key.touch("get-one key"), value.touch("get-one value")); } } } else { - sink.complete(); + return null; } } } } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to get one entry: " + ex.getMessage())); + throw new RocksDBException("Failed to get one entry: " + ex.getMessage()); } - }); + }), range -> Mono.fromRunnable(range::close)); } @Override public Mono getOneKey(@Nullable LLSnapshot snapshot, Mono rangeMono) { - return rangeMono.publishOn(dbRScheduler).handle((range, sink) -> { - try (range) { + return Mono.usingWhen(rangeMono, range -> runOnDb(false, () -> { + try { assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { @@ -1343,16 +1259,16 @@ public class LLLocalDictionary implements LLDictionary { rocksIterator.seekToFirst(); } if (rocksIterator.isValid()) { - sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)); + return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); } else { - sink.complete(); + return null; } } } } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to get one key: " + ex.getMessage())); + throw new RocksDBException("Failed to get one key: " + ex.getMessage()); } - }); + }), range -> Mono.fromRunnable(range::close)); } private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException { @@ -1467,31 +1383,26 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono removeOne(Mono rangeMono) { - return rangeMono.publishOn(dbWScheduler).handle((range, sink) -> { - try (range) { - assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; - try (var readOpts = new ReadOptions(); - var writeOpts = new WriteOptions()) { - try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - rocksIterator.seekTo(range.getMinUnsafe()); - } else { - rocksIterator.seekToFirst(); - } - if (!rocksIterator.isValid()) { - sink.complete(); - return; - } - Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - db.delete(writeOpts, key); - sink.next(LLEntry.of(key, value)); + return Mono.usingWhen(rangeMono, range -> runOnDb(true, () -> { + assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; + try (var readOpts = new ReadOptions(); + var writeOpts = new WriteOptions()) { + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); + } else { + rocksIterator.seekToFirst(); } + if (!rocksIterator.isValid()) { + return null; + } + Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + db.delete(writeOpts, key); + return LLEntry.of(key, value); } - } catch (RocksDBException ex) { - sink.error(ex); } - }); + }), range -> Mono.fromRunnable(range::close)); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index b126673..3a8bebb 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -139,13 +139,14 @@ public class LLLocalSingleton implements LLSingleton { case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; }; UpdateAtomicResult result; - try (key; - var readOptions = new ReadOptions(); - var writeOptions = new WriteOptions()) { + try (var readOptions = new ReadOptions(); var writeOptions = new WriteOptions()) { result = db.updateAtomic(readOptions, writeOptions, key, updater, returnMode); } return switch (updateReturnMode) { - case NOTHING -> null; + case NOTHING -> { + result.close(); + yield null; + } case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); }; @@ -160,9 +161,7 @@ public class LLLocalSingleton implements LLSingleton { throw new UnsupportedOperationException("Called update in a nonblocking thread"); } UpdateAtomicResult result; - try (key; - var readOptions = new ReadOptions(); - var writeOptions = new WriteOptions()) { + try (var readOptions = new ReadOptions(); var writeOptions = new WriteOptions()) { result = db.updateAtomic(readOptions, writeOptions, key, updater, DELTA); } return ((UpdateAtomicResultDelta) result).delta(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index e156037..d7aff26 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -96,8 +96,8 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn new UpdateAtomicResultBinaryChanged(changed); + case BINARY_CHANGED -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + yield new UpdateAtomicResultBinaryChanged(changed); + } case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData)); }; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java index 5f72d01..f0d7873 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java @@ -95,7 +95,14 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn new UpdateAtomicResultBinaryChanged(changed); + case BINARY_CHANGED -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + yield new UpdateAtomicResultBinaryChanged(changed); + } case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData)); }; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java index 721d77e..7f6dafa 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java @@ -67,7 +67,14 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn prevDataToSendToUpdater = null; } - @Nullable Buffer newData = applyUpdateAndCloseIfNecessary(updater, prevDataToSendToUpdater); + @Nullable Buffer newData; + try { + newData = updater.apply(prevDataToSendToUpdater); + } finally { + if (prevDataToSendToUpdater != null && prevDataToSendToUpdater.isAccessible()) { + prevDataToSendToUpdater.close(); + } + } try (newData) { boolean changed; assert newData == null || newData.isAccessible(); @@ -112,18 +119,13 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn } recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime); return switch (returnMode) { - case NOTHING -> { - yield RESULT_NOTHING; - } - case CURRENT -> { - yield new UpdateAtomicResultCurrent(newData != null ? newData.copy() : null); - } - case PREVIOUS -> { - yield new UpdateAtomicResultPrevious(prevData != null ? prevData.copy() : null); - } + case NOTHING -> RESULT_NOTHING; + case CURRENT -> new UpdateAtomicResultCurrent(newData != null ? newData.copy() : null); + case PREVIOUS -> new UpdateAtomicResultPrevious(prevData != null ? prevData.copy() : null); case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed); - case DELTA -> new UpdateAtomicResultDelta(LLDelta - .of(prevData != null ? prevData.copy() : null, newData != null ? newData.copy() : null)); + case DELTA -> new UpdateAtomicResultDelta(LLDelta.of( + prevData != null ? prevData.copy() : null, + newData != null ? newData.copy() : null)); }; } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResult.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResult.java index 4421a18..cca2cfc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResult.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResult.java @@ -1,4 +1,6 @@ package it.cavallium.dbengine.database.disk; -public sealed interface UpdateAtomicResult permits UpdateAtomicResultBinaryChanged, UpdateAtomicResultDelta, - UpdateAtomicResultNothing, UpdateAtomicResultPrevious, UpdateAtomicResultCurrent {} +import it.cavallium.dbengine.database.SafeCloseable; + +public sealed interface UpdateAtomicResult extends SafeCloseable permits UpdateAtomicResultBinaryChanged, + UpdateAtomicResultDelta, UpdateAtomicResultNothing, UpdateAtomicResultPrevious, UpdateAtomicResultCurrent {} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultBinaryChanged.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultBinaryChanged.java index ed69f4f..b51ffc8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultBinaryChanged.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultBinaryChanged.java @@ -1,3 +1,9 @@ package it.cavallium.dbengine.database.disk; -public final record UpdateAtomicResultBinaryChanged(boolean changed) implements UpdateAtomicResult {} +public record UpdateAtomicResultBinaryChanged(boolean changed) implements UpdateAtomicResult { + + @Override + public void close() { + + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultCurrent.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultCurrent.java index e022b7f..012faf1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultCurrent.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultCurrent.java @@ -3,4 +3,12 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; -public final record UpdateAtomicResultCurrent(Buffer current) implements UpdateAtomicResult {} +public record UpdateAtomicResultCurrent(Buffer current) implements UpdateAtomicResult { + + @Override + public void close() { + if (current != null && current.isAccessible()) { + current.close(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultDelta.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultDelta.java index 8451f3e..dc140ce 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultDelta.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultDelta.java @@ -3,4 +3,12 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLDelta; -public final record UpdateAtomicResultDelta(LLDelta delta) implements UpdateAtomicResult {} +public record UpdateAtomicResultDelta(LLDelta delta) implements UpdateAtomicResult { + + @Override + public void close() { + if (delta != null && delta.isAccessible()) { + delta.close(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultNothing.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultNothing.java index cea4276..2d6c9f1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultNothing.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultNothing.java @@ -1,3 +1,9 @@ package it.cavallium.dbengine.database.disk; -public final class UpdateAtomicResultNothing implements UpdateAtomicResult {} +public record UpdateAtomicResultNothing() implements UpdateAtomicResult { + + @Override + public void close() { + + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultPrevious.java b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultPrevious.java index 99d4284..3397205 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultPrevious.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/UpdateAtomicResultPrevious.java @@ -3,4 +3,12 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; -public final record UpdateAtomicResultPrevious(Buffer previous) implements UpdateAtomicResult {} +public record UpdateAtomicResultPrevious(Buffer previous) implements UpdateAtomicResult { + + @Override + public void close() { + if (previous != null && previous.isAccessible()) { + previous.close(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java index 913ce30..4aa0183 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java @@ -77,7 +77,9 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable