CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java

607 lines
22 KiB
Java
Raw Normal View History

2021-01-31 21:23:43 +01:00
package it.cavallium.dbengine.database.collections;
2022-03-16 13:47:56 +01:00
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
2022-03-16 22:41:51 +01:00
import io.netty5.buffer.api.DefaultBufferAllocators;
2022-03-16 13:47:56 +01:00
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
2021-06-26 02:35:33 +02:00
import it.cavallium.dbengine.client.BadBlock;
2021-01-31 21:23:43 +01:00
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
2021-03-14 03:13:19 +01:00
import it.cavallium.dbengine.database.LLDictionaryResultType;
2021-01-31 21:23:43 +01:00
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
2021-05-02 19:18:15 +02:00
import it.cavallium.dbengine.database.UpdateMode;
2022-03-20 14:33:27 +01:00
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
2021-08-22 21:23:22 +02:00
import it.cavallium.dbengine.database.serialization.SerializationException;
2022-03-20 14:33:27 +01:00
import it.cavallium.dbengine.database.serialization.Serializer;
2021-02-02 19:40:37 +01:00
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
2021-12-18 18:16:56 +01:00
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
2022-03-20 14:33:27 +01:00
import java.util.List;
2021-01-31 21:23:43 +01:00
import java.util.Map;
import java.util.Map.Entry;
2022-03-20 14:33:27 +01:00
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
2022-03-12 02:55:18 +01:00
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.function.TriFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
2021-01-31 21:23:43 +01:00
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
2021-05-02 19:18:15 +02:00
// todo: implement optimized methods (which?)
2021-12-18 18:16:56 +01:00
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extends
ResourceSupport<DatabaseStage<Object2ObjectSortedMap<T, U>>, DatabaseMapDictionaryDeep<T, U, US>> implements
DatabaseStageMap<T, U, US> {
2021-01-31 21:23:43 +01:00
2022-03-11 17:59:46 +01:00
private static final Logger LOG = LogManager.getLogger(DatabaseMapDictionaryDeep.class);
2021-10-01 19:17:33 +02:00
private static final Drop<DatabaseMapDictionaryDeep<?, ?, ?>> DROP = new Drop<>() {
@Override
public void drop(DatabaseMapDictionaryDeep<?, ?, ?> obj) {
try {
if (obj.range != null) {
obj.range.close();
}
} catch (Throwable ex) {
2022-03-11 17:59:46 +01:00
LOG.error("Failed to close range", ex);
2021-10-01 19:17:33 +02:00
}
try {
if (obj.keyPrefix != null) {
obj.keyPrefix.close();
}
} catch (Throwable ex) {
2022-03-11 17:59:46 +01:00
LOG.error("Failed to close keyPrefix", ex);
2021-10-01 19:17:33 +02:00
}
2022-03-16 22:41:51 +01:00
try {
if (obj.keySuffixAndExtZeroBuffer != null) {
obj.keySuffixAndExtZeroBuffer.close();
}
} catch (Throwable ex) {
LOG.error("Failed to close keySuffixAndExtZeroBuffer", ex);
}
2021-10-01 19:17:33 +02:00
try {
if (obj.onClose != null) {
obj.onClose.run();
}
} catch (Throwable ex) {
2022-03-11 17:59:46 +01:00
LOG.error("Failed to close onClose", ex);
2021-10-01 19:17:33 +02:00
}
}
@Override
public Drop<DatabaseMapDictionaryDeep<?, ?, ?>> fork() {
return this;
}
@Override
public void attach(DatabaseMapDictionaryDeep<?, ?, ?> obj) {
}
};
2021-01-31 21:23:43 +01:00
/** Dictionary that this map reads from and writes to. */
protected final LLDictionary dictionary;
2022-03-20 14:33:27 +01:00
/** Buffer allocator; the protected constructor takes it from {@code dictionary.getAllocator()}. */
protected final BufferAllocator alloc;
2022-03-12 02:55:18 +01:00
/** Counter of zero-bytes value errors seen by {@code getAllLeaves2}; used to throttle its error log. */
private final AtomicLong totalZeroBytesErrors = new AtomicLong();
2021-01-31 21:23:43 +01:00
/** Factory of the sub stage returned for each key suffix. */
protected final SubStageGetter<U, US> subStageGetter;
2021-09-02 17:15:40 +02:00
/** Fixed-length serializer of the key suffix handled by this level. */
protected final SerializerFixedBinaryLength<T> keySuffixSerializer;
/** Readable bytes of the prefix key, or 0 when no prefix key was given. */
protected final int keyPrefixLength;
2021-01-31 21:23:43 +01:00
/** Serialized length of a key suffix, as reported by {@code keySuffixSerializer}. */
protected final int keySuffixLength;
/** Number of key bytes that follow the suffix. */
protected final int keyExtLength;
2021-08-29 23:18:03 +02:00
/** Publisher of {@link #range}, created with {@code LLUtils.lazyRetainRange}. */
protected final Mono<Send<LLRange>> rangeMono;
2021-09-23 20:57:28 +02:00
/** Range covered by this map; set to null by {@code makeInaccessible()}. */
protected LLRange range;
/** Key prefix; null when the map was created without a prefix key. */
protected Buffer keyPrefix;
2022-03-16 22:41:51 +01:00
/** Read-only zero-filled buffer of {@code keySuffixLength + keyExtLength} bytes. */
protected Buffer keySuffixAndExtZeroBuffer;
2021-10-01 19:17:33 +02:00
/** Callback run by the drop handler after the owned buffers have been closed; may be null. */
protected Runnable onClose;
2021-01-31 21:23:43 +01:00
2021-09-22 18:33:28 +02:00
private static void incrementPrefix(Buffer prefix, int prefixLength) {
assert prefix.readableBytes() >= prefixLength;
assert prefix.readerOffset() == 0;
final var originalKeyLength = prefix.readableBytes();
boolean overflowed = true;
final int ff = 0xFF;
int writtenBytes = 0;
for (int i = prefixLength - 1; i >= 0; i--) {
int iByte = prefix.getUnsignedByte(i);
if (iByte != ff) {
prefix.setUnsignedByte(i, iByte + 1);
writtenBytes++;
overflowed = false;
break;
} else {
prefix.setUnsignedByte(i, 0x00);
writtenBytes++;
2021-03-13 19:01:36 +01:00
}
}
2021-09-22 18:33:28 +02:00
assert prefixLength - writtenBytes >= 0;
if (overflowed) {
assert prefix.writerOffset() == originalKeyLength;
prefix.ensureWritable(1, 1, true);
prefix.writerOffset(originalKeyLength + 1);
for (int i = 0; i < originalKeyLength; i++) {
prefix.setUnsignedByte(i, 0xFF);
}
prefix.setUnsignedByte(originalKeyLength, (byte) 0x00);
}
2021-03-13 19:01:36 +01:00
}
2022-03-16 22:41:51 +01:00
/**
 * Turns {@code prefixKey} into the first key of its range by appending the content of
 * {@code suffixAndExtZeroes} to it.
 *
 * @param prefixKey          buffer holding the prefix; modified in place
 * @param prefixLength       length of the prefix in bytes
 * @param suffixAndExtZeroes bytes to append after the prefix
 */
static void firstRangeKey(Buffer prefixKey, int prefixLength, Buffer suffixAndExtZeroes) {
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixAndExtZeroes);
}
/**
 * Turns {@code prefixKey} into the key that follows its range: the content of
 * {@code suffixAndExtZeroes} is appended, then the prefix part is incremented.
 *
 * @param prefixKey          buffer holding the prefix; modified in place
 * @param prefixLength       length of the prefix in bytes
 * @param suffixAndExtZeroes bytes to append after the prefix
 */
