package it.cavallium.dbengine.database.collections;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// todo: implement optimized methods (which?)
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {

	protected final LLDictionary dictionary;
	private final ByteBufAllocator alloc;
	protected final SubStageGetter<U, US> subStageGetter;
	protected final SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer;
	protected final ByteBuf keyPrefix;
	protected final int keyPrefixLength;
	protected final int keySuffixLength;
	protected final int keyExtLength;
	protected final LLRange range;
	protected final Mono<LLRange> rangeMono;

	private volatile boolean released;

	private static ByteBuf incrementPrefix(ByteBufAllocator alloc, ByteBuf originalKey, int prefixLength) {
		try {
			assert originalKey.readableBytes() >= prefixLength;
			ByteBuf copiedBuf = alloc.buffer(originalKey.writerIndex(), originalKey.writerIndex() + 1);
			try {
				boolean overflowed = true;
				final int ff = 0xFF;
				int writtenBytes = 0;
				copiedBuf.writerIndex(prefixLength);
				for (int i = prefixLength - 1; i >= 0; i--) {
					int iByte = originalKey.getUnsignedByte(i);
					if (iByte != ff) {
						copiedBuf.setByte(i, iByte + 1);
						writtenBytes++;
						overflowed = false;
						break;
					} else {
						copiedBuf.setByte(i, 0x00);
						writtenBytes++;
						overflowed = true;
					}
				}
				assert prefixLength - writtenBytes >= 0;
				if (prefixLength - writtenBytes > 0) {
					copiedBuf.setBytes(0, originalKey, 0, (prefixLength - writtenBytes));
				}

				copiedBuf.writerIndex(copiedBuf.capacity());

				if (originalKey.writerIndex() - prefixLength > 0) {
					copiedBuf.setBytes(prefixLength, originalKey, prefixLength, originalKey.writerIndex() - prefixLength);
				}

				if (overflowed) {
					for (int i = 0; i < copiedBuf.writerIndex(); i++) {
						copiedBuf.setByte(i, 0xFF);
					}
					copiedBuf.writeZero(1);
				}
				return copiedBuf.retain();
			} finally {
				copiedBuf.release();
			}
		} finally {
			originalKey.release();
		}
	}

	static ByteBuf firstRangeKey(ByteBufAllocator alloc, ByteBuf prefixKey, int prefixLength, int suffixLength,
			int extLength) {
		return zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
	}

	static ByteBuf nextRangeKey(ByteBufAllocator alloc, ByteBuf prefixKey, int prefixLength, int suffixLength,
			int extLength) {
		try {
			ByteBuf nonIncremented = zeroFillKeySuffixAndExt(alloc, prefixKey.retain(), prefixLength, suffixLength,
					extLength);
			try {
				return incrementPrefix(alloc, nonIncremented.retain(), prefixLength);
			} finally {
				nonIncremented.release();
			}
		} finally {
			prefixKey.release();
		}
	}
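
	/*
	 * Key layout handled by this class (lengths are fixed per instance):
	 *
	 *   [ keyPrefix (keyPrefixLength) | key suffix (keySuffixLength) | ext (keyExtLength) ]
	 *
	 * Illustrative example (not from the original sources), assuming prefix = {0x00, 0x01},
	 * suffixLength = 2 and extLength = 1:
	 *
	 *   firstRangeKey -> 00 01 00 00 00   (prefix, then suffix and ext zero-filled)
	 *   nextRangeKey  -> 00 02 00 00 00   (prefix incremented by incrementPrefix, rest zero-filled)
	 *
	 * so [firstRangeKey, nextRangeKey) covers every key that starts with the prefix. When the
	 * prefix is entirely 0xFF, incrementPrefix cannot carry any further: it fills the key with
	 * 0xFF and appends one extra zero byte, so the returned bound still sorts after every
	 * prefixed key.
	 */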

	protected static ByteBuf zeroFillKeySuffixAndExt(ByteBufAllocator alloc, ByteBuf prefixKey, int prefixLength,
			int suffixLength, int extLength) {
		try {
			assert prefixKey.readableBytes() == prefixLength;
			assert suffixLength > 0;
			assert extLength >= 0;
			ByteBuf zeroSuffixAndExt = alloc.buffer(suffixLength + extLength, suffixLength + extLength);
			try {
				zeroSuffixAndExt.writeZero(suffixLength + extLength);
				ByteBuf result = LLUtils.compositeBuffer(alloc, prefixKey.retain(), zeroSuffixAndExt.retain());
				try {
					return result.retain();
				} finally {
					result.release();
				}
			} finally {
				zeroSuffixAndExt.release();
			}
		} finally {
			prefixKey.release();
		}
	}

	static ByteBuf firstRangeKey(ByteBufAllocator alloc, ByteBuf prefixKey, ByteBuf suffixKey, int prefixLength,
			int suffixLength, int extLength) {
		return zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength);
	}

	static ByteBuf nextRangeKey(ByteBufAllocator alloc, ByteBuf prefixKey, ByteBuf suffixKey, int prefixLength,
			int suffixLength, int extLength) {
		try {
			ByteBuf nonIncremented = zeroFillKeyExt(alloc, prefixKey.retain(), suffixKey.retain(), prefixLength,
					suffixLength, extLength);
			try {
				return incrementPrefix(alloc, nonIncremented.retain(), prefixLength + suffixLength);
			} finally {
				nonIncremented.release();
			}
		} finally {
			prefixKey.release();
			suffixKey.release();
		}
	}

	protected static ByteBuf zeroFillKeyExt(ByteBufAllocator alloc, ByteBuf prefixKey, ByteBuf suffixKey,
			int prefixLength, int suffixLength, int extLength) {
		try {
			assert prefixKey.readableBytes() == prefixLength;
			assert suffixKey.readableBytes() == suffixLength;
			assert suffixLength > 0;
			assert extLength >= 0;
			ByteBuf result = LLUtils.compositeBuffer(alloc,
					prefixKey.retain(),
					suffixKey.retain(),
					alloc.buffer(extLength, extLength).writeZero(extLength)
			);
			try {
				assert result.readableBytes() == prefixLength + suffixLength + extLength;
				return result.retain();
			} finally {
				result.release();
			}
		} finally {
			prefixKey.release();
			suffixKey.release();
		}
	}

	/**
	 * Use DatabaseMapDictionaryRange.simple instead
	 */
	@Deprecated
	public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
			SerializerFixedBinaryLength<T, ByteBuf> keySerializer, SubStageGetterSingle<U> subStageGetter) {
		return new DatabaseMapDictionaryDeep<>(dictionary,
				dictionary.getAllocator().buffer(0),
				keySerializer,
				subStageGetter,
				0
		);
	}

	public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(
			LLDictionary dictionary, SerializerFixedBinaryLength<T, ByteBuf> keySerializer, int keyExtLength,
			SubStageGetter<U, US> subStageGetter) {
		return new DatabaseMapDictionaryDeep<>(dictionary,
				dictionary.getAllocator().buffer(0),
				keySerializer,
				subStageGetter,
				keyExtLength
		);
	}

	public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(
			LLDictionary dictionary, ByteBuf prefixKey, SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer,
			SubStageGetter<U, US> subStageGetter, int keyExtLength) {
		return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength);
	}
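
	/*
	 * The three factories above differ only in how the key space is pre-partitioned
	 * (descriptive note, not from the original sources):
	 *  - simple: empty prefix and no ext bytes, so every key suffix maps directly to a single value stage;
	 *  - deepTail: empty prefix plus keyExtLength reserved ext bytes, so each suffix owns a whole sub-range;
	 *  - deepIntermediate: like deepTail, but nested under a non-empty prefix produced by an outer map.
	 */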

	protected DatabaseMapDictionaryDeep(LLDictionary dictionary, ByteBuf prefixKey,
			SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer, SubStageGetter<U, US> subStageGetter,
			int keyExtLength) {
		try {
			this.dictionary = dictionary;
			this.alloc = dictionary.getAllocator();
			this.subStageGetter = subStageGetter;
			this.keySuffixSerializer = keySuffixSerializer;
			assert prefixKey.refCnt() > 0;
			this.keyPrefix = prefixKey.retain();
			assert keyPrefix.refCnt() > 0;
			this.keyPrefixLength = keyPrefix.readableBytes();
			this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
			this.keyExtLength = keyExtLength;
			ByteBuf firstKey = firstRangeKey(alloc, keyPrefix.retain(), keyPrefixLength, keySuffixLength, keyExtLength);
			try {
				ByteBuf nextRangeKey = nextRangeKey(alloc, keyPrefix.retain(), keyPrefixLength, keySuffixLength,
						keyExtLength);
				try {
					assert keyPrefix.refCnt() > 0;
					assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
					this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.retain(), nextRangeKey.retain());
					this.rangeMono = LLUtils.lazyRetainRange(this.range);
					assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
				} finally {
					nextRangeKey.release();
				}
			} finally {
				firstKey.release();
			}
		} finally {
			prefixKey.release();
		}
	}

	@SuppressWarnings("unused")
	protected boolean suffixKeyConsistency(int keySuffixLength) {
		return this.keySuffixLength == keySuffixLength;
	}

	@SuppressWarnings("unused")
	protected boolean extKeyConsistency(int keyExtLength) {
		return this.keyExtLength == keyExtLength;
	}

	@SuppressWarnings("unused")
	protected boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
		return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength;
	}

	/**
	 * Keep only suffix and ext
	 */
	protected ByteBuf stripPrefix(ByteBuf key, boolean slice) {
		try {
			if (slice) {
				return key.retainedSlice(this.keyPrefixLength, key.readableBytes() - this.keyPrefixLength);
			} else {
				return key.retain().readerIndex(key.readerIndex() + keyPrefixLength);
			}
		} finally {
			key.release();
		}
	}

	/**
	 * Remove ext from full key
	 */
	protected ByteBuf removeExtFromFullKey(ByteBuf key, boolean slice) {
		try {
			if (slice) {
				return key.retainedSlice(key.readerIndex(), keyPrefixLength + keySuffixLength);
			} else {
				return key.retain().writerIndex(key.writerIndex() - (keyPrefixLength + keySuffixLength));
			}
		} finally {
			key.release();
		}
	}

	/**
	 * Add prefix to suffix
	 */
	protected ByteBuf toKeyWithoutExt(ByteBuf suffixKey) {
		try {
			assert suffixKey.readableBytes() == keySuffixLength;
			ByteBuf result = LLUtils.compositeBuffer(alloc, keyPrefix.retain(), suffixKey.retain());
			assert keyPrefix.refCnt() > 0;
			try {
				assert result.readableBytes() == keyPrefixLength + keySuffixLength;
				return result.retain();
			} finally {
				result.release();
			}
		} finally {
			suffixKey.release();
		}
	}

	protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
		if (snapshot == null) {
			return null;
		} else {
			return snapshot.getSnapshot(dictionary);
		}
	}

	protected LLRange toExtRange(ByteBuf keySuffix) {
		try {
			ByteBuf first = firstRangeKey(alloc, keyPrefix.retain(), keySuffix.retain(), keyPrefixLength,
					keySuffixLength, keyExtLength);
			ByteBuf end = nextRangeKey(alloc, keyPrefix.retain(), keySuffix.retain(), keyPrefixLength,
					keySuffixLength, keyExtLength);
			assert keyPrefix.refCnt() > 0;
			return LLRange.of(first, end);
		} finally {
			keySuffix.release();
		}
	}

	@Override
	public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
		return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
	}

	@Override
	public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
		return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono);
	}
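
	/*
	 * at() resolves a sub-stage without reading any value (descriptive note, not from the
	 * original sources): the key suffix is serialized, prepended with keyPrefix to form the
	 * sub-stage prefix, and handed to subStageGetter; the debugging key flux is only
	 * materialized when LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED is set
	 * and the sub-stage getter asks for it.
	 */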

	@Override
	public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
		return Mono.using(
				() -> serializeSuffix(keySuffix),
				keySuffixData -> {
					Flux<ByteBuf> debuggingKeysFlux = Mono.<List<ByteBuf>>defer(() -> {
						if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
								&& this.subStageGetter.needsDebuggingKeyFlux()) {
							return Flux
									.using(
											() -> toExtRange(keySuffixData.retain()),
											extRangeBuf -> this.dictionary
													.getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetainRange(extRangeBuf)),
											LLRange::release
									)
									.collectList();
						} else {
							return Mono.just(List.of());
						}
					}).flatMapIterable(it -> it);
					return Mono.using(
							() -> toKeyWithoutExt(keySuffixData.retain()),
							keyWithoutExt -> this.subStageGetter
									.subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt), debuggingKeysFlux),
							ReferenceCounted::release
					);
				},
				ReferenceCounted::release
		).doOnDiscard(DatabaseStage.class, DatabaseStage::release);
	}

	@Override
	public Mono<UpdateMode> getUpdateMode() {
		return dictionary.getUpdateMode();
	}

	@Override
	public Flux<BadBlock> badBlocks() {
		return dictionary.badBlocks(rangeMono);
	}

	private static record GroupBuffers(ByteBuf groupKeyWithExt, ByteBuf groupKeyWithoutExt, ByteBuf groupSuffix) {}

	@Override
	public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
		return Flux
				.defer(() -> {
					if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
							&& this.subStageGetter.needsDebuggingKeyFlux()) {
						return dictionary
								.getRangeKeysGrouped(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)
								.concatMap(rangeKeys -> Flux
										.using(
												() -> {
													assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
													ByteBuf groupKeyWithExt = rangeKeys.get(0).retainedSlice();
													ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain(), true);
													ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
													return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix);
												},
												buffers -> Mono
														.fromCallable(() -> {
															assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes());
															return null;
														})
														.then(this.subStageGetter
																.subStage(dictionary,
																		snapshot,
																		LLUtils.lazyRetain(buffers.groupKeyWithoutExt),
																		Flux.fromIterable(rangeKeys).map(ByteBuf::retain)
																)
																.map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us))
														),
												buffers -> {
													buffers.groupSuffix.release();
													buffers.groupKeyWithoutExt.release();
													buffers.groupKeyWithExt.release();
												}
										)
										.doAfterTerminate(() -> {
											for (ByteBuf rangeKey : rangeKeys) {
												rangeKey.release();
											}
										})
								)
								.doOnDiscard(Collection.class, discardedCollection -> {
									for (Object o : discardedCollection) {
										if (o instanceof ByteBuf byteBuf) {
											byteBuf.release();
										}
									}
								});
					} else {
						return Flux
								.defer(() -> dictionary
										.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
								.flatMapSequential(groupKeyWithoutExt -> Mono
										.using(
												() -> {
													var groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
													assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
													return groupSuffix;
												},
												groupSuffix -> this.subStageGetter
														.subStage(dictionary,
																snapshot,
																LLUtils.lazyRetain(groupKeyWithoutExt),
																Flux.empty()
														)
														.map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us)),
												ReferenceCounted::release
										)
								);
					}
				});
	}

	private boolean subStageKeysConsistency(int totalKeyLength) {
		if (subStageGetter instanceof SubStageGetterMapDeep) {
			return totalKeyLength == keyPrefixLength + keySuffixLength
					+ ((SubStageGetterMapDeep) subStageGetter).getKeyBinaryLength();
		} else if (subStageGetter instanceof SubStageGetterMap) {
			return totalKeyLength == keyPrefixLength + keySuffixLength
					+ ((SubStageGetterMap) subStageGetter).getKeyBinaryLength();
		} else {
			return true;
		}
	}

	@Override
	public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
		return this
				.getAllValues(null)
				.concatWith(this
						.clear()
						.then(this.putMulti(entries))
						.then(Mono.empty())
				);
	}
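
	/*
	 * clear() picks the cheapest primitive available for this range (descriptive note, not from
	 * the original sources): a full-database clear when the range is unbounded, a single-key
	 * remove when the range degenerates to one key, and otherwise an empty setRange over the range.
	 */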

	@Override
	public Mono<Void> clear() {
		return Mono
				.defer(() -> {
					if (range.isAll()) {
						return dictionary.clear();
					} else if (range.isSingle()) {
						return dictionary
								.remove(LLUtils.lazyRetain(range.getSingle()), LLDictionaryResultType.VOID)
								.doOnNext(ReferenceCounted::release)
								.then();
					} else {
						return dictionary.setRange(LLUtils.lazyRetainRange(range), Flux.empty());
					}
				});
	}

	//todo: temporary wrapper. convert the whole class to buffers
	protected T deserializeSuffix(ByteBuf keySuffix) {
		try {
			assert suffixKeyConsistency(keySuffix.readableBytes());
			var result = keySuffixSerializer.deserialize(keySuffix.retain());
			assert keyPrefix.refCnt() > 0;
			return result;
		} finally {
			keySuffix.release();
		}
	}

	//todo: temporary wrapper. convert the whole class to buffers
	protected ByteBuf serializeSuffix(T keySuffix) {
		ByteBuf suffixData = keySuffixSerializer.serialize(keySuffix);
		assert suffixKeyConsistency(suffixData.readableBytes());
		assert keyPrefix.refCnt() > 0;
		return suffixData;
	}

	@Override
	public void release() {
		if (!released) {
			released = true;
			this.range.release();
			this.keyPrefix.release();
		} else {
			throw new IllegalReferenceCountException(0, -1);
		}
	}
}
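
/*
 * Illustrative usage sketch (not part of the original file; the serializer and sub-stage getter
 * instances below are hypothetical placeholders, shown only to illustrate how the factories compose):
 *
 *   // var deepMap = DatabaseMapDictionaryDeep.deepTail(dictionary,
 *   //     someFixedLengthKeySerializer,   // hypothetical SerializerFixedBinaryLength<K, ByteBuf>
 *   //     innerKeyLength,                 // ext bytes reserved for the inner stage's keys
 *   //     someSubStageGetter);            // hypothetical SubStageGetter for the inner stage
 *   // deepMap.at(null, key) ...           // resolve the sub-stage for one key suffix
 *   // deepMap.release();                  // release the retained prefix and range when done
 */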