package it.cavallium.dbengine.database.memory;

import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.ReleasableSlice;
import it.cavallium.dbengine.database.serialization.BiSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

public class LLMemoryDictionary implements LLDictionary {

	private final String databaseName;
	private final String columnName;
	private final BufferAllocator allocator;
	private final UpdateMode updateMode;
	private final Getter<Long, ConcurrentSkipListMap<ByteList, ByteList>> snapshots;
	private final ConcurrentSkipListMap<ByteList, ByteList> mainDb;

	private interface Getter<T, U> {
		U get(T argument);
	}

	public LLMemoryDictionary(BufferAllocator allocator,
			String databaseName,
			String columnName,
			UpdateMode updateMode,
			ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>> snapshots,
			ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>> mainDb) {
		this.databaseName = databaseName;
		this.columnName = columnName;
		this.allocator = allocator;
		this.updateMode = updateMode;
		this.snapshots = (snapshotId) -> snapshots.get(snapshotId).get(columnName);
		this.mainDb = mainDb.get(columnName);
	}

	@Override
	public String getColumnName() {
		return columnName;
	}

	@Override
	public BufferAllocator getAllocator() {
		return allocator;
	}

	private long resolveSnapshot(LLSnapshot snapshot) {
		if (snapshot == null) {
			// No snapshot: read the live main database
			return Long.MIN_VALUE + 1L;
		} else if (snapshot.getSequenceNumber() == Long.MIN_VALUE + 1L) {
			// The sentinel used for "no snapshot" must never be a real sequence number
			throw new IllegalStateException();
		} else {
			return snapshot.getSequenceNumber();
		}
	}

	private Mono<Send<Buffer>> transformResult(Mono<ByteList> result, LLDictionaryResultType resultType) {
		if (resultType == LLDictionaryResultType.PREVIOUS_VALUE) {
			// Don't retain the result because it has been removed from the skip list
			return result.map(this::kk);
		} else if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) {
			return result
					.map(prev -> true)
					.defaultIfEmpty(false)
					.map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(allocator, bool));
		} else {
			return result.then(Mono.empty());
		}
	}

	private ByteList k(Send<Buffer> buf) {
		return new BinaryLexicographicList(LLUtils.toArray(buf.receive()));
	}

	private Send<Buffer> kk(ByteList bytesList) {
		try (var buffer = getAllocator().allocate(bytesList.size())) {
			buffer.writeBytes(bytesList.toByteArray());
			return buffer.send();
		}
	}
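	// k()/kk() convert between the two representations used by this class: the reactive API passes
	// values around as owned buffers (Send<Buffer>), while the backing skip list stores plain
	// ByteList keys and values with lexicographic ordering. A rough, illustrative round trip
	// (the 3-byte key below is hypothetical, not something this class defines):
	//
	//   try (var buf = allocator.allocate(3)) {
	//       buf.writeBytes(new byte[] {1, 2, 3});
	//       ByteList asKey = k(buf.send());              // consumes the Send, copies the bytes out
	//       try (var copy = kk(asKey).receive()) {
	//           // "copy" is a freshly allocated buffer holding the same 3 bytes
	//       }
	//   }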
	private Map<ByteList, ByteList> mapSlice(LLSnapshot snapshot, Send<LLRange> rangeToReceive) {
		try (var range = rangeToReceive.receive()) {
			if (range.isAll()) {
				return snapshots.get(resolveSnapshot(snapshot));
			} else if (range.isSingle()) {
				var key = k(range.getSingle());
				var value = snapshots
						.get(resolveSnapshot(snapshot))
						.get(key);
				if (value != null) {
					return Map.of(key, value);
				} else {
					return Map.of();
				}
			} else if (range.hasMin() && range.hasMax()) {
				var min = k(range.getMin());
				var max = k(range.getMax());
				if (min.compareTo(max) > 0) {
					// Empty range: the minimum bound is greater than the maximum bound
					return Map.of();
				}
				return snapshots
						.get(resolveSnapshot(snapshot))
						.subMap(min, true, max, false);
			} else if (range.hasMin()) {
				return snapshots
						.get(resolveSnapshot(snapshot))
						.tailMap(k(range.getMin()), true);
			} else {
				return snapshots
						.get(resolveSnapshot(snapshot))
						.headMap(k(range.getMax()), false);
			}
		}
	}

	@Override
	public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot, Mono<Send<Buffer>> keyMono, boolean existsAlmostCertainly) {
		return Mono.usingWhen(keyMono,
				key -> Mono
						.fromCallable(() -> snapshots.get(resolveSnapshot(snapshot)).get(k(key)))
						.map(this::kk)
						.onErrorMap(cause -> new IOException("Failed to read", cause)),
				key -> Mono.fromRunnable(key::close)
		);
	}

	@Override
	public Mono<Send<Buffer>> put(Mono<Send<Buffer>> keyMono,
			Mono<Send<Buffer>> valueMono,
			LLDictionaryResultType resultType) {
		return Mono.usingWhen(keyMono,
				key -> Mono.usingWhen(valueMono,
						value -> Mono
								.fromCallable(() -> mainDb.put(k(key), k(value)))
								.transform(result -> this.transformResult(result, resultType))
								.onErrorMap(cause -> new IOException("Failed to write", cause)),
						value -> Mono.fromRunnable(value::close)
				),
				key -> Mono.fromRunnable(key::close)
		);
	}

	@Override
	public Mono<UpdateMode> getUpdateMode() {
		return Mono.just(updateMode);
	}
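	// updateAndGetDelta (below) applies the updater atomically per key by going through
	// ConcurrentSkipListMap.compute. A minimal sketch of the same mechanism on a plain map of
	// strings (the key and values here are hypothetical, purely for illustration):
	//
	//   var db = new ConcurrentSkipListMap<String, String>();
	//   db.put("counter", "1");
	//   db.compute("counter", (key, old) -> old == null ? "1" : String.valueOf(Integer.parseInt(old) + 1));
	//   // db.get("counter") is now "2"; returning null from the remapping function removes the entry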
	@Override
	public Mono<LLDelta> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
			SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
			boolean existsAlmostCertainly) {
		return Mono.usingWhen(keyMono,
				key -> Mono.fromCallable(() -> {
					AtomicReference<Send<Buffer>> oldRef = new AtomicReference<>(null);
					var newValue = mainDb.compute(k(key), (_unused, old) -> {
						if (old != null) {
							oldRef.set(kk(old));
						}
						Send<Buffer> v = null;
						try {
							v = updater.apply(old != null ? kk(old) : null);
						} catch (SerializationException e) {
							throw new IllegalStateException(e);
						}
						try {
							// A null result from the updater removes the entry from the skip list
							return v != null ? k(v) : null;
						} finally {
							if (v != null) {
								v.close();
							}
						}
					});
					return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null);
				}),
				key -> Mono.fromRunnable(key::close)
		);
	}

	@Override
	public Mono<Void> clear() {
		return Mono.fromRunnable(mainDb::clear);
	}

	@Override
	public Mono<Send<Buffer>> remove(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
		return Mono.usingWhen(keyMono,
				key -> Mono
						.fromCallable(() -> mainDb.remove(k(key)))
						// Don't retain the result because it has been removed from the skip list
						.mapNotNull(bytesList -> switch (resultType) {
							case VOID -> null;
							case PREVIOUS_VALUE_EXISTENCE -> LLUtils.booleanToResponseByteBuffer(allocator, true);
							case PREVIOUS_VALUE -> kk(bytesList);
						})
						.switchIfEmpty(Mono.defer(() -> {
							if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) {
								return Mono.fromCallable(() -> LLUtils.booleanToResponseByteBuffer(allocator, false));
							} else {
								return Mono.empty();
							}
						}))
						.onErrorMap(cause -> new IOException("Failed to remove", cause)),
				key -> Mono.fromRunnable(key::close)
		);
	}

	@Override
	public <K> Flux<Tuple3<K, Send<Buffer>, Optional<Send<Buffer>>>> getMulti(@Nullable LLSnapshot snapshot,
			Flux<Tuple2<K, Send<Buffer>>> keys,
			boolean existsAlmostCertainly) {
		return keys
				.map(key -> {
					try (var t2 = key.getT2().receive()) {
						ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(t2.copy().send()));
						if (v != null) {
							return Tuples.of(key.getT1(), t2.send(), Optional.of(kk(v)));
						} else {
							return Tuples.of(key.getT1(), t2.send(), Optional.empty());
						}
					}
				});
	}

	@Override
	public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) {
		return entries.handle((entryToReceive, sink) -> {
			try (var entry = entryToReceive.receive()) {
				try (var key = entry.getKey().receive()) {
					try (var val = entry.getValue().receive()) {
						var v = mainDb.put(k(key.copy().send()), k(val.send()));
						if (v == null || !getOldValues) {
							sink.complete();
						} else {
							sink.next(LLEntry.of(key.send(), kk(v)).send());
						}
					}
				}
			}
		});
	}

	@Override
	public <X> Flux<ExtraKeyOperationResult<Send<Buffer>, X>> updateMulti(Flux<Tuple2<Send<Buffer>, X>> entries,
			BiSerializationFunction<Send<Buffer>, X, Send<Buffer>> updateFunction) {
		return Flux.error(new UnsupportedOperationException("Not implemented"));
	}

	@Override
	public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot,
			Mono<Send<LLRange>> rangeMono,
			boolean existsAlmostCertainly) {
		return Flux.usingWhen(rangeMono, rangeToReceive -> {
			try (var range = rangeToReceive.receive()) {
				if (range.isSingle()) {
					var singleToReceive = range.getSingle();
					return Mono.fromCallable(() -> {
						try (var single = singleToReceive.receive()) {
							var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send()));
							// A missing key yields null, which makes the Mono complete empty instead of failing
							return element != null ? LLEntry.of(single.send(), kk(element)).send() : null;
						}
					}).flux();
				} else {
					var rangeToReceive2 = range.send();
					return Mono
							.fromCallable(() -> mapSlice(snapshot, rangeToReceive2))
							.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
							.map(entry -> LLEntry.of(kk(entry.getKey()), kk(entry.getValue())).send());
				}
			}
		}, range -> Mono.fromRunnable(range::close));
	}

	@Override
	public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
			Mono<Send<LLRange>> rangeMono,
			int prefixLength,
			boolean existsAlmostCertainly) {
		return Flux.error(new UnsupportedOperationException("Not implemented"));
	}
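	// Range reads (getRange above, getRangeKeys and the grouped/prefix variants below) all funnel
	// through mapSlice, which maps LLRange bounds onto NavigableMap views of the skip list:
	// [min, max) via subMap(min, true, max, false), [min, +inf) via tailMap(min, true) and
	// (-inf, max) via headMap(max, false). Sketch on a plain skip list (hypothetical keys,
	// illustration only):
	//
	//   var map = new ConcurrentSkipListMap<String, String>(Map.of("a", "1", "b", "2", "c", "3"));
	//   map.subMap("a", true, "c", false).keySet();   // [a, b]  -> min inclusive, max exclusive
	//   map.tailMap("b", true).keySet();              // [b, c]
	//   map.headMap("b", false).keySet();             // [a]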
	@Override
	public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
		return Flux.usingWhen(rangeMono, rangeToReceive -> {
			try (var range = rangeToReceive.receive()) {
				if (range.isSingle()) {
					var singleToReceive = range.getSingle();
					return Mono.fromCallable(() -> {
						try (var single = singleToReceive.receive()) {
							var contains = snapshots.get(resolveSnapshot(snapshot)).containsKey(k(single.copy().send()));
							return contains ? single.send() : null;
						}
					}).flux();
				} else {
					var rangeToReceive2 = range.send();
					return Mono
							.fromCallable(() -> mapSlice(snapshot, rangeToReceive2))
							.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
							.map(entry -> kk(entry.getKey()));
				}
			}
		}, range -> Mono.fromRunnable(range::close));
	}

	private static record BufferWithPrefix(Send<Buffer> buffer, Send<Buffer> prefix) {}

	@Override
	public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
			Mono<Send<LLRange>> rangeMono,
			int prefixLength) {
		return getRangeKeys(snapshot, rangeMono)
				.map(bufferToReceive -> {
					try (var buffer = bufferToReceive.receive()) {
						try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) {
							return new BufferWithPrefix(buffer.send(), bufferPrefix.send());
						}
					}
				})
				.windowUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), LLUtils::equals)
				.flatMapSequential(window -> window.map(tuple -> {
					try (var ignored = tuple.prefix()) {
						return tuple.buffer();
					}
				}).collectList());
	}

	@Override
	public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot,
			Mono<Send<LLRange>> rangeMono,
			int prefixLength) {
		return getRangeKeys(snapshot, rangeMono)
				.map(bufferToReceive -> {
					try (var buffer = bufferToReceive.receive()) {
						try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) {
							return new BufferWithPrefix(buffer.send(), bufferPrefix.send());
						}
					}
				})
				.distinctUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), (a, b) -> {
					if (LLUtils.equals(a, b)) {
						b.close();
						return true;
					} else {
						return false;
					}
				})
				.map(tuple -> {
					try (var ignored = tuple.prefix()) {
						return tuple.buffer();
					}
				})
				.transform(LLUtils::handleDiscard);
	}

	@Override
	public Flux<BadBlock> badBlocks(Mono<Send<LLRange>> rangeMono) {
		return Flux.empty();
	}

	@Override
	public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
		return Mono.error(new UnsupportedOperationException("Not implemented"));
	}

	@Override
	public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
		return Mono.error(new UnsupportedOperationException("Not implemented"));
	}

	@Override
	public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fast) {
		return Mono.usingWhen(rangeMono,
				range -> Mono.fromCallable(() -> (long) mapSlice(snapshot, range).size()),
				range -> Mono.fromRunnable(range::close)
		);
	}

	@Override
	public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
		return Mono.error(new UnsupportedOperationException("Not implemented"));
	}

	@Override
	public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
		return Mono.error(new UnsupportedOperationException("Not implemented"));
	}

	@Override
	public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) {
		return Mono.error(new UnsupportedOperationException("Not implemented"));
	}

	@Override
	public String getDatabaseName() {
		return databaseName;
	}
}
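// Rough construction sketch, for illustration only. The column name, database name, the
// BufferAllocator factory method and the UpdateMode value below are assumptions, not defined by
// this class; the two maps are the shared structures expected by the constructor.
//
//   var mainDb = new ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>();
//   mainDb.put("my-column", new ConcurrentSkipListMap<>());
//   var snapshots = new ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>>();
//   var dict = new LLMemoryDictionary(BufferAllocator.onHeapUnpooled(),
//       "my-db", "my-column", UpdateMode.ALLOW, snapshots, mainDb);
//   // dict.put(keyMono, valueMono, LLDictionaryResultType.VOID) would then write into mainDb.get("my-column")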