static void nextRangeKey(Buffer prefixKey, int prefixLength, Buffer suffixAndExtZeroes) {
zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixAndExtZeroes);
incrementPrefix(prefixKey, prefixLength);
}
@Deprecated
2021-11-08 16:33:41 +01:00
static void firstRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
2022-03-16 22:41:51 +01:00
try (var zeroBuf = DefaultBufferAllocators.offHeapAllocator().allocate(suffixLength + extLength)) {
zeroBuf.fill((byte) 0);
zeroBuf.writerOffset(suffixLength + extLength);
zeroFillKeySuffixAndExt(prefixKey, prefixLength, zeroBuf);
}
2021-01-31 21:23:43 +01:00
}
2022-03-16 22:41:51 +01:00
@Deprecated
2021-11-08 16:33:41 +01:00
static void nextRangeKey(Buffer prefixKey, int prefixLength, int suffixLength, int extLength) {
2022-03-16 22:41:51 +01:00
try (var zeroBuf = DefaultBufferAllocators.offHeapAllocator().allocate(suffixLength + extLength)) {
zeroBuf.fill((byte) 0);
zeroBuf.writerOffset(suffixLength + extLength);
zeroFillKeySuffixAndExt(prefixKey, prefixLength, zeroBuf);
incrementPrefix(prefixKey, prefixLength);
}
2021-01-31 21:23:43 +01:00
}
2021-11-08 16:33:41 +01:00
protected static void zeroFillKeySuffixAndExt(@NotNull Buffer prefixKey,
2022-03-16 22:41:51 +01:00
int prefixLength, Buffer suffixAndExtZeroes) {
2021-11-08 16:33:41 +01:00
//noinspection UnnecessaryLocalVariable
var result = prefixKey;
2022-03-16 22:41:51 +01:00
var suffixLengthAndExtLength = suffixAndExtZeroes.readableBytes();
2021-11-08 16:33:41 +01:00
assert result.readableBytes() == prefixLength;
2022-03-16 22:41:51 +01:00
assert suffixLengthAndExtLength > 0 : "Suffix length + ext length is < 0: " + suffixLengthAndExtLength;
prefixKey.ensureWritable(suffixLengthAndExtLength);
suffixAndExtZeroes.copyInto(suffixAndExtZeroes.readerOffset(),
prefixKey,
prefixKey.writerOffset(),
suffixLengthAndExtLength
);
prefixKey.skipWritable(suffixLengthAndExtLength);
2021-01-31 21:23:43 +01:00
}
/**
* Use DatabaseMapDictionaryRange.simple instead
*/
@Deprecated
2021-02-02 19:40:37 +01:00
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
2021-09-23 20:57:28 +02:00
SerializerFixedBinaryLength<T> keySerializer, SubStageGetterSingle<U> subStageGetter,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
2021-11-08 16:33:41 +01:00
return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer,
2021-10-01 19:17:33 +02:00
subStageGetter, 0, onClose);
2021-01-31 21:23:43 +01:00
}
2021-09-23 20:57:28 +02:00
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(
LLDictionary dictionary, SerializerFixedBinaryLength<T> keySerializer, int keyExtLength,
2021-10-01 19:17:33 +02:00
SubStageGetter<U, US> subStageGetter, Runnable onClose) {
2021-11-08 16:33:41 +01:00
return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer,
2021-10-01 19:17:33 +02:00
subStageGetter, keyExtLength, onClose);
2021-01-31 21:23:43 +01:00
}
2021-09-23 20:57:28 +02:00
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(
2021-11-08 16:33:41 +01:00
LLDictionary dictionary, Buffer prefixKey, SerializerFixedBinaryLength<T> keySuffixSerializer,
2021-10-01 19:17:33 +02:00
SubStageGetter<U, US> subStageGetter, int keyExtLength, Runnable onClose) {
2021-09-23 20:57:28 +02:00
return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter,
2021-10-01 19:17:33 +02:00
keyExtLength, onClose);
2021-01-31 21:23:43 +01:00
}
2021-10-01 19:17:33 +02:00
/**
 * Creates the map and computes the key range it covers.
 *
 * <p>Without a prefix key (or with an empty one) the range is {@code LLRange.all()}. Otherwise the
 * range goes from "prefix + zeroes" to "incremented prefix + zeroes". On any failure the zero
 * buffer and {@code prefixKey} are closed if they are still accessible, then the error is rethrown.
 *
 * @param dictionary          dictionary that stores the entries; also provides the allocator
 * @param prefixKey           key prefix, or null for no prefix; stored in {@code keyPrefix}
 * @param keySuffixSerializer fixed-length serializer of the key suffix
 * @param subStageGetter      factory of the sub stages
 * @param keyExtLength        number of key bytes that follow the suffix
 * @param onClose             callback run when the map is dropped
 */
