package it.cavallium.dbengine.database.collections;

import static java.util.Objects.requireNonNullElseGet;

import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.BufSupplier;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/**
 * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
 */
public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> {

	private static final Logger LOG = LogManager.getLogger(DatabaseMapDictionary.class);

	private final AtomicLong totalZeroBytesErrors = new AtomicLong();

	private final Serializer<U> valueSerializer;

	protected DatabaseMapDictionary(LLDictionary dictionary,
			@Nullable BufSupplier prefixKeySupplier,
			SerializerFixedBinaryLength<T> keySuffixSerializer,
			Serializer<U> valueSerializer,
			Runnable onClose) {
		// Do not retain or release or use the prefixKey here
		super(dictionary, prefixKeySupplier, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0, onClose);
		this.valueSerializer = valueSerializer;
	}

	public static <K, V> DatabaseMapDictionary<K, V> simple(LLDictionary dictionary,
			SerializerFixedBinaryLength<K> keySerializer,
			Serializer<V> valueSerializer,
			Runnable onClose) {
		return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer, onClose);
	}

	public static <K, V> DatabaseMapDictionary<K, V> tail(LLDictionary dictionary,
			@Nullable BufSupplier prefixKeySupplier,
			SerializerFixedBinaryLength<K> keySuffixSerializer,
			Serializer<V> valueSerializer,
			Runnable onClose) {
		return new DatabaseMapDictionary<>(dictionary, prefixKeySupplier, keySuffixSerializer, valueSerializer, onClose);
	}
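
	/**
	 * Streams all key-value leaves of the given dictionary, optionally bounded by a minimum and/or maximum key.
	 * A minimal usage sketch, assuming a hypothetical String/String map already built with {@link #simple}
	 * (the variable names here are illustrative, not part of this API):
	 * <pre>{@code
	 * Flux<Entry<String, String>> leaves = DatabaseMapDictionary.getLeavesFrom(stringMap,
	 *     null,                // no snapshot: read the live data
	 *     Mono.just("keyA"),   // lower bound; Mono.empty() leaves the range open
	 *     Mono.empty(),        // no upper bound
	 *     false,               // forward iteration
	 *     false);              // not a small range
	 * }</pre>
	 * When both bounds are empty, the call falls back to a full-range scan.
	 */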
	public static <K, V> Flux<Entry<K, V>> getLeavesFrom(DatabaseMapDictionary<K, V> databaseMapDictionary,
			CompositeSnapshot snapshot,
			Mono<K> keyMin,
			Mono<K> keyMax,
			boolean reverse,
			boolean smallRange) {
		Mono<Optional<K>> keyMinOptMono = keyMin.map(Optional::of).defaultIfEmpty(Optional.empty());
		Mono<Optional<K>> keyMaxOptMono = keyMax.map(Optional::of).defaultIfEmpty(Optional.empty());

		return Mono.zip(keyMinOptMono, keyMaxOptMono).flatMapMany(entry -> {
			var keyMinOpt = entry.getT1();
			var keyMaxOpt = entry.getT2();
			if (keyMinOpt.isPresent() || keyMaxOpt.isPresent()) {
				return databaseMapDictionary.getAllValues(snapshot,
						keyMinOpt.orElse(null),
						keyMaxOpt.orElse(null),
						reverse,
						smallRange
				);
			} else {
				return databaseMapDictionary.getAllValues(snapshot, smallRange);
			}
		});
	}

	public static <K, V> Flux<K> getKeyLeavesFrom(DatabaseMapDictionary<K, V> databaseMapDictionary,
			CompositeSnapshot snapshot,
			Mono<K> keyMin,
			Mono<K> keyMax,
			boolean reverse,
			boolean smallRange) {
		Mono<Optional<K>> keyMinOptMono = keyMin.map(Optional::of).defaultIfEmpty(Optional.empty());
		Mono<Optional<K>> keyMaxOptMono = keyMax.map(Optional::of).defaultIfEmpty(Optional.empty());

		return Mono.zip(keyMinOptMono, keyMaxOptMono).flatMapMany(keys -> {
			var keyMinOpt = keys.getT1();
			var keyMaxOpt = keys.getT2();
			Flux<SubStageEntry<K, DatabaseStageEntry<V>>> stagesFlux;
			if (keyMinOpt.isPresent() || keyMaxOpt.isPresent()) {
				stagesFlux = databaseMapDictionary
						.getAllStages(snapshot, keyMinOpt.orElse(null), keyMaxOpt.orElse(null), reverse, smallRange);
			} else {
				stagesFlux = databaseMapDictionary.getAllStages(snapshot, smallRange);
			}
			return stagesFlux.doOnNext(e -> e.getValue().close())
					.doOnDiscard(Entry.class, e -> {
						if (e.getValue() instanceof DatabaseStageEntry<?> resource) {
							resource.close();
						}
					})
					.map(Entry::getKey);
		});
	}

	private @Nullable U deserializeValue(T keySuffix, Buffer value) {
		try {
			return valueSerializer.deserialize(value);
		} catch (IndexOutOfBoundsException ex) {
			var exMessage = ex.getMessage();
			if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
				var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
				if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
					try (var keyPrefix = keyPrefixSupplier.get()) {
						try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) {
							LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
									+ ":" + dictionary.getColumnName()
									+ ":" + LLUtils.toStringSafe(keyPrefix)
									+ ":" + keySuffix
									+ "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
						} catch (SerializationException e) {
							LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
									+ ":" + dictionary.getColumnName()
									+ ":" + LLUtils.toStringSafe(keyPrefix)
									+ ":" + keySuffix
									+ "(?) total=" + totalZeroBytesErrors);
						}
					}
				}
				return null;
			} else {
				throw ex;
			}
		}
	}

	private Buffer serializeValue(U value) throws SerializationException {
		var valSizeHint = valueSerializer.getSerializedSizeHint();
		if (valSizeHint == -1) {
			valSizeHint = 128;
		}
		var valBuf = dictionary.getAllocator().allocate(valSizeHint);
		try {
			valueSerializer.serialize(value, valBuf);
			return valBuf;
		} catch (Throwable t) {
			valBuf.close();
			throw t;
		}
	}

	private Buffer serializeKeySuffixToKey(T keySuffix) throws SerializationException {
		Buffer keyBuf;
		if (keyPrefixSupplier != null) {
			keyBuf = keyPrefixSupplier.get();
		} else {
			keyBuf = this.dictionary.getAllocator().allocate(keyPrefixLength + keySuffixLength + keyExtLength);
		}
		try {
			assert keyBuf.readableBytes() == keyPrefixLength;
			keyBuf.ensureWritable(keySuffixLength + keyExtLength);
			serializeSuffix(keySuffix, keyBuf);
			assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
			return keyBuf;
		} catch (Throwable t) {
			keyBuf.close();
			throw t;
		}
	}

	private Buffer toKey(Buffer suffixKey) {
		assert suffixKeyLengthConsistency(suffixKey.readableBytes());
		if (keyPrefixSupplier != null) {
			var result = LLUtils.compositeBuffer(dictionary.getAllocator(), keyPrefixSupplier.get().send(), suffixKey.send());
			try {
				assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
				return result;
			} catch (Throwable t) {
				result.close();
				throw t;
			}
		} else {
			assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
			return suffixKey;
		}
	}

	@Override
	public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
		return dictionary
				.getRange(resolveSnapshot(snapshot), rangeMono, false, true)
				.<Entry<T, U>>handle((entry, sink) -> {
					Entry<T, U> deserializedEntry;
					try {
						try (entry) {
							T key;
							var serializedKey = entry.getKeyUnsafe();
							var serializedValue = entry.getValueUnsafe();
							splitPrefix(serializedKey).close();
							suffixKeyLengthConsistency(serializedKey.readableBytes());
							key = deserializeSuffix(serializedKey);
							U value = valueSerializer.deserialize(serializedValue);
							deserializedEntry = Map.entry(key, value);
						}
						sink.next(deserializedEntry);
					} catch (Throwable ex) {
						sink.error(ex);
					}
				})
				.collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new)
				.map(map -> (Object2ObjectSortedMap<T, U>) map)
				.filter(map -> !map.isEmpty());
	}

	@Override
	public Mono<Object2ObjectSortedMap<T, U>> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) {
		return this
				.get(null, false)
				.concatWith(dictionary.setRange(rangeMono, Flux
						.fromIterable(Collections.unmodifiableMap(value).entrySet())
						.handle(this::serializeEntrySink), true).then(Mono.empty()))
				.singleOrEmpty();
	}

	@Override
	public Mono<Object2ObjectSortedMap<T, U>> clearAndGetPrevious() {
		return this.setAndGetPrevious(Object2ObjectSortedMaps.emptyMap());
	}

	@Override
	public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
		return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
	}

	@Override
	public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
		return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false);
	}

	@Override
	public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
		return Mono.fromCallable(() ->
				new DatabaseMapSingle<>(dictionary, BufSupplier.ofOwned(serializeKeySuffixToKey(keySuffix)), valueSerializer, null));
	}

	@Override
	public Mono<Boolean> containsKey(@Nullable CompositeSnapshot snapshot, T keySuffix) {
		return dictionary
				.isRangeEmpty(resolveSnapshot(snapshot),
						Mono.fromCallable(() -> LLRange.singleUnsafe(serializeKeySuffixToKey(keySuffix))),
						true
				)
				.map(empty -> !empty);
	}
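
	/**
	 * Reads the value associated with a single key suffix, completing empty if it is not present.
	 * A minimal usage sketch, assuming a hypothetical String/String map built with {@link #simple}
	 * (variable names are illustrative only):
	 * <pre>{@code
	 * Mono<Void> write = stringMap.putValue("user:1", "Alice");
	 * Mono<String> read = stringMap.getValue(null, "user:1", false);
	 * write.then(read).subscribe(value -> System.out.println("read: " + value));
	 * }</pre>
	 */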
	@Override
	public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
		return Mono.usingWhen(dictionary
						.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))),
				value -> Mono.fromCallable(() -> deserializeValue(keySuffix, value)),
				value -> Mono.fromRunnable(value::close));
	}

	@Override
	public Mono<Void> putValue(T keySuffix, U value) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)).single();
		var valueMono = Mono.fromCallable(() -> serializeValue(value)).single();
		return dictionary
				.put(keyMono, valueMono, LLDictionaryResultType.VOID)
				.doOnNext(Resource::close)
				.then();
	}

	@Override
	public Mono<UpdateMode> getUpdateMode() {
		return dictionary.getUpdateMode();
	}
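
	/**
	 * Atomically updates the value at the given key suffix: the updater receives the current value (or null when
	 * absent) and returns the new value (or null to delete it).
	 * A minimal usage sketch, assuming a hypothetical String/Long counter map; the variable names and the chosen
	 * {@code UpdateReturnMode} constant are illustrative assumptions, not prescribed by this class:
	 * <pre>{@code
	 * Mono<Long> incremented = counterMap.updateValue("hits",
	 *     UpdateReturnMode.GET_NEW_VALUE,
	 *     old -> old == null ? 1L : old + 1L);
	 * }</pre>
	 */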
	@Override
	public Mono<U> updateValue(T keySuffix,
			UpdateReturnMode updateReturnMode,
			SerializationFunction<@Nullable U, @Nullable U> updater) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		return Mono.usingWhen(dictionary.update(keyMono, getSerializedUpdater(updater), updateReturnMode),
				result -> Mono.fromCallable(() -> deserializeValue(keySuffix, result)),
				result -> Mono.fromRunnable(result::close)
		);
	}

	@Override
	public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix, SerializationFunction<@Nullable U, @Nullable U> updater) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		return dictionary
				.updateAndGetDelta(keyMono, getSerializedUpdater(updater))
				.transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize));
	}

	public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
		return oldSerialized -> {
			U result;
			if (oldSerialized == null) {
				result = updater.apply(null);
			} else {
				result = updater.apply(valueSerializer.deserialize(oldSerialized));
			}
			if (result == null) {
				return null;
			} else {
				return serializeValue(result);
			}
		};
	}

	public KVSerializationFunction<@NotNull T, @Nullable Buffer, @Nullable Buffer> getSerializedUpdater(
			KVSerializationFunction<@NotNull T, @Nullable U, @Nullable U> updater) {
		return (key, oldSerialized) -> {
			try (oldSerialized) {
				U result;
				if (oldSerialized == null) {
					result = updater.apply(key, null);
				} else {
					result = updater.apply(key, valueSerializer.deserialize(oldSerialized));
				}
				if (result == null) {
					return null;
				} else {
					return serializeValue(result);
				}
			}
		};
	}

	@Override
	public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		var valueMono = Mono.fromCallable(() -> serializeValue(value));
		return Mono.usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE),
				valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
				valueBuf -> Mono.fromRunnable(valueBuf::close)
		);
	}

	@Override
	public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		var valueMono = Mono.fromCallable(() -> serializeValue(value));
		return Mono
				.usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE),
						valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
						valueBuf -> Mono.fromRunnable(valueBuf::close)
				)
				.map(oldValue -> !Objects.equals(oldValue, value))
				.defaultIfEmpty(value != null);
	}

	@Override
	public Mono<Void> remove(T keySuffix) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		return dictionary
				.remove(keyMono, LLDictionaryResultType.VOID)
				.doOnNext(Resource::close)
				.then();
	}

	@Override
	public Mono<U> removeAndGetPrevious(T keySuffix) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		return Mono.usingWhen(dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE),
				valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
				valueBuf -> Mono.fromRunnable(valueBuf::close)
		);
	}

	@Override
	public Mono<Boolean> removeAndGetStatus(T keySuffix) {
		var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
		return dictionary
				.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
				.map(LLUtils::responseToBoolean);
	}
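
	/**
	 * Looks up many keys at once, emitting one {@link Optional} per input key in the same order.
	 * A minimal usage sketch, assuming a hypothetical String/String map (names are illustrative only):
	 * <pre>{@code
	 * Flux<Optional<String>> values = stringMap.getMulti(null,
	 *     Flux.just("user:1", "user:2", "user:3"),
	 *     false);
	 * }</pre>
	 */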
	@Override
	public Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
		var mappedKeys = keys
				.<Buffer>handle((keySuffix, sink) -> {
					try {
						sink.next(serializeKeySuffixToKey(keySuffix));
					} catch (Throwable ex) {
						sink.error(ex);
					}
				});
		return dictionary
				.getMulti(resolveSnapshot(snapshot), mappedKeys)
				.handle((valueBufOpt, sink) -> {
					try {
						sink.next(valueBufOpt.map(valueSerializer::deserialize));
					} catch (Throwable ex) {
						sink.error(ex);
					} finally {
						valueBufOpt.ifPresent(Resource::close);
					}
				});
	}

	private LLEntry serializeEntry(T keySuffix, U value) throws SerializationException {
		var key = serializeKeySuffixToKey(keySuffix);
		try {
			var serializedValue = serializeValue(value);
			return LLEntry.of(key, serializedValue);
		} catch (Throwable t) {
			key.close();
			throw t;
		}
	}

	private void serializeEntrySink(Entry<T, U> entry, SynchronousSink<LLEntry> sink) {
		try {
			sink.next(serializeEntry(entry.getKey(), entry.getValue()));
		} catch (Throwable e) {
			sink.error(e);
		}
	}

	@Override
	public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
		var serializedEntries = entries
				.<LLEntry>handle((entry, sink) -> {
					try {
						sink.next(serializeEntry(entry.getKey(), entry.getValue()));
					} catch (Throwable e) {
						sink.error(e);
					}
				});
		return dictionary.putMulti(serializedEntries);
	}
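
	/**
	 * Applies the given key/value update function to every key emitted by the source flux.
	 * A minimal usage sketch, assuming a hypothetical String/Long counter map (names are illustrative only):
	 * <pre>{@code
	 * var results = counterMap.updateMulti(Flux.just("hits", "misses"),
	 *     (key, old) -> old == null ? 1L : old + 1L);
	 * }</pre>
	 */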
	@Override
	public Flux<Boolean> updateMulti(Flux<T> keys, KVSerializationFunction<@NotNull T, @Nullable U, @Nullable U> updater) {
		var sharedKeys = keys.publish().refCount(2);
		var serializedKeys = sharedKeys.<Buffer>handle((key, sink) -> {
			try {
				Buffer serializedKey = serializeKeySuffixToKey(key);
				sink.next(serializedKey);
			} catch (Throwable ex) {
				sink.error(ex);
			}
		});
		var serializedUpdater = getSerializedUpdater(updater);
		return dictionary.updateMulti(sharedKeys, serializedKeys, serializedUpdater);
	}

	@Override
	public Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
			boolean smallRange) {
		return getAllStages(snapshot, rangeMono, false, smallRange);
	}

	private LLRange getPatchedRange(@NotNull LLRange range, @Nullable T keyMin, @Nullable T keyMax)
			throws SerializationException {
		Buffer keyMinBuf = requireNonNullElseGet(serializeSuffixForRange(keyMin), range::getMinCopy);
		Buffer keyMaxBuf = requireNonNullElseGet(serializeSuffixForRange(keyMax), range::getMaxCopy);
		return LLRange.ofUnsafe(keyMinBuf, keyMaxBuf);
	}

	private Buffer serializeSuffixForRange(@Nullable T key) throws SerializationException {
		if (key == null) {
			return null;
		}
		var keyWithoutExtBuf = keyPrefixSupplier == null
				? alloc.allocate(keySuffixLength + keyExtLength)
				: keyPrefixSupplier.get();
		try {
			keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
			serializeSuffix(key, keyWithoutExtBuf);
			return keyWithoutExtBuf;
		} catch (Throwable ex) {
			keyWithoutExtBuf.close();
			throw ex;
		}
	}

	/**
	 * Get all stages
	 * @param reverse if true, the results will go backwards from the specified key (inclusive)
	 */
	public Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
			@Nullable T keyMin,
			@Nullable T keyMax,
			boolean reverse,
			boolean smallRange) {
		if (keyMin == null && keyMax == null) {
			return getAllStages(snapshot, smallRange);
		} else {
			Mono<LLRange> boundedRangeMono = Mono.usingWhen(rangeMono,
					range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)),
					range -> Mono.fromRunnable(range::close)
			);
			return getAllStages(snapshot, boundedRangeMono, reverse, smallRange);
		}
	}

	private Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
			Mono<LLRange> sliceRangeMono,
			boolean reverse,
			boolean smallRange) {
		return dictionary
				.getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
				.flatMapSequential(keyBuf -> Mono
						.<SubStageEntry<T, DatabaseStageEntry<U>>>fromCallable(() -> {
							assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
							// Remove the prefix. Keep only the suffix and the ext
							splitPrefix(keyBuf).close();
							suffixKeyLengthConsistency(keyBuf.readableBytes());
							T keySuffix;
							try (var keyBufCopy = keyBuf.copy()) {
								keySuffix = deserializeSuffix(keyBufCopy);
							}
							var bufSupplier = BufSupplier.ofOwned(toKey(keyBuf));
							var subStage = new DatabaseMapSingle<>(dictionary, bufSupplier, valueSerializer, null);
							return new SubStageEntry<>(keySuffix, subStage);
						}).doOnCancel(() -> {
							if (keyBuf.isAccessible()) {
								keyBuf.close();
							}
						}).doOnError(ex -> {
							if (keyBuf.isAccessible()) {
								keyBuf.close();
							}
						})
				);
	}
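
	/**
	 * Streams every entry in the map; for a bounded scan, see the keyMin/keyMax overload below.
	 * A minimal usage sketch, assuming a hypothetical String/String map (names are illustrative only):
	 * <pre>{@code
	 * Flux<Entry<String, String>> all = stringMap.getAllValues(null, false);
	 * Flux<Entry<String, String>> bounded = stringMap.getAllValues(null, "keyA", "keyZ", false, true);
	 * }</pre>
	 */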
	@Override
	public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
		return getAllValues(snapshot, rangeMono, false, smallRange);
	}

	/**
	 * Get all values
	 * @param reverse if true, the results will go backwards from the specified key (inclusive)
	 */
	public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
			@Nullable T keyMin,
			@Nullable T keyMax,
			boolean reverse,
			boolean smallRange) {
		if (keyMin == null && keyMax == null) {
			return getAllValues(snapshot, smallRange);
		} else {
			Mono<LLRange> boundedRangeMono = Mono.usingWhen(rangeMono,
					range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)),
					range -> Mono.fromRunnable(range::close));
			return getAllValues(snapshot, boundedRangeMono, reverse, smallRange);
		}
	}

	private Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
			Mono<LLRange> sliceRangeMono,
			boolean reverse,
			boolean smallRange) {
		return dictionary
				.getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
				.handle((serializedEntry, sink) -> {
					try {
						Entry<T, U> entry;
						try (serializedEntry) {
							var keyBuf = serializedEntry.getKeyUnsafe();
							assert keyBuf != null;
							assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
							// Remove the prefix. Keep only the suffix and the ext
							splitPrefix(keyBuf).close();
							assert suffixKeyLengthConsistency(keyBuf.readableBytes());
							T keySuffix = deserializeSuffix(keyBuf);
							assert serializedEntry.getValueUnsafe() != null;
							U value = valueSerializer.deserialize(serializedEntry.getValueUnsafe());
							entry = Map.entry(keySuffix, value);
						}
						sink.next(entry);
					} catch (Throwable e) {
						sink.error(e);
					}
				});
	}

	@Override
	public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
		return Flux.concat(
				this.getAllValues(null, false),
				dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink), false).then(Mono.empty())
		);
	}

	@Override
	public Mono<Void> clear() {
		return Mono.using(rangeSupplier::get, range -> {
			if (range.isAll()) {
				return dictionary.clear();
			} else if (range.isSingle()) {
				return dictionary
						.remove(Mono.fromCallable(range::getSingleUnsafe), LLDictionaryResultType.VOID)
						.doOnNext(Resource::close)
						.then();
			} else {
				return dictionary.setRange(rangeMono, Flux.empty(), false);
			}
		}, ResourceSupport::close);
	}
}