diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index c52a718..3f34070 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -41,7 +41,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono remove(Mono key, LLDictionaryResultType resultType); - Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys); + Flux getMulti(@Nullable LLSnapshot snapshot, Flux keys); Mono putMulti(Flux entries); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 46bb712..2c5a1b3 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -16,10 +16,6 @@ import io.netty5.buffer.api.Send; import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.internal.Statics; import io.netty5.util.IllegalReferenceCountException; -import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; -import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta; -import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; -import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; @@ -32,7 +28,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.ToIntFunction; @@ -538,64 +533,6 @@ public class LLUtils { } } - // todo: remove this ugly method - /** - * cleanup resource - * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful - */ - public static , V extends T> Mono usingResource(Mono resourceSupplier, - Function> resourceClosure, - boolean cleanupOnSuccess) { - return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }); - } else { - return Mono.empty(); - } - }, (r, ex) -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }), r -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - })); - } - - // todo: remove this ugly method - /** - * cleanup resource - * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful - */ - public static , V extends T> Flux usingResources(Mono resourceSupplier, - Function> resourceClosure, - boolean cleanupOnSuccess) { - return Flux.usingWhen(resourceSupplier, resourceClosure, r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }); - } else { - return Mono.empty(); - } - }, (r, ex) -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - }), r -> Mono.fromRunnable(() -> { - if (r.isAccessible()) { - r.close(); - } - })); - } - // todo: remove this ugly method /** * cleanup resource @@ -687,6 +624,10 @@ public class LLUtils { return readOptions; } + public static Mono closeResource(Resource resource) { + return Mono.fromRunnable(resource::close); + } + @Deprecated public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {} diff --git a/src/main/java/it/cavallium/dbengine/database/OptionalBuf.java b/src/main/java/it/cavallium/dbengine/database/OptionalBuf.java new file mode 100644 index 0000000..1c43390 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/OptionalBuf.java @@ -0,0 +1,104 @@ +package it.cavallium.dbengine.database; + +import io.netty5.buffer.api.Buffer; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public final class OptionalBuf implements SafeCloseable { + + private static final OptionalBuf EMPTY = new OptionalBuf(null); + private final Buffer buffer; + + private OptionalBuf(@Nullable Buffer buffer) { + this.buffer = buffer; + } + + public static OptionalBuf ofNullable(@Nullable Buffer buffer) { + return new OptionalBuf(buffer); + } + + public static OptionalBuf of(@NotNull Buffer buffer) { + Objects.requireNonNull(buffer); + return new OptionalBuf(buffer); + } + + public static OptionalBuf empty() { + return EMPTY; + } + + @Override + public void close() { + if (buffer != null && buffer.isAccessible()) { + buffer.close(); + } + } + + @Override + public String toString() { + if (buffer != null) { + return buffer.toString(); + } else { + return "(empty)"; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OptionalBuf that = (OptionalBuf) o; + + return Objects.equals(buffer, that.buffer); + } + + @Override + public int hashCode() { + return buffer != null ? buffer.hashCode() : 0; + } + + public Buffer get() { + if (buffer == null) { + throw new NoSuchElementException(); + } + return buffer; + } + + public Buffer orElse(Buffer alternative) { + if (buffer == null) { + return alternative; + } + return buffer; + } + + public void ifPresent(Consumer consumer) { + if (buffer != null) { + consumer.accept(buffer); + } + } + + public boolean isPresent() { + return buffer != null; + } + + public boolean isEmpty() { + return buffer == null; + } + + public Optional map(Function mapper) { + if (buffer != null) { + return Optional.of(mapper.apply(buffer)); + } else { + return Optional.empty(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index a800428..755ceb9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -209,23 +209,19 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono, false, true) - .>handle((entry, sink) -> { + .map(entry -> { Entry deserializedEntry; - try { - try (entry) { - T key; - var serializedKey = entry.getKeyUnsafe(); - var serializedValue = entry.getValueUnsafe(); - splitPrefix(serializedKey).close(); - suffixKeyLengthConsistency(serializedKey.readableBytes()); - key = deserializeSuffix(serializedKey); - U value = valueSerializer.deserialize(serializedValue); - deserializedEntry = Map.entry(key, value); - } - sink.next(deserializedEntry); - } catch (Throwable ex) { - sink.error(ex); + try (entry) { + T key; + var serializedKey = entry.getKeyUnsafe(); + var serializedValue = entry.getValueUnsafe(); + splitPrefix(serializedKey).close(); + suffixKeyLengthConsistency(serializedKey.readableBytes()); + key = deserializeSuffix(serializedKey); + U value = valueSerializer.deserialize(serializedValue); + deserializedEntry = Map.entry(key, value); } + return deserializedEntry; }) .collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new) .map(map -> (Object2ObjectSortedMap) map) @@ -238,7 +234,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { - var mappedKeys = keys - .handle((keySuffix, sink) -> { - try { - sink.next(serializeKeySuffixToKey(keySuffix)); - } catch (Throwable ex) { - sink.error(ex); - } - }); + var mappedKeys = keys.map(this::serializeKeySuffixToKey); return dictionary .getMulti(resolveSnapshot(snapshot), mappedKeys) - .handle((valueBufOpt, sink) -> { - try { - sink.next(valueBufOpt.map(valueSerializer::deserialize)); - } catch (Throwable ex) { - sink.error(ex); - } finally { - valueBufOpt.ifPresent(Resource::close); + .map(valueBufOpt -> { + try (valueBufOpt) { + return valueBufOpt.map(valueSerializer::deserialize); } }); } @@ -435,24 +420,13 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep entry, SynchronousSink sink) { - try { - sink.next(serializeEntry(entry.getKey(), entry.getValue())); - } catch (Throwable e) { - sink.error(e); - } + private LLEntry serializeEntry(Entry entry) throws SerializationException { + return serializeEntry(entry.getKey(), entry.getValue()); } @Override public Mono putMulti(Flux> entries) { - var serializedEntries = entries - .handle((entry, sink) -> { - try { - sink.next(serializeEntry(entry.getKey(), entry.getValue())); - } catch (Throwable e) { - sink.error(e); - } - }); + var serializedEntries = entries.map(this::serializeEntry); return dictionary.putMulti(serializedEntries); } @@ -460,14 +434,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updateMulti(Flux keys, KVSerializationFunction updater) { var sharedKeys = keys.publish().refCount(2); - var serializedKeys = sharedKeys.handle((key, sink) -> { - try { - Buffer serializedKey = serializeKeySuffixToKey(key); - sink.next(serializedKey); - } catch (Throwable ex) { - sink.error(ex); - } - }); + var serializedKeys = sharedKeys.map(this::serializeKeySuffixToKey); var serializedUpdater = getSerializedUpdater(updater); return dictionary.updateMulti(sharedKeys, serializedKeys, serializedUpdater); } @@ -578,34 +545,30 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { - try { - Entry entry; - try (serializedEntry) { - var keyBuf = serializedEntry.getKeyUnsafe(); - assert keyBuf != null; - assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; - // Remove prefix. Keep only the suffix and the ext - splitPrefix(keyBuf).close(); - assert suffixKeyLengthConsistency(keyBuf.readableBytes()); - T keySuffix = deserializeSuffix(keyBuf); + .map((serializedEntry) -> { + Entry entry; + try (serializedEntry) { + var keyBuf = serializedEntry.getKeyUnsafe(); + assert keyBuf != null; + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + splitPrefix(keyBuf).close(); + assert suffixKeyLengthConsistency(keyBuf.readableBytes()); + T keySuffix = deserializeSuffix(keyBuf); - assert serializedEntry.getValueUnsafe() != null; - U value = valueSerializer.deserialize(serializedEntry.getValueUnsafe()); - entry = Map.entry(keySuffix, value); - } - sink.next(entry); - } catch (Throwable e) { - sink.error(e); + assert serializedEntry.getValueUnsafe() != null; + U value = valueSerializer.deserialize(serializedEntry.getValueUnsafe()); + entry = Map.entry(keySuffix, value); } + return entry; }); } @Override public Flux> setAllValuesAndGetPrevious(Flux> entries) { - return Flux.concat( - this.getAllValues(null, false), - dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink), false).then(Mono.empty()) + return Flux.usingWhen(Mono.just(true), + b -> this.getAllValues(null, false), + b -> dictionary.setRange(rangeMono, entries.map(this::serializeEntry), false) ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 7454c77..304c8cf 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -235,12 +235,10 @@ public class DatabaseMapDictionaryHashed extends @Override public Flux> setAllValuesAndGetPrevious(Flux> entries) { - return entries - .flatMap(entry -> LLUtils.usingResource(this.at(null, entry.getKey()), - stage -> stage - .setAndGetPrevious(entry.getValue()) - .map(prev -> Map.entry(entry.getKey(), prev)), true) - ); + return entries.flatMap(entry -> Mono.usingWhen(this.at(null, entry.getKey()), + stage -> stage.setAndGetPrevious(entry.getValue()).map(prev -> Map.entry(entry.getKey(), prev)), + LLUtils::closeResource + )); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 4f4a7d1..8ad917b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.collections; +import static reactor.core.publisher.Mono.fromRunnable; + import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; @@ -30,13 +32,17 @@ public interface DatabaseStageMap> extends Mono at(@Nullable CompositeSnapshot snapshot, T key); default Mono containsKey(@Nullable CompositeSnapshot snapshot, T key) { - return LLUtils.usingResource(this.at(snapshot, key), - stage -> stage.isEmpty(snapshot).map(empty -> !empty), true); + return Mono.usingWhen(this.at(snapshot, key), + stage -> stage.isEmpty(snapshot).map(empty -> !empty), + LLUtils::closeResource + ); } default Mono getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) { - return LLUtils.usingResource(this.at(snapshot, key), - stage -> stage.get(snapshot, existsAlmostCertainly), true); + return Mono.usingWhen(this.at(snapshot, key), + stage -> stage.get(snapshot, existsAlmostCertainly), + LLUtils::closeResource + ); } default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { @@ -48,8 +54,7 @@ public interface DatabaseStageMap> extends } default Mono putValue(T key, U value) { - return LLUtils.usingResource(at(null, key).single(), - stage -> stage.set(value), true); + return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::closeResource); } Mono getUpdateMode(); @@ -57,8 +62,10 @@ public interface DatabaseStageMap> extends default Mono updateValue(T key, UpdateReturnMode updateReturnMode, SerializationFunction<@Nullable U, @Nullable U> updater) { - return LLUtils.usingResource(this.at(null, key).single(), - stage -> stage.update(updater, updateReturnMode), true); + return Mono.usingWhen(at(null, key).single(), + stage -> stage.update(updater, updateReturnMode), + LLUtils::closeResource + ); } default Flux updateMulti(Flux keys, KVSerializationFunction updater) { @@ -78,14 +85,19 @@ public interface DatabaseStageMap> extends } default Mono putValueAndGetPrevious(T key, U value) { - return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetPrevious(value), true); + return Mono.usingWhen(at(null, key).single(), + stage -> stage.setAndGetPrevious(value), + LLUtils::closeResource + ); } /** * @return true if the key was associated with any value, false if the key didn't exist. */ default Mono putValueAndGetChanged(T key, U value) { - return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetChanged(value), true).single(); + return Mono + .usingWhen(at(null, key).single(), stage -> stage.setAndGetChanged(value), LLUtils::closeResource) + .single(); } default Mono remove(T key) { @@ -93,7 +105,7 @@ public interface DatabaseStageMap> extends } default Mono removeAndGetPrevious(T key) { - return LLUtils.usingResource(at(null, key), DatabaseStage::clearAndGetPrevious, true); + return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::closeResource); } default Mono removeAndGetStatus(T key) { @@ -104,10 +116,10 @@ public interface DatabaseStageMap> extends * GetMulti must return the elements in sequence! */ default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { - return keys - .flatMapSequential(key -> this.getValue(snapshot, key, existsAlmostCertainly)) + return keys.flatMapSequential(key -> this + .getValue(snapshot, key, existsAlmostCertainly) .map(Optional::of) - .defaultIfEmpty(Optional.empty()); + .defaultIfEmpty(Optional.empty())); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 64f1934..bd87d7a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -16,7 +16,6 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.Resource; -import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDelta; @@ -26,6 +25,7 @@ import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.OptionalBuf; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; @@ -35,7 +35,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; @@ -49,10 +48,8 @@ import org.apache.logging.log4j.util.Supplier; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.AbstractNativeReference; -import org.rocksdb.AbstractSlice; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; -import org.rocksdb.DirectSlice; import org.rocksdb.FlushOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; @@ -60,7 +57,6 @@ import org.rocksdb.Slice; import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -258,26 +254,31 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono get(@Nullable LLSnapshot snapshot, Mono keyMono) { - return Mono.usingWhen(keyMono, key -> runOnDb(false, () -> { - logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); + return Mono.usingWhen(keyMono, + key -> runOnDb(false, () -> this.getSync(snapshot, key)), + key -> Mono.fromRunnable(key::close) + ); + } + + private Buffer getSync(LLSnapshot snapshot, Buffer key) throws Exception { + logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); + try { + var readOptions = generateReadOptionsOrStatic(snapshot); + Buffer result; + startedGet.increment(); try { - var readOptions = generateReadOptionsOrStatic(snapshot); - Buffer result; - startedGet.increment(); - try { - result = getTime.recordCallable(() -> db.get(readOptions, key)); - } finally { - endedGet.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } + result = getTime.recordCallable(() -> db.get(readOptions, key)); + } finally { + endedGet.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); } - logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); - return result; - } catch (RocksDBException ex) { - throw new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage()); } - }), key -> Mono.fromRunnable(key::close)); + logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); + return result; + } catch (RocksDBException ex) { + throw new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage()); + } } @Override @@ -541,46 +542,8 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys) { - return keys - .buffer(MULTI_GET_WINDOW) - .publishOn(dbRScheduler) - .>>handle((keysWindow, sink) -> { - try { - assert !Schedulers.isInNonBlockingThread() : "Called getMulti in a nonblocking thread"; - ArrayList> mappedResults; - var readOptions = generateReadOptionsOrStatic(snapshot); - try { - List results = db.multiGetAsList(readOptions, LLUtils.toArray(keysWindow)); - mappedResults = new ArrayList<>(results.size()); - for (int i = 0; i < results.size(); i++) { - byte[] val = results.get(i); - Optional valueOpt; - if (val != null) { - // free memory - results.set(i, null); - - valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val)); - } else { - valueOpt = Optional.empty(); - } - mappedResults.add(valueOpt); - } - } finally { - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } - } - sink.next(mappedResults); - } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to read keys: " + ex.getMessage())); - } finally { - for (Buffer buffer : keysWindow) { - buffer.close(); - } - } - }) - .flatMapIterable(list -> list); + public Flux getMulti(@Nullable LLSnapshot snapshot, Flux keys) { + return keys.flatMapSequential(key -> runOnDb(false, () -> OptionalBuf.ofNullable(getSync(snapshot, key)))); } @Override @@ -641,7 +604,7 @@ public class LLLocalDictionary implements LLDictionary { for (Tuple2 objects : entriesWindow) { keyBufsWindow.add(objects.getT2()); } - ArrayList>> mappedInputs; + ArrayList> mappedInputs; { var readOptions = generateReadOptionsOrStatic(null); try { @@ -654,13 +617,13 @@ public class LLLocalDictionary implements LLDictionary { mappedInputs.add(Tuples.of( entriesWindow.get(i).getT1(), keyBufsWindow.get(i), - Optional.of(fromByteArray(alloc, val)) + OptionalBuf.of(fromByteArray(alloc, val)) )); } else { mappedInputs.add(Tuples.of( entriesWindow.get(i).getT1(), keyBufsWindow.get(i), - Optional.empty() + OptionalBuf.empty() )); } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index f1ab232..8f232e8 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.OptionalBuf; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; @@ -270,14 +271,14 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys) { + public Flux getMulti(@Nullable LLSnapshot snapshot, Flux keys) { return keys.map(key -> { try (key) { ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.copy().send())); if (v != null) { - return Optional.of(kkB(v)); + return OptionalBuf.of(kkB(v)); } else { - return Optional.empty(); + return OptionalBuf.empty(); } } }); diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java index a9f92e6..97d0201 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java @@ -4,6 +4,7 @@ import io.netty.handler.codec.ByteToMessageCodec; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; import it.cavallium.data.generator.nativedata.NullableString; +import it.cavallium.dbengine.database.OptionalBuf; import it.cavallium.dbengine.rpc.current.data.RPCCrash; import it.cavallium.dbengine.rpc.current.data.RPCEvent; import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes; @@ -42,8 +43,7 @@ public class QuicUtils { return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8); } - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - public static NullableBytes toBytes(Optional valueSendOpt) { + public static NullableBytes toBytes(OptionalBuf valueSendOpt) { if (valueSendOpt.isPresent()) { try (var value = valueSendOpt.get()) { var bytes = new byte[value.readableBytes()]; @@ -161,7 +161,7 @@ public class QuicUtils { private static R mapErrors(R value) { if (value instanceof RPCCrash crash) { - throw new RPCException(crash.code(), crash.message().orElse(null)); + throw new RPCException(crash.code(), crash.message().getNullable()); } else { return value; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 5a6004e..bb34c6d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -233,8 +233,10 @@ public class LuceneUtils { public static ValueGetter, V> getAsyncDbValueGetterDeep( CompositeSnapshot snapshot, DatabaseMapDictionaryDeep, ? extends DatabaseStageMap>> dictionaryDeep) { - return entry -> LLUtils.usingResource(dictionaryDeep - .at(snapshot, entry.getKey()), sub -> sub.getValue(snapshot, entry.getValue()), true); + return entry -> Mono.usingWhen(dictionaryDeep.at(snapshot, entry.getKey()), + sub -> sub.getValue(snapshot, entry.getValue()), + LLUtils::closeResource + ); } public static PerFieldAnalyzerWrapper toPerFieldAnalyzerWrapper(IndicizerAnalyzers indicizerAnalyzers) { diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 7ad0709..2f60044 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -6,6 +6,8 @@ import static it.cavallium.dbengine.SyncUtils.*; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.collections.DatabaseStageEntry; +import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; @@ -14,6 +16,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -128,9 +131,9 @@ public abstract class TestDictionaryMap { var stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMap(map -> LLUtils - .usingResource(map.at(null, key), v -> v.set(value), true) - .then(LLUtils.usingResource(map.at(null, key), v -> v.get(null), true)) + .flatMap(map -> Mono + .usingWhen(map.at(null, key), v -> v.set(value), LLUtils::closeResource) + .then(Mono.usingWhen(map.at(null, key), v -> v.get(null), LLUtils::closeResource)) .doFinally(s -> map.close()) ) )); @@ -417,18 +420,16 @@ public abstract class TestDictionaryMap { public void testSetAllValuesGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap entries, boolean shouldFail) { var remainingEntries = new ConcurrentHashMap, Boolean>().keySet(true); Step> stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> { - var entriesFlux = Flux.fromIterable(entries.entrySet()); - var keysFlux = entriesFlux.map(Entry::getKey); - var resultsFlux = map - .setAllValues(entriesFlux) - .thenMany(map.getMulti(null, keysFlux)); - return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); - }) - .filter(k -> k.getValue().isPresent()) - .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) + .create(tempDb(getTempDbGenerator(), allocator, db -> { + var mapMono = tempDictionary(db, updateMode).map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)); + return Flux.usingWhen(mapMono, map -> { + Flux> entriesFlux = Flux.fromIterable(entries.entrySet()); + Flux keysFlux = entriesFlux.map(Entry::getKey); + Flux> resultsFlux = map.setAllValues(entriesFlux).thenMany(map.getMulti(null, keysFlux)); + return Flux.zip(keysFlux, resultsFlux, Map::entry); + }, LLUtils::closeResource) + .filter(k -> k.getValue().isPresent()).map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())); + } )); if (shouldFail) { this.checkLeaks = false;