@SuppressWarnings({"unchecked", "rawtypes"})
2021-11-08 16:33:41 +01:00
protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @Nullable Buffer prefixKey,
2021-09-23 20:57:28 +02:00
SerializerFixedBinaryLength<T> keySuffixSerializer, SubStageGetter<U, US> subStageGetter, int keyExtLength,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
super((Drop<DatabaseMapDictionaryDeep<T, U, US>>) (Drop) DROP);
2021-11-08 16:33:41 +01:00
try {
2021-08-31 15:50:11 +02:00
this.dictionary = dictionary;
this.alloc = dictionary.getAllocator();
this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer;
2021-11-08 16:33:41 +01:00
assert prefixKey == null || prefixKey.isAccessible();
this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes();
2021-08-31 15:50:11 +02:00
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
this.keyExtLength = keyExtLength;
2022-03-16 22:41:51 +01:00
// Read-only buffer of suffix+ext zero bytes, reused to build the range keys.
this.keySuffixAndExtZeroBuffer = alloc
.allocate(keySuffixLength + keyExtLength)
.fill((byte) 0)
.writerOffset(keySuffixLength + keyExtLength)
.makeReadOnly();
assert keySuffixAndExtZeroBuffer.readableBytes() == keySuffixLength + keyExtLength :
"Key suffix and ext zero buffer readable length is not equal"
+ " to the key suffix length + key ext length. keySuffixAndExtZeroBuffer="
+ keySuffixAndExtZeroBuffer.readableBytes() + " keySuffixLength=" + keySuffixLength + " keyExtLength="
+ keyExtLength;
assert keySuffixAndExtZeroBuffer.readableBytes() > 0;
2021-11-08 16:33:41 +01:00
// Lower bound: a copy of the prefix (or a fresh buffer) followed by zeroes.
var firstKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength)
: prefixKey.copy();
try {
2022-03-16 22:41:51 +01:00
firstRangeKey(firstKey, keyPrefixLength, keySuffixAndExtZeroBuffer);
2021-11-08 16:33:41 +01:00
// Upper bound: same construction, with the prefix part incremented.
var nextRangeKey = prefixKey == null ? alloc.allocate(keyPrefixLength + keySuffixLength + keyExtLength)
: prefixKey.copy();
try {
2022-03-16 22:41:51 +01:00
nextRangeKey(nextRangeKey, keyPrefixLength, keySuffixAndExtZeroBuffer);
2021-11-08 16:33:41 +01:00
assert prefixKey == null || prefixKey.isAccessible();
2021-08-31 15:50:11 +02:00
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
2021-11-08 16:33:41 +01:00
if (keyPrefixLength == 0) {
// No prefix: the whole dictionary is covered and the two bounds are not needed.
this.range = LLRange.all();
firstKey.close();
nextRangeKey.close();
} else {
this.range = LLRange.ofUnsafe(firstKey, nextRangeKey);
}
2021-08-31 15:50:11 +02:00
this.rangeMono = LLUtils.lazyRetainRange(this.range);
assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
2021-11-08 16:33:41 +01:00
} catch (Throwable t) {
// NOTE(review): if a failure happens after the no-prefix branch above already closed both
// keys, this close() and the one below run on closed buffers - confirm that is harmless.
nextRangeKey.close();
throw t;
2021-08-31 15:50:11 +02:00
}
2021-11-08 16:33:41 +01:00
} catch (Throwable t) {
firstKey.close();
throw t;
}
2021-09-22 18:33:28 +02:00
2021-11-08 16:33:41 +01:00
this.keyPrefix = prefixKey;
2021-10-01 19:17:33 +02:00
this.onClose = onClose;
2021-11-08 16:33:41 +01:00
} catch (Throwable t) {
2022-03-16 22:41:51 +01:00
// Construction failed: release the zero buffer and the prefix received from the caller.
if (this.keySuffixAndExtZeroBuffer != null && keySuffixAndExtZeroBuffer.isAccessible()) {
keySuffixAndExtZeroBuffer.close();
}
2021-11-08 16:33:41 +01:00
if (prefixKey != null && prefixKey.isAccessible()) {
prefixKey.close();
}
throw t;
}
2021-01-31 21:23:43 +01:00
}
2021-10-01 19:17:33 +02:00
/**
 * Rebuilds an instance from the state captured by {@code prepareSend()}: plain values are copied
 * and the three sent resources are received.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private DatabaseMapDictionaryDeep(LLDictionary dictionary,
        BufferAllocator alloc,
        SubStageGetter<U, US> subStageGetter,
        SerializerFixedBinaryLength<T> keySuffixSerializer,
        int keyPrefixLength,
        int keySuffixLength,
        int keyExtLength,
        Mono<Send<LLRange>> rangeMono,
        Send<LLRange> range,
        Send<Buffer> keyPrefix,
        Send<Buffer> keySuffixAndExtZeroBuffer,
        Runnable onClose) {
    super((Drop<DatabaseMapDictionaryDeep<T, U, US>>) (Drop) DROP);

    // Plain state, copied as-is.
    this.dictionary = dictionary;
    this.alloc = alloc;
    this.subStageGetter = subStageGetter;
    this.keySuffixSerializer = keySuffixSerializer;
    this.keyExtLength = keyExtLength;
    this.keySuffixLength = keySuffixLength;
    this.keyPrefixLength = keyPrefixLength;
    this.rangeMono = rangeMono;

    // Received resources, in the same order as the original: range, prefix, zero buffer.
    // NOTE(review): keyPrefix is dereferenced unconditionally, while the other constructor allows
    // a null prefix - confirm that a null Send can never reach this point.
    this.range = range.receive();
    this.keyPrefix = keyPrefix.receive();
    this.keySuffixAndExtZeroBuffer = keySuffixAndExtZeroBuffer.receive();

    this.onClose = onClose;
}
2021-01-31 21:23:43 +01:00
@SuppressWarnings("unused")
2021-10-19 00:22:05 +02:00
protected boolean suffixKeyLengthConsistency(int keySuffixLength) {
2021-01-31 21:23:43 +01:00
return this.keySuffixLength == keySuffixLength;
}
/**
 * Checks a length against the configured key ext length.
 *
 * @param keyExtLength length to check
 * @return true when it equals this map's key ext length
 */
