Code cleanup

Andrea Cavalli 2022-05-21 15:28:52 +02:00
parent 5c4519552d
commit 7f52339a6a
11 changed files with 231 additions and 246 deletions

LLDictionary.java

@ -41,7 +41,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<Buffer> remove(Mono<Buffer> key, LLDictionaryResultType resultType);
-Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys);
+Flux<OptionalBuf> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys);
Mono<Void> putMulti(Flux<LLEntry> entries);
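getMulti now returns Flux<OptionalBuf>, the closeable buffer holder introduced later in this commit, instead of Flux<Optional<Buffer>>. A minimal sketch of how a caller might consume the new type; the mapping below is an illustrative placeholder, not code from this repository:

// Sketch: consuming Flux<OptionalBuf> returned by the new getMulti signature.
// "len=..." stands in for a real value deserializer.
import it.cavallium.dbengine.database.OptionalBuf;
import java.util.Optional;
import reactor.core.publisher.Flux;

class GetMultiConsumerSketch {

    static Flux<Optional<String>> readAll(Flux<OptionalBuf> results) {
        return results.map(opt -> {
            try (opt) { // OptionalBuf is closeable: the wrapped buffer, if any, is released here
                return opt.map(buf -> "len=" + buf.readableBytes());
            }
        });
    }
}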

LLUtils.java

@ -16,10 +16,6 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.WritableComponent;
import io.netty5.buffer.api.internal.Statics;
import io.netty5.util.IllegalReferenceCountException;
-import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent;
-import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta;
-import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
-import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
@ -32,7 +28,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.ToIntFunction;
@ -538,64 +533,6 @@ public class LLUtils {
}
}
-// todo: remove this ugly method
-/**
-* cleanup resource
-* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
-*/
-public static <U, T extends Resource<? extends T>, V extends T> Mono<U> usingResource(Mono<V> resourceSupplier,
-Function<V, Mono<U>> resourceClosure,
-boolean cleanupOnSuccess) {
-return Mono.usingWhen(resourceSupplier, resourceClosure, r -> {
-if (cleanupOnSuccess) {
-return Mono.fromRunnable(() -> {
-if (r.isAccessible()) {
-r.close();
-}
-});
-} else {
-return Mono.empty();
-}
-}, (r, ex) -> Mono.fromRunnable(() -> {
-if (r.isAccessible()) {
-r.close();
-}
-}), r -> Mono.fromRunnable(() -> {
-if (r.isAccessible()) {
-r.close();
-}
-}));
-}
-// todo: remove this ugly method
-/**
-* cleanup resource
-* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
-*/
-public static <U, T extends Resource<T>, V extends T> Flux<U> usingResources(Mono<V> resourceSupplier,
-Function<V, Flux<U>> resourceClosure,
-boolean cleanupOnSuccess) {
-return Flux.usingWhen(resourceSupplier, resourceClosure, r -> {
-if (cleanupOnSuccess) {
-return Mono.fromRunnable(() -> {
-if (r.isAccessible()) {
-r.close();
-}
-});
-} else {
-return Mono.empty();
-}
-}, (r, ex) -> Mono.fromRunnable(() -> {
-if (r.isAccessible()) {
-r.close();
-}
-}), r -> Mono.fromRunnable(() -> {
-if (r.isAccessible()) {
-r.close();
-}
-}));
-}
// todo: remove this ugly method
/**
* cleanup resource
@ -687,6 +624,10 @@ public class LLUtils {
return readOptions;
}
+public static Mono<Void> closeResource(Resource<?> resource) {
+return Mono.fromRunnable(resource::close);
+}
@Deprecated
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}
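Call sites of the removed usingResource/usingResources helpers are migrated, later in this commit, to Reactor's built-in Mono.usingWhen combined with the new closeResource helper. A minimal sketch of the substitution, using a stand-in resource type so it compiles on its own; nothing below is taken from the repository:

// Before (removed helper):   LLUtils.usingResource(resourceMono, r -> useIt(r), true)
// After (pattern used here): Mono.usingWhen(resourceMono, r -> useIt(r), LLUtils::closeResource)
import reactor.core.publisher.Mono;

class UsingWhenMigrationSketch {

    // Stand-in for a closeable resource such as a buffer or a database stage.
    interface FakeResource extends AutoCloseable {
        @Override
        void close(); // no checked exception
    }

    static Mono<String> useResource(Mono<FakeResource> resourceMono) {
        return Mono.usingWhen(
                resourceMono,                     // acquire
                r -> Mono.just("used"),           // use the resource
                r -> Mono.fromRunnable(r::close)  // release on completion, error and cancellation
        );
    }
}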

OptionalBuf.java (new file)

@ -0,0 +1,104 @@
package it.cavallium.dbengine.database;
import io.netty5.buffer.api.Buffer;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public final class OptionalBuf implements SafeCloseable {
private static final OptionalBuf EMPTY = new OptionalBuf(null);
private final Buffer buffer;
private OptionalBuf(@Nullable Buffer buffer) {
this.buffer = buffer;
}
public static OptionalBuf ofNullable(@Nullable Buffer buffer) {
return new OptionalBuf(buffer);
}
public static OptionalBuf of(@NotNull Buffer buffer) {
Objects.requireNonNull(buffer);
return new OptionalBuf(buffer);
}
public static OptionalBuf empty() {
return EMPTY;
}
@Override
public void close() {
if (buffer != null && buffer.isAccessible()) {
buffer.close();
}
}
@Override
public String toString() {
if (buffer != null) {
return buffer.toString();
} else {
return "(empty)";
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OptionalBuf that = (OptionalBuf) o;
return Objects.equals(buffer, that.buffer);
}
@Override
public int hashCode() {
return buffer != null ? buffer.hashCode() : 0;
}
public Buffer get() {
if (buffer == null) {
throw new NoSuchElementException();
}
return buffer;
}
public Buffer orElse(Buffer alternative) {
if (buffer == null) {
return alternative;
}
return buffer;
}
public void ifPresent(Consumer<Buffer> consumer) {
if (buffer != null) {
consumer.accept(buffer);
}
}
public boolean isPresent() {
return buffer != null;
}
public boolean isEmpty() {
return buffer == null;
}
public <U> Optional<U> map(Function<Buffer, U> mapper) {
if (buffer != null) {
return Optional.of(mapper.apply(buffer));
} else {
return Optional.empty();
}
}
}
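A short usage sketch of the new OptionalBuf type, restricted to the methods defined above; the allocator setup is illustrative only and not part of this commit:

// Sketch: OptionalBuf behaves like Optional<Buffer>, but owns the buffer and can be closed.
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.OptionalBuf;

class OptionalBufUsageSketch {

    static int readableBytesOrZero(BufferAllocator alloc, boolean present) {
        try (OptionalBuf opt = present
                ? OptionalBuf.of(alloc.allocate(Integer.BYTES).writeInt(42))
                : OptionalBuf.empty()) {
            // map() applies the function only when a buffer is present, like Optional.map
            return opt.map(Buffer::readableBytes).orElse(0);
        }
    }
}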

DatabaseMapDictionary.java

@ -209,9 +209,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono, false, true)
-.<Entry<T, U>>handle((entry, sink) -> {
+.map(entry -> {
Entry<T, U> deserializedEntry;
-try {
try (entry) {
T key;
var serializedKey = entry.getKeyUnsafe();
@ -222,10 +221,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
U value = valueSerializer.deserialize(serializedValue);
deserializedEntry = Map.entry(key, value);
}
-sink.next(deserializedEntry);
-} catch (Throwable ex) {
-sink.error(ex);
-}
+return deserializedEntry;
})
.collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new)
.map(map -> (Object2ObjectSortedMap<T, U>) map)
@ -238,7 +234,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.get(null, false)
.concatWith(dictionary.setRange(rangeMono, Flux
.fromIterable(Collections.unmodifiableMap(value).entrySet())
-.handle(this::serializeEntrySink), true).then(Mono.empty()))
+.map(this::serializeEntry), true).then(Mono.empty()))
.singleOrEmpty();
}
@ -403,23 +399,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
-var mappedKeys = keys
-.<Buffer>handle((keySuffix, sink) -> {
-try {
-sink.next(serializeKeySuffixToKey(keySuffix));
-} catch (Throwable ex) {
-sink.error(ex);
-}
-});
+var mappedKeys = keys.map(this::serializeKeySuffixToKey);
return dictionary
.getMulti(resolveSnapshot(snapshot), mappedKeys)
-.handle((valueBufOpt, sink) -> {
-try {
-sink.next(valueBufOpt.map(valueSerializer::deserialize));
-} catch (Throwable ex) {
-sink.error(ex);
-} finally {
-valueBufOpt.ifPresent(Resource::close);
+.map(valueBufOpt -> {
+try (valueBufOpt) {
+return valueBufOpt.map(valueSerializer::deserialize);
}
});
}
@ -435,24 +420,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
}
-private void serializeEntrySink(Entry<T,U> entry, SynchronousSink<LLEntry> sink) {
-try {
-sink.next(serializeEntry(entry.getKey(), entry.getValue()));
-} catch (Throwable e) {
-sink.error(e);
-}
+private LLEntry serializeEntry(Entry<T, U> entry) throws SerializationException {
+return serializeEntry(entry.getKey(), entry.getValue());
}
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
-var serializedEntries = entries
-.<LLEntry>handle((entry, sink) -> {
-try {
-sink.next(serializeEntry(entry.getKey(), entry.getValue()));
-} catch (Throwable e) {
-sink.error(e);
-}
-});
+var serializedEntries = entries.map(this::serializeEntry);
return dictionary.putMulti(serializedEntries);
}
@ -460,14 +434,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Boolean> updateMulti(Flux<T> keys,
KVSerializationFunction<T, @Nullable U, @Nullable U> updater) {
var sharedKeys = keys.publish().refCount(2);
-var serializedKeys = sharedKeys.<Buffer>handle((key, sink) -> {
-try {
-Buffer serializedKey = serializeKeySuffixToKey(key);
-sink.next(serializedKey);
-} catch (Throwable ex) {
-sink.error(ex);
-}
-});
+var serializedKeys = sharedKeys.map(this::serializeKeySuffixToKey);
var serializedUpdater = getSerializedUpdater(updater);
return dictionary.updateMulti(sharedKeys, serializedKeys, serializedUpdater);
}
@ -578,8 +545,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
boolean reverse, boolean smallRange) {
return dictionary
.getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
-.handle((serializedEntry, sink) -> {
-try {
+.map((serializedEntry) -> {
Entry<T, U> entry;
try (serializedEntry) {
var keyBuf = serializedEntry.getKeyUnsafe();
@ -594,18 +560,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
U value = valueSerializer.deserialize(serializedEntry.getValueUnsafe());
entry = Map.entry(keySuffix, value);
}
-sink.next(entry);
-} catch (Throwable e) {
-sink.error(e);
-}
+return entry;
});
}
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
-return Flux.concat(
-this.getAllValues(null, false),
-dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink), false).then(Mono.empty())
+return Flux.usingWhen(Mono.just(true),
+b -> this.getAllValues(null, false),
+b -> dictionary.setRange(rangeMono, entries.map(this::serializeEntry), false)
);
}
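The recurring change in this file is replacing handle(...) plus manual sink.next/sink.error calls with a plain map(...). This works because Reactor turns an exception thrown inside a map function into an onError signal downstream; a minimal illustration, not code from this repository:

// Sketch: an exception thrown inside map surfaces as onError, so explicit
// try/catch + sink.error boilerplate is unnecessary for simple 1:1 transformations.
import reactor.core.publisher.Flux;

class MapErrorPropagationSketch {

    static Flux<Integer> parseAll(Flux<String> inputs) {
        return inputs.map(Integer::parseInt); // "not a number" -> onError(NumberFormatException)
    }
}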

DatabaseMapDictionaryHashed.java

@ -235,12 +235,10 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
-return entries
-.flatMap(entry -> LLUtils.usingResource(this.at(null, entry.getKey()),
-stage -> stage
-.setAndGetPrevious(entry.getValue())
-.map(prev -> Map.entry(entry.getKey(), prev)), true)
-);
+return entries.flatMap(entry -> Mono.usingWhen(this.at(null, entry.getKey()),
+stage -> stage.setAndGetPrevious(entry.getValue()).map(prev -> Map.entry(entry.getKey(), prev)),
+LLUtils::closeResource
+));
}
@Override

DatabaseStageMap.java

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.collections;
+import static reactor.core.publisher.Mono.fromRunnable;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
@ -30,13 +32,17 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
Mono<US> at(@Nullable CompositeSnapshot snapshot, T key);
default Mono<Boolean> containsKey(@Nullable CompositeSnapshot snapshot, T key) {
-return LLUtils.usingResource(this.at(snapshot, key),
-stage -> stage.isEmpty(snapshot).map(empty -> !empty), true);
+return Mono.usingWhen(this.at(snapshot, key),
+stage -> stage.isEmpty(snapshot).map(empty -> !empty),
+LLUtils::closeResource
+);
}
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) {
-return LLUtils.usingResource(this.at(snapshot, key),
-stage -> stage.get(snapshot, existsAlmostCertainly), true);
+return Mono.usingWhen(this.at(snapshot, key),
+stage -> stage.get(snapshot, existsAlmostCertainly),
+LLUtils::closeResource
+);
}
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key) {
@ -48,8 +54,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<Void> putValue(T key, U value) {
-return LLUtils.usingResource(at(null, key).single(),
-stage -> stage.set(value), true);
+return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::closeResource);
}
Mono<UpdateMode> getUpdateMode();
@ -57,8 +62,10 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
default Mono<U> updateValue(T key,
UpdateReturnMode updateReturnMode,
SerializationFunction<@Nullable U, @Nullable U> updater) {
-return LLUtils.usingResource(this.at(null, key).single(),
-stage -> stage.update(updater, updateReturnMode), true);
+return Mono.usingWhen(at(null, key).single(),
+stage -> stage.update(updater, updateReturnMode),
+LLUtils::closeResource
+);
}
default Flux<Boolean> updateMulti(Flux<T> keys, KVSerializationFunction<T, @Nullable U, @Nullable U> updater) {
@ -78,14 +85,19 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<U> putValueAndGetPrevious(T key, U value) {
-return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetPrevious(value), true);
+return Mono.usingWhen(at(null, key).single(),
+stage -> stage.setAndGetPrevious(value),
+LLUtils::closeResource
+);
}
/**
* @return true if the key was associated with any value, false if the key didn't exist.
*/
default Mono<Boolean> putValueAndGetChanged(T key, U value) {
-return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetChanged(value), true).single();
+return Mono
+.usingWhen(at(null, key).single(), stage -> stage.setAndGetChanged(value), LLUtils::closeResource)
+.single();
}
default Mono<Void> remove(T key) {
@ -93,7 +105,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<U> removeAndGetPrevious(T key) {
-return LLUtils.usingResource(at(null, key), DatabaseStage::clearAndGetPrevious, true);
+return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::closeResource);
}
default Mono<Boolean> removeAndGetStatus(T key) {
@ -104,10 +116,10 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
* GetMulti must return the elements in sequence!
*/
default Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
-return keys
-.flatMapSequential(key -> this.getValue(snapshot, key, existsAlmostCertainly))
+return keys.flatMapSequential(key -> this
+.getValue(snapshot, key, existsAlmostCertainly)
.map(Optional::of)
-.defaultIfEmpty(Optional.empty());
+.defaultIfEmpty(Optional.empty()));
}
/**

LLLocalDictionary.java

@ -16,7 +16,6 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.Resource;
-import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDelta;
@ -26,6 +25,7 @@ import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.OptionalBuf;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
@ -35,7 +35,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
@ -49,10 +48,8 @@ import org.apache.logging.log4j.util.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractNativeReference;
-import org.rocksdb.AbstractSlice;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
-import org.rocksdb.DirectSlice;
import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
@ -60,7 +57,6 @@ import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
-import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@ -258,7 +254,13 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Buffer> get(@Nullable LLSnapshot snapshot, Mono<Buffer> keyMono) {
-return Mono.usingWhen(keyMono, key -> runOnDb(false, () -> {
+return Mono.usingWhen(keyMono,
+key -> runOnDb(false, () -> this.getSync(snapshot, key)),
+key -> Mono.fromRunnable(key::close)
+);
+}
+private Buffer getSync(LLSnapshot snapshot, Buffer key) throws Exception {
logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key));
try {
var readOptions = generateReadOptionsOrStatic(snapshot);
@ -277,7 +279,6 @@ public class LLLocalDictionary implements LLDictionary {
} catch (RocksDBException ex) {
throw new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage());
}
-}), key -> Mono.fromRunnable(key::close));
}
@Override
@ -541,46 +542,8 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
-public Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys) {
-return keys
-.buffer(MULTI_GET_WINDOW)
-.publishOn(dbRScheduler)
-.<ArrayList<Optional<Buffer>>>handle((keysWindow, sink) -> {
-try {
-assert !Schedulers.isInNonBlockingThread() : "Called getMulti in a nonblocking thread";
-ArrayList<Optional<Buffer>> mappedResults;
-var readOptions = generateReadOptionsOrStatic(snapshot);
-try {
-List<byte[]> results = db.multiGetAsList(readOptions, LLUtils.toArray(keysWindow));
-mappedResults = new ArrayList<>(results.size());
-for (int i = 0; i < results.size(); i++) {
-byte[] val = results.get(i);
-Optional<Buffer> valueOpt;
-if (val != null) {
-// free memory
-results.set(i, null);
-valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val));
-} else {
-valueOpt = Optional.empty();
-}
-mappedResults.add(valueOpt);
-}
-} finally {
-if (readOptions != EMPTY_READ_OPTIONS) {
-readOptions.close();
-}
-}
-sink.next(mappedResults);
-} catch (RocksDBException ex) {
-sink.error(new RocksDBException("Failed to read keys: " + ex.getMessage()));
-} finally {
-for (Buffer buffer : keysWindow) {
-buffer.close();
-}
-}
-})
-.flatMapIterable(list -> list);
+public Flux<OptionalBuf> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys) {
+return keys.flatMapSequential(key -> runOnDb(false, () -> OptionalBuf.ofNullable(getSync(snapshot, key))));
}
@Override
@ -641,7 +604,7 @@ public class LLLocalDictionary implements LLDictionary {
for (Tuple2<K, Buffer> objects : entriesWindow) {
keyBufsWindow.add(objects.getT2());
}
-ArrayList<Tuple3<K, Buffer, Optional<Buffer>>> mappedInputs;
+ArrayList<Tuple3<K, Buffer, OptionalBuf>> mappedInputs;
{
var readOptions = generateReadOptionsOrStatic(null);
try {
@ -654,13 +617,13 @@ public class LLLocalDictionary implements LLDictionary {
mappedInputs.add(Tuples.of(
entriesWindow.get(i).getT1(),
keyBufsWindow.get(i),
-Optional.of(fromByteArray(alloc, val))
+OptionalBuf.of(fromByteArray(alloc, val))
));
} else {
mappedInputs.add(Tuples.of(
entriesWindow.get(i).getT1(),
keyBufsWindow.get(i),
-Optional.empty()
+OptionalBuf.empty()
));
}
}
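The getMulti rewrite above drops the MULTI_GET_WINDOW batching over multiGetAsList in favour of one getSync call per key, using flatMapSequential so results stay aligned with the incoming key order. A small sketch of that ordering property, with a placeholder lookup that is not part of the repository:

// Sketch: flatMapSequential may run lookups concurrently, but it emits results
// in the order of the source elements.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class SequentialGetSketch {

    static Flux<String> lookupAll(Flux<String> keys) {
        return keys.flatMapSequential(key -> Mono.fromCallable(() -> "value-of-" + key));
    }
}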

LLMemoryDictionary.java

@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.OptionalBuf;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
@ -270,14 +271,14 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
-public Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys) {
+public Flux<OptionalBuf> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys) {
return keys.map(key -> {
try (key) {
ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.copy().send()));
if (v != null) {
-return Optional.of(kkB(v));
+return OptionalBuf.of(kkB(v));
} else {
-return Optional.empty();
+return OptionalBuf.empty();
}
}
});

QuicUtils.java

@ -4,6 +4,7 @@ import io.netty.handler.codec.ByteToMessageCodec;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Send;
import it.cavallium.data.generator.nativedata.NullableString;
+import it.cavallium.dbengine.database.OptionalBuf;
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes;
@ -42,8 +43,7 @@ public class QuicUtils {
return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8);
}
-@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-public static NullableBytes toBytes(Optional<Buffer> valueSendOpt) {
+public static NullableBytes toBytes(OptionalBuf valueSendOpt) {
if (valueSendOpt.isPresent()) {
try (var value = valueSendOpt.get()) {
var bytes = new byte[value.readableBytes()];
@ -161,7 +161,7 @@ public class QuicUtils {
private static <R> R mapErrors(R value) {
if (value instanceof RPCCrash crash) {
-throw new RPCException(crash.code(), crash.message().orElse(null));
+throw new RPCException(crash.code(), crash.message().getNullable());
} else {
return value;
}

LuceneUtils.java

@ -233,8 +233,10 @@ public class LuceneUtils {
public static <T, U, V> ValueGetter<Entry<T, U>, V> getAsyncDbValueGetterDeep(
CompositeSnapshot snapshot,
DatabaseMapDictionaryDeep<T, Object2ObjectSortedMap<U, V>, ? extends DatabaseStageMap<U, V, ? extends DatabaseStageEntry<V>>> dictionaryDeep) {
-return entry -> LLUtils.usingResource(dictionaryDeep
-.at(snapshot, entry.getKey()), sub -> sub.getValue(snapshot, entry.getValue()), true);
+return entry -> Mono.usingWhen(dictionaryDeep.at(snapshot, entry.getKey()),
+sub -> sub.getValue(snapshot, entry.getValue()),
+LLUtils::closeResource
+);
}
public static PerFieldAnalyzerWrapper toPerFieldAnalyzerWrapper(IndicizerAnalyzers indicizerAnalyzers) {

TestDictionaryMap.java

@ -6,6 +6,8 @@ import static it.cavallium.dbengine.SyncUtils.*;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.database.UpdateMode;
+import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
+import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
@ -14,6 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@ -128,9 +131,9 @@ public abstract class TestDictionaryMap {
var stpVer = StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
-.flatMap(map -> LLUtils
-.usingResource(map.at(null, key), v -> v.set(value), true)
-.then(LLUtils.usingResource(map.at(null, key), v -> v.get(null), true))
+.flatMap(map -> Mono
+.usingWhen(map.at(null, key), v -> v.set(value), LLUtils::closeResource)
+.then(Mono.usingWhen(map.at(null, key), v -> v.get(null), LLUtils::closeResource))
.doFinally(s -> map.close())
)
));
@ -417,18 +420,16 @@ public abstract class TestDictionaryMap {
public void testSetAllValuesGetMulti(MapType mapType, UpdateMode updateMode, Object2ObjectSortedMap<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true);
Step<Entry<String, String>> stpVer = StepVerifier
-.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
-.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
-.flatMapMany(map -> {
-var entriesFlux = Flux.fromIterable(entries.entrySet());
-var keysFlux = entriesFlux.map(Entry::getKey);
-var resultsFlux = map
-.setAllValues(entriesFlux)
-.thenMany(map.getMulti(null, keysFlux));
-return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close());
-})
-.filter(k -> k.getValue().isPresent())
-.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
+.create(tempDb(getTempDbGenerator(), allocator, db -> {
+var mapMono = tempDictionary(db, updateMode).map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5));
+return Flux.usingWhen(mapMono, map -> {
+Flux<Entry<String, String>> entriesFlux = Flux.fromIterable(entries.entrySet());
+Flux<String> keysFlux = entriesFlux.map(Entry::getKey);
+Flux<Optional<String>> resultsFlux = map.setAllValues(entriesFlux).thenMany(map.getMulti(null, keysFlux));
+return Flux.zip(keysFlux, resultsFlux, Map::entry);
+}, LLUtils::closeResource)
+.filter(k -> k.getValue().isPresent()).map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()));
+}
));
if (shouldFail) {
this.checkLeaks = false;