package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.List; import java.util.Map; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class SubStageGetterMapDeep> implements SubStageGetter, DatabaseMapDictionaryDeep> { private static final boolean assertsEnabled; static { boolean assertsEnabledTmp = false; //noinspection AssertWithSideEffects assert assertsEnabledTmp = true; //noinspection ConstantConditions assertsEnabled = assertsEnabledTmp; } private final SubStageGetter subStageGetter; private final SerializerFixedBinaryLength keySerializer; private final int keyExtLength; private final boolean enableAssertionsWhenUsingAssertions; public SubStageGetterMapDeep(SubStageGetter subStageGetter, SerializerFixedBinaryLength keySerializer, int keyExtLength, boolean enableAssertionsWhenUsingAssertions) { this.subStageGetter = subStageGetter; this.keySerializer = keySerializer; this.keyExtLength = keyExtLength; assert keyExtConsistency(); this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions; } private boolean keyExtConsistency() { if (subStageGetter instanceof SubStageGetterMapDeep) { return keyExtLength == ((SubStageGetterMapDeep) subStageGetter).getKeyBinaryLength(); } else if (subStageGetter instanceof SubStageGetterMap) { return keyExtLength == ((SubStageGetterMap) subStageGetter).getKeyBinaryLength(); } else { return true; } } @Override public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, Mono prefixKeyMono, Flux debuggingKeysFlux) { return Mono.usingWhen(prefixKeyMono, prefixKey -> Mono .fromSupplier(() -> DatabaseMapDictionaryDeep .deepIntermediate(dictionary, prefixKey.retain(), keySerializer, subStageGetter, keyExtLength ) ) .transform(mono -> { if (assertsEnabled && enableAssertionsWhenUsingAssertions) { return debuggingKeysFlux.handle((key, sink) -> { try { if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) { sink.error(new IndexOutOfBoundsException()); } else { sink.complete(); } } finally { key.release(); } }).then(mono); } else { return mono; } }), prefixKey -> Mono.fromRunnable(prefixKey::release) ); } @Override public boolean isMultiKey() { return true; } @Override public boolean needsDebuggingKeyFlux() { return assertsEnabled && enableAssertionsWhenUsingAssertions; } private Mono checkKeyFluxConsistency(ByteBuf prefixKey, List keys) { return Mono .fromCallable(() -> { try { for (ByteBuf key : keys) { assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength(); } } finally { prefixKey.release(); for (ByteBuf key : keys) { key.release(); } } return null; }); } public int getKeyBinaryLength() { return keySerializer.getSerializedBinaryLength() + keyExtLength; } }