@SuppressWarnings("unused")
protected boolean extKeyConsistency(int keyExtLength) {
    final boolean matches = keyExtLength == this.keyExtLength;
    return matches;
}
/**
 * Checks a length against the sum of the configured key suffix and key ext lengths.
 *
 * @param keySuffixAndExtLength length to check
 * @return true when it equals suffix length + ext length
 */
@SuppressWarnings("unused")
protected boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
    final int expectedLength = this.keySuffixLength + this.keyExtLength;
    return expectedLength == keySuffixAndExtLength;
}
/**
* Removes the prefix from the key
2021-10-19 00:22:05 +02:00
* @return the prefix
2021-01-31 21:23:43 +01:00
*/
2021-10-19 00:22:05 +02:00
protected Buffer splitPrefix(Buffer key) {
2021-09-23 02:22:30 +02:00
assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength
|| key.readableBytes() == keyPrefixLength + keySuffixLength;
2021-10-19 00:22:05 +02:00
var prefix = key.readSplit(this.keyPrefixLength);
2021-09-23 02:22:30 +02:00
assert key.readableBytes() == keySuffixLength + keyExtLength
|| key.readableBytes() == keySuffixLength;
2021-10-19 00:22:05 +02:00
return prefix;
2021-01-31 21:23:43 +01:00
}
/**
 * Resolves the snapshot of this map's dictionary from a composite snapshot.
 *
 * @param snapshot composite snapshot, or null
 * @return the dictionary snapshot, or null when {@code snapshot} is null
 */
protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
    return snapshot == null ? null : snapshot.getSnapshot(dictionary);
}
2021-02-24 16:43:07 +01:00
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
2021-02-24 16:43:07 +01:00
}
2021-03-14 03:13:19 +01:00
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false);
2021-03-14 03:13:19 +01:00
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
2021-10-19 00:22:05 +02:00
var suffixKeyWithoutExt = Mono.fromCallable(() -> {
2021-11-08 16:33:41 +01:00
try (var keyWithoutExtBuf = keyPrefix == null
? alloc.allocate(keySuffixLength + keyExtLength) : keyPrefix.copy()) {
2021-10-19 00:22:05 +02:00
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(keySuffix, keyWithoutExtBuf);
return keyWithoutExtBuf.send();
}
});
2021-09-02 21:14:26 +02:00
return this.subStageGetter
2022-01-26 14:22:54 +01:00
.subStage(dictionary, snapshot, suffixKeyWithoutExt);
2021-05-02 19:18:15 +02:00
}
/** Returns the update mode reported by the underlying dictionary. */
@Override
public Mono<UpdateMode> getUpdateMode() {
return dictionary.getUpdateMode();
}
2021-06-26 02:35:33 +02:00
@Override
public Flux<BadBlock> badBlocks() {
return dictionary.badBlocks(rangeMono);
2021-06-26 02:35:33 +02:00
}
2021-01-31 21:23:43 +01:00
/**
 * Emits one entry per distinct "prefix + suffix" key group of this map's range, pairing the
 * deserialized suffix with the sub stage created for that group.
 *
 * @param snapshot   snapshot to read from, or null
 * @param smallRange forwarded unchanged to {@code dictionary.getRangeKeyPrefixes}
 */
