package it.cavallium.dbengine.database.collections;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import lombok.Value;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Map stage whose entries are themselves sub-stages, stored in a single {@link LLDictionary}.
 *
 * <p>Every full key handled by this class is laid out as {@code prefix | suffix | ext}:
 * {@code keyPrefixLength} bytes shared by the whole map, {@code keySuffixLength} bytes identifying
 * one sub-stage, and {@code keyExtLength} bytes that belong to the sub-stage.
 *
 * <p>Buffer ownership convention used throughout this class: a method that receives a
 * {@link ByteBuf} releases it before returning, and returns a buffer the caller must release.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed from usage;
 * confirm them against the declarations of {@code DatabaseStageMap}, {@code SubStageGetter} and
 * {@code SerializerFixedBinaryLength}.
 *
 * @param <T> type of the deserialized key suffix
 * @param <U> value type of the sub-stages
 * @param <US> sub-stage type
 */
// todo: implement optimized methods
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {

    protected final LLDictionary dictionary;
    private final ByteBufAllocator alloc;
    protected final SubStageGetter<U, US> subStageGetter;
    protected final SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer;
    protected final ByteBuf keyPrefix;
    protected final int keyPrefixLength;
    protected final int keySuffixLength;
    protected final int keyExtLength;
    protected final LLRange range;
    private volatile boolean released;

    /**
     * Copies {@code originalKey} into a new buffer, incrementing its first {@code prefixLength}
     * bytes by one as an unsigned big-endian number. Bytes after the prefix are copied unchanged.
     *
     * <p>If the increment overflows (every prefix byte is {@code 0xFF}, or {@code prefixLength}
     * is 0), the result is instead filled with {@code 0xFF} and one extra {@code 0x00} byte is
     * appended.
     *
     * <p>Bytes are read with absolute indices starting at 0, so this presumably expects
     * {@code originalKey.readerIndex() == 0} — verify against callers.
     *
     * @param alloc allocator used for the returned buffer
     * @param originalKey key to copy; released by this method
     * @param prefixLength number of leading bytes to increment
     * @return a new buffer owned by the caller
     */
    private static ByteBuf incrementPrefix(ByteBufAllocator alloc, ByteBuf originalKey, int prefixLength) {
        try {
            assert originalKey.readableBytes() >= prefixLength;
            // Max capacity is one byte larger so the overflow marker byte can be appended.
            ByteBuf copiedBuf = alloc.buffer(originalKey.writerIndex(), originalKey.writerIndex() + 1);
            try {
                boolean overflowed = true;
                final int ff = 0xFF;
                int writtenBytes = 0;
                copiedBuf.writerIndex(prefixLength);
                // Walk the prefix from its least significant byte, propagating the carry.
                for (int i = prefixLength - 1; i >= 0; i--) {
                    int iByte = originalKey.getUnsignedByte(i);
                    if (iByte != ff) {
                        copiedBuf.setByte(i, iByte + 1);
                        writtenBytes++;
                        overflowed = false;
                        break;
                    } else {
                        copiedBuf.setByte(i, 0x00);
                        writtenBytes++;
                        overflowed = true;
                    }
                }
                assert prefixLength - writtenBytes >= 0;

                // Copy the leading prefix bytes that the carry did not reach.
                if (prefixLength - writtenBytes > 0) {
                    copiedBuf.setBytes(0, originalKey, 0, (prefixLength - writtenBytes));
                }

                copiedBuf.writerIndex(copiedBuf.capacity());

                // Copy everything that follows the prefix unchanged.
                if (originalKey.writerIndex() - prefixLength > 0) {
                    copiedBuf.setBytes(prefixLength, originalKey, prefixLength, originalKey.writerIndex() - prefixLength);
                }

                if (overflowed) {
                    for (int i = 0; i < copiedBuf.writerIndex(); i++) {
                        copiedBuf.setByte(i, 0xFF);
                    }
                    copiedBuf.writeZero(1);
                }
                return copiedBuf.retain();
            } finally {
                copiedBuf.release();
            }
        } finally {
            originalKey.release();
        }
    }

    /**
     * Builds the first key of the range covered by {@code prefixKey}: the prefix followed by a
     * zeroed suffix and ext. Releases {@code prefixKey}.
     */
    static ByteBuf firstRangeKey(ByteBufAllocator alloc,
            ByteBuf prefixKey,
            int prefixLength,
            int suffixLength,
            int extLength) {
        return zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
    }

    /**
     * Builds the key that follows the range covered by {@code prefixKey}: the zero-filled key
     * with its prefix incremented by one. Releases {@code prefixKey}.
     */
    static ByteBuf nextRangeKey(ByteBufAllocator alloc,
            ByteBuf prefixKey,
            int prefixLength,
            int suffixLength,
            int extLength) {
        try {
            ByteBuf nonIncremented = zeroFillKeySuffixAndExt(alloc,
                    prefixKey.retain(),
                    prefixLength,
                    suffixLength,
                    extLength
            );
            try {
                return incrementPrefix(alloc, nonIncremented.retain(), prefixLength);
            } finally {
                nonIncremented.release();
            }
        } finally {
            prefixKey.release();
        }
    }

    /**
     * Returns {@code prefixKey} followed by {@code suffixLength + extLength} zero bytes.
     * Releases {@code prefixKey}.
     */
    protected static ByteBuf zeroFillKeySuffixAndExt(ByteBufAllocator alloc,
            ByteBuf prefixKey,
            int prefixLength,
            int suffixLength,
            int extLength) {
        try {
            assert prefixKey.readableBytes() == prefixLength;
            assert suffixLength > 0;
            assert extLength >= 0;
            ByteBuf zeroSuffixAndExt = alloc.buffer(suffixLength + extLength, suffixLength + extLength);
            try {
                zeroSuffixAndExt.writeZero(suffixLength + extLength);
                ByteBuf result = LLUtils.compositeBuffer(alloc, prefixKey.retain(), zeroSuffixAndExt.retain());
                try {
                    return result.retain();
                } finally {
                    result.release();
                }
            } finally {
                zeroSuffixAndExt.release();
            }
        } finally {
            prefixKey.release();
        }
    }

    /**
     * Builds the first key of the range covered by {@code prefixKey + suffixKey}: both parts
     * followed by a zeroed ext. Releases {@code prefixKey} and {@code suffixKey}.
     */
    static ByteBuf firstRangeKey(
            ByteBufAllocator alloc,
            ByteBuf prefixKey,
            ByteBuf suffixKey,
            int prefixLength,
            int suffixLength,
            int extLength) {
        return zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength);
    }

    /**
     * Builds the key that follows the range covered by {@code prefixKey + suffixKey}: the
     * zero-ext key with its first {@code prefixLength + suffixLength} bytes incremented by one.
     * Releases {@code prefixKey} and {@code suffixKey}.
     */
    static ByteBuf nextRangeKey(
            ByteBufAllocator alloc,
            ByteBuf prefixKey,
            ByteBuf suffixKey,
            int prefixLength,
            int suffixLength,
            int extLength) {
        try {
            ByteBuf nonIncremented = zeroFillKeyExt(alloc,
                    prefixKey.retain(),
                    suffixKey.retain(),
                    prefixLength,
                    suffixLength,
                    extLength
            );
            try {
                return incrementPrefix(alloc, nonIncremented.retain(), prefixLength + suffixLength);
            } finally {
                nonIncremented.release();
            }
        } finally {
            prefixKey.release();
            suffixKey.release();
        }
    }

    /**
     * Returns {@code prefixKey}, then {@code suffixKey}, then {@code extLength} zero bytes.
     * Releases {@code prefixKey} and {@code suffixKey}.
     */
    protected static ByteBuf zeroFillKeyExt(
            ByteBufAllocator alloc,
            ByteBuf prefixKey,
            ByteBuf suffixKey,
            int prefixLength,
            int suffixLength,
            int extLength) {
        try {
            assert prefixKey.readableBytes() == prefixLength;
            assert suffixKey.readableBytes() == suffixLength;
            assert suffixLength > 0;
            assert extLength >= 0;
            ByteBuf result = LLUtils.compositeBuffer(alloc,
                    prefixKey.retain(),
                    suffixKey.retain(),
                    alloc.buffer(extLength, extLength).writeZero(extLength)
            );
            try {
                assert result.readableBytes() == prefixLength + suffixLength + extLength;
                return result.retain();
            } finally {
                result.release();
            }
        } finally {
            prefixKey.release();
            suffixKey.release();
        }
    }

    /**
     * Creates a map with an empty prefix and no ext, whose sub-stages are single entries.
     *
     * @deprecated Use DatabaseMapDictionaryRange.simple instead
     */
    @Deprecated
    public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
            SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
            SubStageGetterSingle<U> subStageGetter) {
        return new DatabaseMapDictionaryDeep<>(dictionary,
                dictionary.getAllocator().buffer(0),
                keySerializer,
                subStageGetter,
                0
        );
    }

    /**
     * Creates a map with an empty prefix whose sub-stages own {@code keyExtLength} key bytes.
     */
    public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(
            LLDictionary dictionary,
            SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
            int keyExtLength,
            SubStageGetter<U, US> subStageGetter) {
        return new DatabaseMapDictionaryDeep<>(dictionary,
                dictionary.getAllocator().buffer(0),
                keySerializer,
                subStageGetter,
                keyExtLength
        );
    }

    /**
     * Creates a map nested under {@code prefixKey}. The reference to {@code prefixKey} passed
     * in is taken over by the new instance.
     */
    public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(
            LLDictionary dictionary,
            ByteBuf prefixKey,
            SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer,
            SubStageGetter<U, US> subStageGetter,
            int keyExtLength) {
        return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength);
    }

    /**
     * Stores the prefix and computes the key range of this map. With an empty prefix the range
     * is {@link LLRange#all()}; otherwise it spans from the zero-filled key to the key with the
     * incremented prefix.
     *
     * @param prefixKey key prefix; the passed reference is retained into {@link #keyPrefix} and
     *                  then released, so the instance effectively takes it over
     */
    protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
            ByteBuf prefixKey,
            SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer,
            SubStageGetter<U, US> subStageGetter,
            int keyExtLength) {
        try {
            this.dictionary = dictionary;
            this.alloc = dictionary.getAllocator();
            this.subStageGetter = subStageGetter;
            this.keySuffixSerializer = keySuffixSerializer;
            assert prefixKey.refCnt() > 0;
            this.keyPrefix = prefixKey.retain();
            assert keyPrefix.refCnt() > 0;
            this.keyPrefixLength = keyPrefix.readableBytes();
            this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
            this.keyExtLength = keyExtLength;
            ByteBuf firstKey = firstRangeKey(alloc,
                    keyPrefix.retain(),
                    keyPrefixLength,
                    keySuffixLength,
                    keyExtLength
            );
            try {
                ByteBuf nextRangeKey = nextRangeKey(alloc,
                        keyPrefix.retain(),
                        keyPrefixLength,
                        keySuffixLength,
                        keyExtLength
                );
                try {
                    assert keyPrefix.refCnt() > 0;
                    assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
                    this.range = keyPrefixLength == 0
                            ? LLRange.all()
                            : LLRange.of(firstKey.retain(), nextRangeKey.retain());
                    assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
                } finally {
                    nextRangeKey.release();
                }
            } finally {
                firstKey.release();
            }
        } finally {
            prefixKey.release();
        }
    }

    /** Returns whether {@code keySuffixLength} equals this map's suffix length. */
    @SuppressWarnings("unused")
    protected boolean suffixKeyConsistency(int keySuffixLength) {
        return this.keySuffixLength == keySuffixLength;
    }

    /** Returns whether {@code keyExtLength} equals this map's ext length. */
    @SuppressWarnings("unused")
    protected boolean extKeyConsistency(int keyExtLength) {
        return this.keyExtLength == keyExtLength;
    }

    /** Returns whether {@code keySuffixAndExtLength} equals suffix length plus ext length. */
    @SuppressWarnings("unused")
    protected boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
        return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength;
    }

    /**
     * Keep only suffix and ext.
     *
     * @param key full key; released by this method
     * @param slice if {@code true} return a retained slice, otherwise return {@code key} itself
     *              (retained) with its reader index advanced past the prefix
     * @return a buffer owned by the caller
     */
    protected ByteBuf stripPrefix(ByteBuf key, boolean slice) {
        try {
            if (slice) {
                // retainedSlice takes an absolute index, so it must be offset by the reader
                // index to drop exactly the prefix, like the non-slice branch does.
                return key.retainedSlice(key.readerIndex() + this.keyPrefixLength,
                        key.readableBytes() - this.keyPrefixLength);
            } else {
                return key.retain().readerIndex(key.readerIndex() + keyPrefixLength);
            }
        } finally {
            key.release();
        }
    }

    /**
     * Remove ext from full key, keeping the {@code keyPrefixLength + keySuffixLength} leading
     * readable bytes.
     *
     * @param key full key; released by this method
     * @param slice if {@code true} return a retained slice, otherwise return {@code key} itself
     *              (retained) with its writer index moved back to the end of the suffix
     * @return a buffer owned by the caller
     */
    protected ByteBuf removeExtFromFullKey(ByteBuf key, boolean slice) {
        try {
            if (slice) {
                return key.retainedSlice(key.readerIndex(), keyPrefixLength + keySuffixLength);
            } else {
                // Truncate right after prefix + suffix so both branches keep the same bytes.
                return key.retain().writerIndex(key.readerIndex() + keyPrefixLength + keySuffixLength);
            }
        } finally {
            key.release();
        }
    }

    /**
     * Add prefix to suffix.
     *
     * @param suffixKey serialized suffix; released by this method
     * @return {@code keyPrefix + suffixKey}, owned by the caller
     */
    protected ByteBuf toKeyWithoutExt(ByteBuf suffixKey) {
        try {
            assert suffixKey.readableBytes() == keySuffixLength;
            ByteBuf result = LLUtils.compositeBuffer(alloc, keyPrefix.retain(), suffixKey.retain());
            assert keyPrefix.refCnt() > 0;
            try {
                assert result.readableBytes() == keyPrefixLength + keySuffixLength;
                return result.retain();
            } finally {
                result.release();
            }
        } finally {
            suffixKey.release();
        }
    }

    /** Resolves the snapshot of this map's dictionary, or {@code null} if none was given. */
    protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
        if (snapshot == null) {
            return null;
        } else {
            return snapshot.getSnapshot(dictionary);
        }
    }

    /**
     * Builds the range of all full keys that share {@code keyPrefix + keySuffix}.
     *
     * @param keySuffix serialized suffix; released by this method
     * @return a range owned by the caller
     */
    protected LLRange toExtRange(ByteBuf keySuffix) {
        try {
            ByteBuf first = firstRangeKey(alloc,
                    keyPrefix.retain(),
                    keySuffix.retain(),
                    keyPrefixLength,
                    keySuffixLength,
                    keyExtLength
            );
            // Guard `first` so it is not leaked if building `end` fails.
            try {
                ByteBuf end = nextRangeKey(alloc,
                        keyPrefix.retain(),
                        keySuffix.retain(),
                        keyPrefixLength,
                        keySuffixLength,
                        keyExtLength
                );
                try {
                    assert keyPrefix.refCnt() > 0;
                    return LLRange.of(first.retain(), end.retain());
                } finally {
                    end.release();
                }
            } finally {
                first.release();
            }
        } finally {
            keySuffix.release();
        }
    }

    /** Delegates to {@code dictionary.sizeRange} over this map's range. */
    @Override
    public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
        return Mono.defer(() -> dictionary.sizeRange(resolveSnapshot(snapshot), range.retain(), fast));
    }

    /** Delegates to {@code dictionary.isRangeEmpty} over this map's range. */
    @Override
    public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
        return Mono.defer(() -> dictionary.isRangeEmpty(resolveSnapshot(snapshot), range.retain()));
    }

    /**
     * Returns the sub-stage stored under {@code keySuffix}.
     *
     * <p>When prefix debugging is enabled and the sub-stage getter asks for it, the keys
     * currently present in the sub-stage's range are collected first and handed to the getter.
     * Otherwise an empty list is passed.
     */
    @Override
    public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
        return Mono
                .using(
                        () -> serializeSuffix(keySuffix),
                        keySuffixData -> {
                            Mono<List<ByteBuf>> debuggingKeysMono = Mono
                                    .defer(() -> {
                                        if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
                                                && this.subStageGetter.needsDebuggingKeyFlux()) {
                                            return Flux
                                                    .using(
                                                            () -> toExtRange(keySuffixData.retain()),
                                                            extRangeBuf -> this.dictionary
                                                                    .getRangeKeys(resolveSnapshot(snapshot), extRangeBuf.retain()),
                                                            LLRange::release
                                                    )
                                                    .collectList();
                                        } else {
                                            return Mono.just(List.of());
                                        }
                                    });
                            return Mono
                                    .using(
                                            () -> toKeyWithoutExt(keySuffixData.retain()),
                                            keyBuf -> debuggingKeysMono
                                                    .flatMap(debuggingKeysList -> this.subStageGetter
                                                            .subStage(dictionary, snapshot, keyBuf.retain(), debuggingKeysList)
                                                    ),
                                            ReferenceCounted::release
                                    )
                                    .doOnDiscard(DatabaseStage.class, DatabaseStage::release);
                        },
                        ReferenceCounted::release
                )
                .doOnDiscard(DatabaseStage.class, DatabaseStage::release);
    }

    @Override
    public Mono<UpdateMode> getUpdateMode() {
        return dictionary.getUpdateMode();
    }

    /** Buffers derived from the first key of a group in {@link #getAllStages}. */
    @Value
    private static class GroupBuffers {
        ByteBuf groupKeyWithExt;
        ByteBuf groupKeyWithoutExt;
        ByteBuf groupSuffix;
    }

    /**
     * Emits one entry per distinct {@code prefix + suffix} found in this map's range, pairing
     * the deserialized suffix with its sub-stage.
     *
     * <p>In debugging mode the full keys of each group are fetched and passed to the sub-stage
     * getter; otherwise only the key prefixes are fetched and an empty key list is passed.
     */
    @Override
    public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
        if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
                && this.subStageGetter.needsDebuggingKeyFlux()) {
            return Flux
                    .defer(() -> dictionary
                            .getRangeKeysGrouped(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength)
                    )
                    .flatMapSequential(rangeKeys -> Flux
                            .using(
                                    () -> {
                                        assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
                                        ByteBuf groupKeyWithExt = rangeKeys.get(0).retainedSlice();
                                        ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain(), true);
                                        ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
                                        return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix);
                                    },
                                    buffers -> Mono
                                            .fromCallable(() -> {
                                                assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes());
                                                return null;
                                            })
                                            .then(this.subStageGetter
                                                    .subStage(dictionary,
                                                            snapshot,
                                                            buffers.groupKeyWithoutExt.retain(),
                                                            rangeKeys.stream().map(ByteBuf::retain).collect(Collectors.toList())
                                                    )
                                                    .map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us))
                                            ),
                                    buffers -> {
                                        buffers.groupSuffix.release();
                                        buffers.groupKeyWithoutExt.release();
                                        buffers.groupKeyWithExt.release();
                                    }
                            )
                            .doFinally(s -> {
                                for (ByteBuf rangeKey : rangeKeys) {
                                    rangeKey.release();
                                }
                            })
                    )
                    .doOnDiscard(Collection.class, discardedCollection -> {
                        //noinspection unchecked
                        var rangeKeys = (Collection<ByteBuf>) discardedCollection;
                        for (ByteBuf rangeKey : rangeKeys) {
                            rangeKey.release();
                        }
                    });
        } else {
            return Flux
                    .defer(() -> dictionary
                            .getRangeKeyPrefixes(resolveSnapshot(snapshot), range.retain(), keyPrefixLength + keySuffixLength)
                    )
                    .flatMapSequential(groupKeyWithoutExt -> {
                        ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
                        assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
                        return this.subStageGetter
                                .subStage(dictionary,
                                        snapshot,
                                        groupKeyWithoutExt.retain(),
                                        List.of()
                                )
                                .map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us))
                                .doFinally(s -> {
                                    groupSuffix.release();
                                    // Both uses above pass their own retained reference, so the
                                    // reference received from the upstream is released here,
                                    // mirroring how rangeKeys are released in the debug branch.
                                    groupKeyWithoutExt.release();
                                });
                    });
        }
    }

    /**
     * Checks {@code totalKeyLength} against the key length declared by the sub-stage getter,
     * when the getter is a map getter. Always {@code true} for other getters.
     */
    private boolean subStageKeysConsistency(int totalKeyLength) {
        if (subStageGetter instanceof SubStageGetterMapDeep) {
            return totalKeyLength
                    == keyPrefixLength + keySuffixLength + ((SubStageGetterMapDeep<?, ?, ?>) subStageGetter).getKeyBinaryLength();
        } else if (subStageGetter instanceof SubStageGetterMap) {
            return totalKeyLength
                    == keyPrefixLength + keySuffixLength + ((SubStageGetterMap<?, ?>) subStageGetter).getKeyBinaryLength();
        } else {
            return true;
        }
    }

    /**
     * Emits all current values, then clears the map and sets every entry of {@code entries}
     * through its sub-stage. The second phase emits nothing.
     */
    @Override
    public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
        return this
                .getAllValues(null)
                .concatWith(this
                        .clear()
                        .then(entries
                                .flatMap(entry -> this
                                        .at(null, entry.getKey())
                                        .flatMap(us -> us
                                                .set(entry.getValue())
                                                .doFinally(s -> us.release())
                                        )
                                )
                                .doOnDiscard(DatabaseStage.class, DatabaseStage::release)
                                .then(Mono.empty())
                        )
                );
    }

    /**
     * Removes every key of this map: clears the whole dictionary when the range is "all",
     * removes the single key when the range is single, otherwise replaces the range with an
     * empty one.
     */
    @Override
    public Mono<Void> clear() {
        if (range.isAll()) {
            return dictionary
                    .clear();
        } else if (range.isSingle()) {
            return Mono
                    .defer(() -> dictionary
                            .remove(range.getSingle().retain(), LLDictionaryResultType.VOID)
                            .doOnNext(ReferenceCounted::release)
                    )
                    .then();
        } else {
            return Mono
                    .defer(() -> dictionary
                            .setRange(range.retain(), Flux.empty())
                    );
        }
    }

    /**
     * Deserializes a key suffix. Releases {@code keySuffix}.
     */
    //todo: temporary wrapper. convert the whole class to buffers
    protected T deserializeSuffix(ByteBuf keySuffix) {
        try {
            assert suffixKeyConsistency(keySuffix.readableBytes());
            var result = keySuffixSerializer.deserialize(keySuffix.retain());
            assert keyPrefix.refCnt() > 0;
            return result;
        } finally {
            keySuffix.release();
        }
    }

    /**
     * Serializes a key suffix into a buffer owned by the caller.
     */
    //todo: temporary wrapper. convert the whole class to buffers
    protected ByteBuf serializeSuffix(T keySuffix) {
        ByteBuf suffixData = keySuffixSerializer.serialize(keySuffix);
        assert suffixKeyConsistency(suffixData.readableBytes());
        assert keyPrefix.refCnt() > 0;
        return suffixData;
    }

    /**
     * Releases the range and the key prefix held by this instance.
     *
     * @throws IllegalStateException if this instance was already released
     */
    @Override
    public void release() {
        if (!released) {
            released = true;
            this.range.release();
            this.keyPrefix.release();
        } else {
            throw new IllegalStateException("Already released");
        }
    }
}