@Override
2022-03-24 23:56:23 +01:00
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
2021-09-02 21:14:26 +02:00
return dictionary
2022-03-24 23:56:23 +01:00
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange)
2021-09-02 21:14:26 +02:00
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
groupKeyWithoutExtSend_::receive,
groupKeyWithoutExtSend -> this.subStageGetter
.subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send()))
2022-03-16 22:41:51 +01:00
.handle((us, sink) -> {
2021-10-19 00:22:05 +02:00
T deserializedSuffix;
2021-12-12 02:40:26 +01:00
// The sub stage received a copy of the group key, so the received buffer can be split here.
try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) {
deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix);
2021-10-19 00:22:05 +02:00
sink.next(Map.entry(deserializedSuffix, us));
2021-09-02 21:14:26 +02:00
} catch (SerializationException ex) {
sink.error(ex);
}
2021-09-02 21:14:26 +02:00
}),
Resource::close
2022-01-26 14:22:54 +01:00
));
}
2021-10-19 00:22:05 +02:00
/**
* Split the input. The input will become the ext, the returned data will be the group suffix
* @param groupKey group key, will become ext
* @return group suffix
*/
private Buffer splitGroupSuffix(@NotNull Buffer groupKey) {
assert subStageKeysConsistency(groupKey.readableBytes())
|| subStageKeysConsistency(groupKey.readableBytes() + keyExtLength);
this.splitPrefix(groupKey).close();
assert subStageKeysConsistency(keyPrefixLength + groupKey.readableBytes())
|| subStageKeysConsistency(keyPrefixLength + groupKey.readableBytes() + keyExtLength);
return groupKey.readSplit(keySuffixLength);
2021-09-02 21:14:26 +02:00
}
private boolean subStageKeysConsistency(int totalKeyLength) {
if (subStageGetter instanceof SubStageGetterMapDeep) {
return totalKeyLength
== keyPrefixLength + keySuffixLength + ((SubStageGetterMapDeep<?, ?, ?>) subStageGetter).getKeyBinaryLength();
} else if (subStageGetter instanceof SubStageGetterMap) {
return totalKeyLength
== keyPrefixLength + keySuffixLength + ((SubStageGetterMap<?, ?>) subStageGetter).getKeyBinaryLength();
} else {
return true;
}
2021-01-31 21:23:43 +01:00
}
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
2021-05-02 19:18:15 +02:00
return this
2022-03-24 23:56:23 +01:00
.getAllValues(null, false)
2021-05-02 19:18:15 +02:00
.concatWith(this
.clear()
.then(this.putMulti(entries))
.then(Mono.empty())
2021-05-02 19:18:15 +02:00
);
2021-03-11 02:22:59 +01:00
}
@Override
public Mono<Void> clear() {
return Mono
.defer(() -> {
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
2021-09-02 21:14:26 +02:00
.remove(Mono.fromCallable(range::getSingle), LLDictionaryResultType.VOID)
2021-08-29 23:18:03 +02:00
.doOnNext(Send::close)
.then();
} else {
2022-03-24 23:56:23 +01:00
return dictionary.setRange(rangeMono, Flux.empty(), false);
}
});
2021-01-31 21:23:43 +01:00
}
2021-10-19 00:22:05 +02:00
protected T deserializeSuffix(@NotNull Buffer keySuffix) throws SerializationException {
assert suffixKeyLengthConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix);
2021-11-08 16:33:41 +01:00
assert keyPrefix == null || keyPrefix.isAccessible();
2021-10-19 00:22:05 +02:00
return result;
2021-01-31 21:23:43 +01:00
}
2021-10-19 00:22:05 +02:00
protected void serializeSuffix(T keySuffix, Buffer output) throws SerializationException {
output.ensureWritable(keySuffixLength);
var beforeWriterOffset = output.writerOffset();
keySuffixSerializer.serialize(keySuffix, output);
var afterWriterOffset = output.writerOffset();
assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset);
2021-11-08 16:33:41 +01:00
assert keyPrefix == null || keyPrefix.isAccessible();
2021-01-31 21:23:43 +01:00
}
@Override
2021-09-23 20:57:28 +02:00
protected RuntimeException createResourceClosedException() {
throw new IllegalStateException("Closed");
}
@Override
protected Owned<DatabaseMapDictionaryDeep<T, U, US>> prepareSend() {
2022-03-16 22:41:51 +01:00
var keyPrefix = this.keyPrefix.send();
var keySuffixAndExtZeroBuffer = this.keySuffixAndExtZeroBuffer.send();
2021-09-23 20:57:28 +02:00
var range = this.range.send();
2021-10-01 19:17:33 +02:00
var onClose = this.onClose;
return drop -> {
var instance = new DatabaseMapDictionaryDeep<>(dictionary,
alloc,
subStageGetter,
keySuffixSerializer,
keyPrefixLength,
keySuffixLength,
keyExtLength,
rangeMono,
range,
keyPrefix,
2022-03-16 22:41:51 +01:00
keySuffixAndExtZeroBuffer,
2021-10-01 19:17:33 +02:00
onClose
);
drop.attach(instance);
return instance;
};
2021-09-23 20:57:28 +02:00
}
@Override
protected void makeInaccessible() {
this.keyPrefix = null;
2022-03-16 22:41:51 +01:00
this.keySuffixAndExtZeroBuffer = null;
2021-09-23 20:57:28 +02:00
this.range = null;
2021-10-01 19:17:33 +02:00
this.onClose = null;
}
2022-03-12 02:55:18 +01:00
2022-03-20 14:33:27 +01:00
/**
 * Reads the entries of a two-level map straight from the dictionary range and converts every
 * leaf into an {@code R} with {@code merger}.
 *
 * <p>Three sub stage getters are supported: {@code SubStageGetterMap} (one leaf per entry),
 * {@code SubStageGetterHashMap} (each entry holds a set of key/value pairs) and
 * {@code SubStageGetterHashSet} (each entry holds a set of keys, merged with
 * {@code Nothing.INSTANCE} as value).
 *
 * @param deepMap           map to read
 * @param snapshot          snapshot to read from, or null
 * @param merger            builds a result from first-level key, second-level key and value
 * @param savedProgressKey1 when it emits a key, reading starts from that first-level key; when
 *                          empty, the whole range of the map is read
 * @return the merged leaves
 * @throws IllegalArgumentException if the sub stage getter is not one of the supported types
 */
public static <K1, K2, V, R> Flux<R> getAllLeaves2(DatabaseMapDictionaryDeep<K1, Object2ObjectSortedMap<K2, V>, ? extends DatabaseStageMap<K2, V, DatabaseStageEntry<V>>> deepMap,
2022-03-12 02:55:18 +01:00
CompositeSnapshot snapshot,
2022-03-20 14:33:27 +01:00
TriFunction<K1, K2, V, R> merger,
@NotNull Mono<K1> savedProgressKey1) {
var keySuffix1Serializer = deepMap.keySuffixSerializer;
SerializerFixedBinaryLength<?> keySuffix2Serializer;
Serializer<?> valueSerializer;
boolean isHashed;
boolean isHashedSet;
// Pick the second-level key serializer and the value serializer from the sub stage getter type.
if (deepMap.subStageGetter instanceof SubStageGetterMap subStageGetterMap) {
isHashed = false;
isHashedSet = false;
keySuffix2Serializer = subStageGetterMap.keySerializer;
valueSerializer = subStageGetterMap.valueSerializer;
} else if (deepMap.subStageGetter instanceof SubStageGetterHashMap subStageGetterHashMap) {
isHashed = true;
isHashedSet = false;
keySuffix2Serializer = subStageGetterHashMap.keyHashSerializer;
//noinspection unchecked
ValueWithHashSerializer<K2, V> valueWithHashSerializer = new ValueWithHashSerializer<>(
(Serializer<K2>) subStageGetterHashMap.keySerializer,
(Serializer<V>) subStageGetterHashMap.valueSerializer
);
valueSerializer = new ValuesSetSerializer<>(valueWithHashSerializer);
} else if (deepMap.subStageGetter instanceof SubStageGetterHashSet subStageGetterHashSet) {
isHashed = true;
isHashedSet = true;
keySuffix2Serializer = subStageGetterHashSet.keyHashSerializer;
//noinspection unchecked
valueSerializer = new ValuesSetSerializer<K2>(subStageGetterHashSet.keySerializer);
2022-03-12 02:55:18 +01:00
} else {
throw new IllegalArgumentException();
}
2022-03-20 14:33:27 +01:00
// Wrap the saved key in an Optional so that Mono.zip also emits when no key was saved.
var savedProgressKey1Opt = savedProgressKey1.map(Optional::of).defaultIfEmpty(Optional.empty());
return deepMap
.dictionary
.getRange(deepMap.resolveSnapshot(snapshot), Mono.zip(savedProgressKey1Opt, deepMap.rangeMono).handle((tuple, sink) -> {
var firstKey = tuple.getT1();
try (var fullRange = tuple.getT2().receive()) {
if (firstKey.isPresent()) {
// NOTE(review): the start key holds only the serialized first-level key, without
// deepMap.keyPrefix, while the decoding below skips keyPrefixLength bytes - verify the
// behavior for maps that have a non-empty prefix.
try (var key1Buf = deepMap.alloc.allocate(keySuffix1Serializer.getSerializedBinaryLength())) {
keySuffix1Serializer.serialize(firstKey.get(), key1Buf);
sink.next(LLRange.of(key1Buf.send(), fullRange.getMax()).send());
} catch (SerializationException e) {
sink.error(e);
}
} else {
sink.next(fullRange.send());
}
}
2022-03-24 23:56:23 +01:00
}), false, false)
2022-03-20 14:33:27 +01:00
.concatMapIterable(entrySend -> {
// Declared outside the try so the error log below can print whatever was decoded so far.
K1 key1 = null;
Object key2 = null;
try (var entry = entrySend.receive()) {
var keyBuf = entry.getKeyUnsafe();
var valueBuf = entry.getValueUnsafe();
try {
assert keyBuf != null;
// Key layout read here: prefix, first-level key, then second-level key.
keyBuf.skipReadable(deepMap.keyPrefixLength);
try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) {
key1 = keySuffix1Serializer.deserialize(key1Buf);
}
key2 = keySuffix2Serializer.deserialize(keyBuf);
assert valueBuf != null;
Object value = valueSerializer.deserialize(valueBuf);
if (isHashedSet) {
//noinspection unchecked
Set<K2> set = (Set<K2>) value;
K1 finalKey1 = key1;
//noinspection unchecked
return set.stream().map(e -> merger.apply(finalKey1, e, (V) Nothing.INSTANCE)).toList();
} else if (isHashed) {
//noinspection unchecked
Set<Entry<K2, V>> set = (Set<Entry<K2, V>>) value;
K1 finalKey1 = key1;
return set.stream().map(e -> merger.apply(finalKey1, e.getKey(), e.getValue())).toList();
} else {
//noinspection unchecked
return List.of(merger.apply(key1, (K2) key2, (V) value));
}
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
// An out-of-bounds error with this message is counted as a zero-bytes value and the entry is
// skipped; any other out-of-bounds error is rethrown.
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet();
// Log the first 511 occurrences, then one every 10000.
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName()
+ ":" + deepMap.dictionary.getColumnName()
+ ":[" + key1
+ ":" + key2
+ "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors);
}
return List.of();
} else {
throw ex;
}
}
} catch (SerializationException ex) {
// The lambda cannot throw a checked exception, so it is wrapped.
throw new CompletionException(ex);
}
});
2022-03-12 02:55:18 +01:00
}
2021-01-31 21:23:43 +01:00
}