2021-07-10 20:52:01 +02:00
|
|
|
package it.cavallium.dbengine.database.memory;
|
|
|
|
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.buffer.api.Buffer;
|
|
|
|
import io.netty5.buffer.api.BufferAllocator;
|
2022-05-20 10:20:00 +02:00
|
|
|
import io.netty5.buffer.api.Resource;
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.buffer.api.Send;
|
2021-07-10 20:52:01 +02:00
|
|
|
import it.cavallium.dbengine.client.BadBlock;
|
2021-08-31 15:50:11 +02:00
|
|
|
import it.cavallium.dbengine.database.LLDelta;
|
2021-07-10 20:52:01 +02:00
|
|
|
import it.cavallium.dbengine.database.LLDictionary;
|
|
|
|
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
2021-08-28 22:42:51 +02:00
|
|
|
import it.cavallium.dbengine.database.LLEntry;
|
2021-07-10 20:52:01 +02:00
|
|
|
import it.cavallium.dbengine.database.LLRange;
|
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
2022-05-21 15:28:52 +02:00
|
|
|
import it.cavallium.dbengine.database.OptionalBuf;
|
2021-07-10 20:52:01 +02:00
|
|
|
import it.cavallium.dbengine.database.UpdateMode;
|
2022-04-01 01:30:56 +02:00
|
|
|
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
2021-11-08 10:49:59 +01:00
|
|
|
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
|
2021-08-22 21:23:22 +02:00
|
|
|
import it.cavallium.dbengine.database.serialization.SerializationException;
|
|
|
|
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
2021-07-10 20:52:01 +02:00
|
|
|
import it.unimi.dsi.fastutil.bytes.ByteList;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
2021-09-07 19:32:37 +02:00
|
|
|
import java.util.Objects;
|
2021-07-23 15:20:33 +02:00
|
|
|
import java.util.Optional;
|
2022-03-24 21:14:17 +01:00
|
|
|
import java.util.SortedMap;
|
2021-07-10 20:52:01 +02:00
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
2022-03-24 21:14:17 +01:00
|
|
|
import java.util.concurrent.ConcurrentNavigableMap;
|
2021-07-10 20:52:01 +02:00
|
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
2021-07-18 19:37:24 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
2021-07-10 20:52:01 +02:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
public class LLMemoryDictionary implements LLDictionary {
|
|
|
|
|
|
|
|
private final String databaseName;
|
|
|
|
private final String columnName;
|
2021-08-31 09:14:46 +02:00
|
|
|
private final BufferAllocator allocator;
|
2021-07-10 20:52:01 +02:00
|
|
|
private final UpdateMode updateMode;
|
|
|
|
private final Getter<Long, ConcurrentSkipListMap<ByteList, ByteList>> snapshots;
|
|
|
|
private final ConcurrentSkipListMap<ByteList, ByteList> mainDb;
|
|
|
|
|
|
|
|
private interface Getter<T, U> {
|
|
|
|
U get(T argument);
|
|
|
|
}
|
|
|
|
|
2021-08-31 09:14:46 +02:00
|
|
|
public LLMemoryDictionary(BufferAllocator allocator,
|
2021-07-10 20:52:01 +02:00
|
|
|
String databaseName,
|
|
|
|
String columnName,
|
|
|
|
UpdateMode updateMode,
|
|
|
|
ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>> snapshots,
|
|
|
|
ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>> mainDb) {
|
|
|
|
this.databaseName = databaseName;
|
|
|
|
this.columnName = columnName;
|
|
|
|
this.allocator = allocator;
|
|
|
|
this.updateMode = updateMode;
|
|
|
|
this.snapshots = (snapshotId) -> snapshots.get(snapshotId).get(columnName);
|
|
|
|
this.mainDb = mainDb.get(columnName);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getColumnName() {
|
|
|
|
return columnName;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-08-31 09:14:46 +02:00
|
|
|
public BufferAllocator getAllocator() {
|
2021-07-10 20:52:01 +02:00
|
|
|
return allocator;
|
|
|
|
}
|
|
|
|
|
|
|
|
private long resolveSnapshot(LLSnapshot snapshot) {
|
|
|
|
if (snapshot == null) {
|
|
|
|
return Long.MIN_VALUE + 1L;
|
|
|
|
} else if (snapshot.getSequenceNumber() == Long.MIN_VALUE + 1L) {
|
|
|
|
throw new IllegalStateException();
|
|
|
|
} else {
|
|
|
|
return snapshot.getSequenceNumber();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
private Mono<Buffer> transformResult(Mono<ByteList> result, LLDictionaryResultType resultType) {
|
2021-07-10 20:52:01 +02:00
|
|
|
if (resultType == LLDictionaryResultType.PREVIOUS_VALUE) {
|
|
|
|
// Don't retain the result because it has been removed from the skip list
|
2022-05-20 10:20:00 +02:00
|
|
|
return result.map(this::kkB);
|
2021-07-10 20:52:01 +02:00
|
|
|
} else if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) {
|
|
|
|
return result
|
|
|
|
.map(prev -> true)
|
|
|
|
.defaultIfEmpty(false)
|
2021-08-31 15:50:11 +02:00
|
|
|
.map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(allocator, bool));
|
2021-07-10 20:52:01 +02:00
|
|
|
} else {
|
|
|
|
return result.then(Mono.empty());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-31 15:50:11 +02:00
|
|
|
private ByteList k(Send<Buffer> buf) {
|
2021-09-08 00:22:39 +02:00
|
|
|
try (var b = buf.receive()) {
|
|
|
|
return new BinaryLexicographicList(LLUtils.toArray(b));
|
|
|
|
}
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
private ByteList kShr(Buffer buf) {
|
|
|
|
return new BinaryLexicographicList(LLUtils.toArray(buf));
|
|
|
|
}
|
|
|
|
|
|
|
|
private ByteList kOwn(Buffer buf) {
|
|
|
|
try (buf) {
|
|
|
|
return new BinaryLexicographicList(LLUtils.toArray(buf));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-31 15:50:11 +02:00
|
|
|
private Send<Buffer> kk(ByteList bytesList) {
|
|
|
|
try (var buffer = getAllocator().allocate(bytesList.size())) {
|
|
|
|
buffer.writeBytes(bytesList.toByteArray());
|
|
|
|
return buffer.send();
|
|
|
|
}
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
2021-11-08 16:33:41 +01:00
|
|
|
private Buffer kkB(ByteList bytesList) {
|
|
|
|
var buffer = getAllocator().allocate(bytesList.size());
|
|
|
|
try {
|
|
|
|
buffer.writeBytes(bytesList.toByteArray());
|
|
|
|
return buffer;
|
|
|
|
} catch (Throwable t) {
|
|
|
|
buffer.close();
|
|
|
|
throw t;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-07 19:32:37 +02:00
|
|
|
private BLRange r(Send<LLRange> send) {
|
|
|
|
try(var range = send.receive()) {
|
|
|
|
if (range.isAll()) {
|
|
|
|
return new BLRange(null, null, null);
|
|
|
|
} else if (range.isSingle()) {
|
|
|
|
return new BLRange(null, null, k(range.getSingle()));
|
|
|
|
} else if (range.hasMin() && range.hasMax()) {
|
|
|
|
return new BLRange(k(range.getMin()), k(range.getMax()), null);
|
|
|
|
} else if (range.hasMin()) {
|
|
|
|
return new BLRange(k(range.getMin()), null, null);
|
|
|
|
} else {
|
|
|
|
return new BLRange(k(range.getMax()), null, null);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
private ConcurrentNavigableMap<ByteList, ByteList> mapSlice(LLSnapshot snapshot, LLRange range) {
|
|
|
|
if (range.isAll()) {
|
|
|
|
return snapshots.get(resolveSnapshot(snapshot));
|
|
|
|
} else if (range.isSingle()) {
|
|
|
|
var key = k(range.getSingle());
|
|
|
|
var value = snapshots
|
|
|
|
.get(resolveSnapshot(snapshot))
|
|
|
|
.get(key);
|
|
|
|
if (value != null) {
|
|
|
|
return new ConcurrentSkipListMap<>(Map.of(key, value));
|
2021-07-10 20:52:01 +02:00
|
|
|
} else {
|
2022-05-20 10:20:00 +02:00
|
|
|
return new ConcurrentSkipListMap<>(Map.of());
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
} else if (range.hasMin() && range.hasMax()) {
|
|
|
|
var min = k(range.getMin());
|
|
|
|
var max = k(range.getMax());
|
|
|
|
if (min.compareTo(max) > 0) {
|
|
|
|
return new ConcurrentSkipListMap<>(Map.of());
|
|
|
|
}
|
|
|
|
return snapshots
|
|
|
|
.get(resolveSnapshot(snapshot))
|
|
|
|
.subMap(min, true, max, false);
|
|
|
|
} else if (range.hasMin()) {
|
|
|
|
return snapshots
|
|
|
|
.get(resolveSnapshot(snapshot))
|
|
|
|
.tailMap(k(range.getMin()), true);
|
|
|
|
} else {
|
|
|
|
return snapshots
|
|
|
|
.get(resolveSnapshot(snapshot))
|
|
|
|
.headMap(k(range.getMax()), false);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Buffer> get(@Nullable LLSnapshot snapshot, Mono<Buffer> keyMono) {
|
2021-08-22 18:20:05 +02:00
|
|
|
return Mono.usingWhen(keyMono,
|
|
|
|
key -> Mono
|
2022-05-20 10:20:00 +02:00
|
|
|
.fromCallable(() -> snapshots.get(resolveSnapshot(snapshot)).get(kShr(key)))
|
|
|
|
.map(this::kkB)
|
2021-08-31 15:50:11 +02:00
|
|
|
.onErrorMap(cause -> new IOException("Failed to read", cause)),
|
|
|
|
key -> Mono.fromRunnable(key::close)
|
2021-08-22 18:20:05 +02:00
|
|
|
);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Buffer> put(Mono<Buffer> keyMono, Mono<Buffer> valueMono, LLDictionaryResultType resultType) {
|
|
|
|
var kMono = keyMono.map(this::kOwn);
|
|
|
|
var vMono = valueMono.map(this::kOwn);
|
2021-09-08 00:22:39 +02:00
|
|
|
return Mono
|
|
|
|
.zip(kMono, vMono)
|
|
|
|
.mapNotNull(tuple -> mainDb.put(tuple.getT1(), tuple.getT2()))
|
|
|
|
.transform(result -> this.transformResult(result, resultType))
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read", cause));
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<UpdateMode> getUpdateMode() {
|
|
|
|
return Mono.just(updateMode);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<LLDelta> updateAndGetDelta(Mono<Buffer> keyMono, BinarySerializationFunction updater) {
|
2021-08-22 18:20:05 +02:00
|
|
|
return Mono.usingWhen(keyMono,
|
|
|
|
key -> Mono.fromCallable(() -> {
|
2022-05-20 10:20:00 +02:00
|
|
|
if (updateMode == UpdateMode.DISALLOW) {
|
|
|
|
throw new UnsupportedOperationException("update() is disallowed");
|
|
|
|
}
|
|
|
|
AtomicReference<ByteList> oldRef = new AtomicReference<>(null);
|
|
|
|
var newValue = mainDb.compute(kShr(key), (_unused, old) -> {
|
|
|
|
if (old != null) {
|
|
|
|
oldRef.set(old);
|
2021-08-22 18:20:05 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
Buffer v;
|
|
|
|
var oldToSend = old != null ? kkB(old) : null;
|
|
|
|
try {
|
|
|
|
assert oldToSend == null || oldToSend.isAccessible();
|
|
|
|
v = updater.apply(oldToSend);
|
|
|
|
assert v == null || v.isAccessible();
|
|
|
|
} catch (SerializationException e) {
|
|
|
|
throw new IllegalStateException(e);
|
|
|
|
} finally {
|
|
|
|
if (oldToSend != null && oldToSend.isAccessible()) {
|
|
|
|
oldToSend.close();
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
}
|
|
|
|
if (v != null) {
|
|
|
|
return kOwn(v);
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
var oldVal = oldRef.get();
|
|
|
|
return LLDelta.of(oldVal != null ? kkB(oldRef.get()) : null, newValue != null ? kkB(newValue) : null);
|
2021-08-22 18:20:05 +02:00
|
|
|
}),
|
2021-08-31 15:50:11 +02:00
|
|
|
key -> Mono.fromRunnable(key::close)
|
2021-08-22 18:20:05 +02:00
|
|
|
);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Void> clear() {
|
|
|
|
return Mono.fromRunnable(mainDb::clear);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Buffer> remove(Mono<Buffer> keyMono, LLDictionaryResultType resultType) {
|
2021-08-22 18:20:05 +02:00
|
|
|
return Mono.usingWhen(keyMono,
|
|
|
|
key -> Mono
|
2022-05-20 10:20:00 +02:00
|
|
|
.fromCallable(() -> mainDb.remove(kShr(key)))
|
2021-08-22 18:20:05 +02:00
|
|
|
// Don't retain the result because it has been removed from the skip list
|
|
|
|
.mapNotNull(bytesList -> switch (resultType) {
|
|
|
|
case VOID -> null;
|
2021-08-31 15:50:11 +02:00
|
|
|
case PREVIOUS_VALUE_EXISTENCE -> LLUtils.booleanToResponseByteBuffer(allocator, true);
|
2022-05-20 10:20:00 +02:00
|
|
|
case PREVIOUS_VALUE -> kkB(bytesList);
|
2021-08-22 18:20:05 +02:00
|
|
|
})
|
|
|
|
.switchIfEmpty(Mono.defer(() -> {
|
|
|
|
if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) {
|
2021-08-31 15:50:11 +02:00
|
|
|
return Mono.fromCallable(() -> LLUtils.booleanToResponseByteBuffer(allocator, false));
|
2021-08-22 18:20:05 +02:00
|
|
|
} else {
|
|
|
|
return Mono.empty();
|
|
|
|
}
|
|
|
|
}))
|
2021-08-31 15:50:11 +02:00
|
|
|
.onErrorMap(cause -> new IOException("Failed to read", cause)),
|
|
|
|
key -> Mono.fromRunnable(key::close)
|
2021-08-22 18:20:05 +02:00
|
|
|
);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-21 15:28:52 +02:00
|
|
|
public Flux<OptionalBuf> getMulti(@Nullable LLSnapshot snapshot, Flux<Buffer> keys) {
|
2021-11-08 10:49:59 +01:00
|
|
|
return keys.map(key -> {
|
2022-05-20 10:20:00 +02:00
|
|
|
try (key) {
|
|
|
|
ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.copy().send()));
|
2021-11-08 10:49:59 +01:00
|
|
|
if (v != null) {
|
2022-05-21 15:28:52 +02:00
|
|
|
return OptionalBuf.of(kkB(v));
|
2021-11-08 10:49:59 +01:00
|
|
|
} else {
|
2022-05-21 15:28:52 +02:00
|
|
|
return OptionalBuf.empty();
|
2021-11-08 10:49:59 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Void> putMulti(Flux<LLEntry> entries) {
|
|
|
|
return entries.doOnNext(entry -> {
|
|
|
|
try (entry) {
|
|
|
|
mainDb.put(k(entry.getKeyUnsafe().send()), k(entry.getValueUnsafe().send()));
|
2021-08-31 15:50:11 +02:00
|
|
|
}
|
2022-01-26 19:03:51 +01:00
|
|
|
}).then();
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
2021-07-17 11:52:08 +02:00
|
|
|
@Override
|
2021-11-08 10:49:59 +01:00
|
|
|
public <K> Flux<Boolean> updateMulti(Flux<K> keys,
|
2022-05-20 10:20:00 +02:00
|
|
|
Flux<Buffer> serializedKeys,
|
|
|
|
KVSerializationFunction<K, @Nullable Buffer, @Nullable Buffer> updateFunction) {
|
2021-07-17 11:52:08 +02:00
|
|
|
return Flux.error(new UnsupportedOperationException("Not implemented"));
|
|
|
|
}
|
|
|
|
|
2021-07-10 20:52:01 +02:00
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Flux<LLEntry> getRange(@Nullable LLSnapshot snapshot,
|
|
|
|
Mono<LLRange> rangeMono,
|
2022-03-24 23:56:23 +01:00
|
|
|
boolean reverse,
|
|
|
|
boolean smallRange) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return Flux.usingWhen(rangeMono, range -> {
|
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
try (var single = singleToReceive.receive()) {
|
|
|
|
var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send()));
|
|
|
|
if (element != null) {
|
|
|
|
return LLEntry.of(single, kkB(element));
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}).flux();
|
|
|
|
} else {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> mapSlice(snapshot, range))
|
|
|
|
.flatMapIterable(map -> {
|
|
|
|
if (reverse) {
|
|
|
|
return map.descendingMap().entrySet();
|
2021-09-07 19:32:37 +02:00
|
|
|
} else {
|
2022-05-20 10:20:00 +02:00
|
|
|
return map.entrySet();
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
})
|
|
|
|
.map(entry -> LLEntry.of(kkB(entry.getKey()), kkB(entry.getValue())));
|
2021-08-31 15:50:11 +02:00
|
|
|
}
|
|
|
|
}, range -> Mono.fromRunnable(range::close));
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Flux<List<LLEntry>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
|
|
|
Mono<LLRange> rangeMono,
|
2022-03-24 23:56:23 +01:00
|
|
|
int prefixLength, boolean smallRange) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return Flux.usingWhen(rangeMono, range -> {
|
|
|
|
try (range) {
|
2021-09-07 19:32:37 +02:00
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
try (var single = singleToReceive.receive()) {
|
|
|
|
var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send()));
|
|
|
|
if (element != null) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return List.of(LLEntry.of(single, kkB(element)));
|
2021-09-07 19:32:37 +02:00
|
|
|
} else {
|
2022-05-20 10:20:00 +02:00
|
|
|
return List.<LLEntry>of();
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}).flux();
|
|
|
|
} else {
|
|
|
|
return Mono
|
2022-05-20 10:20:00 +02:00
|
|
|
.fromCallable(() -> mapSlice(snapshot, range))
|
2022-03-24 21:14:17 +01:00
|
|
|
.flatMapIterable(SortedMap::entrySet)
|
2021-09-07 19:32:37 +02:00
|
|
|
.groupBy(k -> k.getKey().subList(0, prefixLength))
|
|
|
|
.flatMap(groupedFlux -> groupedFlux
|
2022-05-20 10:20:00 +02:00
|
|
|
.map(entry -> LLEntry.of(kkB(entry.getKey()), kkB(entry.getValue())))
|
2021-09-07 19:32:37 +02:00
|
|
|
.collectList()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, range -> Mono.fromRunnable(range::close));
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Flux<Buffer> getRangeKeys(@Nullable LLSnapshot snapshot,
|
|
|
|
Mono<LLRange> rangeMono,
|
2022-03-24 23:56:23 +01:00
|
|
|
boolean reverse,
|
|
|
|
boolean smallRange) {
|
2021-08-22 18:20:05 +02:00
|
|
|
return Flux.usingWhen(rangeMono,
|
2022-05-20 10:20:00 +02:00
|
|
|
range -> {
|
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
var single = singleToReceive.receive();
|
|
|
|
try {
|
|
|
|
var contains = snapshots.get(resolveSnapshot(snapshot)).containsKey(k(single.copy().send()));
|
|
|
|
return contains ? single : null;
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
single.close();
|
|
|
|
throw ex;
|
|
|
|
}
|
|
|
|
}).flux();
|
|
|
|
} else {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> mapSlice(snapshot, range))
|
|
|
|
.<ByteList>flatMapIterable(map -> {
|
|
|
|
if (reverse) {
|
|
|
|
return map.descendingMap().keySet();
|
|
|
|
} else {
|
|
|
|
return map.keySet();
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.map(this::kkB);
|
2021-08-22 18:20:05 +02:00
|
|
|
}
|
|
|
|
},
|
2021-08-31 15:50:11 +02:00
|
|
|
range -> Mono.fromRunnable(range::close)
|
2021-08-22 18:20:05 +02:00
|
|
|
);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Flux<List<Buffer>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
|
|
|
|
Mono<LLRange> rangeMono,
|
2022-03-24 23:56:23 +01:00
|
|
|
int prefixLength, boolean smallRange) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return Flux.usingWhen(rangeMono, range -> {
|
|
|
|
try (range) {
|
2021-09-07 19:32:37 +02:00
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
return Mono.fromCallable(() -> {
|
2022-05-20 10:20:00 +02:00
|
|
|
var single = singleToReceive.receive();
|
|
|
|
try {
|
2021-09-07 19:32:37 +02:00
|
|
|
var containsElement = snapshots.get(resolveSnapshot(snapshot)).containsKey(k(single.copy().send()));
|
|
|
|
if (containsElement) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return List.of(single);
|
2021-09-07 19:32:37 +02:00
|
|
|
} else {
|
2022-05-20 10:20:00 +02:00
|
|
|
return List.<Buffer>of();
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
} catch (Throwable ex) {
|
|
|
|
single.close();
|
|
|
|
throw ex;
|
2021-08-31 15:50:11 +02:00
|
|
|
}
|
2021-09-07 19:32:37 +02:00
|
|
|
}).flux();
|
|
|
|
} else {
|
|
|
|
return Mono
|
2022-05-20 10:20:00 +02:00
|
|
|
.fromCallable(() -> mapSlice(snapshot, range))
|
2022-03-24 21:14:17 +01:00
|
|
|
.flatMapIterable(SortedMap::entrySet)
|
2021-09-07 19:32:37 +02:00
|
|
|
.groupBy(k -> k.getKey().subList(0, prefixLength))
|
|
|
|
.flatMap(groupedFlux -> groupedFlux
|
2022-05-20 10:20:00 +02:00
|
|
|
.map(entry -> kkB(entry.getKey()))
|
2021-09-07 19:32:37 +02:00
|
|
|
.collectList()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, range -> Mono.fromRunnable(range::close));
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
2021-09-07 19:32:37 +02:00
|
|
|
@SuppressWarnings("RedundantCast")
|
2021-07-10 20:52:01 +02:00
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Flux<Buffer> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot,
|
|
|
|
Mono<LLRange> rangeMono,
|
2022-03-24 23:56:23 +01:00
|
|
|
int prefixLength, boolean smallRange) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return Flux.usingWhen(rangeMono, range -> {
|
|
|
|
try (range) {
|
2021-09-07 19:32:37 +02:00
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
try (var single = singleToReceive.receive()) {
|
|
|
|
var k = k(single.copy().send());
|
|
|
|
var containsElement = snapshots.get(resolveSnapshot(snapshot)).containsKey(k);
|
|
|
|
if (containsElement) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return kkB(k.subList(0, prefixLength));
|
2021-09-07 19:32:37 +02:00
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
2021-08-31 15:50:11 +02:00
|
|
|
}
|
2021-09-07 19:32:37 +02:00
|
|
|
}).flux();
|
|
|
|
} else {
|
|
|
|
return Mono
|
2022-05-20 10:20:00 +02:00
|
|
|
.fromCallable(() -> mapSlice(snapshot, range))
|
2022-03-24 21:14:17 +01:00
|
|
|
.flatMapIterable(SortedMap::entrySet)
|
2021-09-07 19:32:37 +02:00
|
|
|
.map(k -> (ByteList) k.getKey().subList(0, prefixLength))
|
|
|
|
.distinctUntilChanged()
|
2022-05-20 10:20:00 +02:00
|
|
|
.map(this::kkB);
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}, range -> Mono.fromRunnable(range::close));
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Flux<BadBlock> badBlocks(Mono<LLRange> rangeMono) {
|
2021-07-10 20:52:01 +02:00
|
|
|
return Flux.empty();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<LLEntry> entries, boolean smallRange) {
|
|
|
|
return Mono.usingWhen(rangeMono, range -> {
|
|
|
|
Mono<Void> clearMono;
|
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
clearMono = Mono.fromRunnable(() -> {
|
|
|
|
try (var single = singleToReceive.receive()) {
|
|
|
|
var k = k(single.copy().send());
|
|
|
|
mainDb.remove(k);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
clearMono = Mono.fromRunnable(() -> mapSlice(null, range).clear());
|
|
|
|
}
|
2021-09-07 19:32:37 +02:00
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
var r = r(range.copy().send());
|
2021-09-07 19:32:37 +02:00
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
return clearMono
|
|
|
|
.thenMany(entries)
|
|
|
|
.doOnNext(entry -> {
|
|
|
|
try (entry) {
|
|
|
|
if (!isInsideRange(r, kShr(entry.getKeyUnsafe()))) {
|
|
|
|
throw new IndexOutOfBoundsException("Trying to set a key outside the range!");
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
mainDb.put(kShr(entry.getKeyUnsafe()), kShr(entry.getValueUnsafe()));
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.then();
|
2021-09-07 19:32:37 +02:00
|
|
|
}, range -> Mono.fromRunnable(range::close));
|
|
|
|
}
|
|
|
|
|
|
|
|
private boolean isInsideRange(BLRange range, ByteList key) {
|
|
|
|
if (range.isAll()) {
|
|
|
|
return true;
|
|
|
|
} else if (range.isSingle()) {
|
|
|
|
var single = range.getSingle();
|
|
|
|
return Objects.equals(single, key);
|
|
|
|
} else if (range.hasMin() && range.hasMax()) {
|
|
|
|
var min = range.getMin();
|
|
|
|
var max = range.getMax();
|
|
|
|
return min.compareTo(key) <= 0 && max.compareTo(key) > 0;
|
|
|
|
} else if (range.hasMin()) {
|
|
|
|
var min = range.getMin();
|
|
|
|
return min.compareTo(key) <= 0;
|
|
|
|
} else {
|
|
|
|
var max = range.getMax();
|
|
|
|
return max.compareTo(key) > 0;
|
|
|
|
}
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, boolean fillCache) {
|
2022-03-24 23:56:23 +01:00
|
|
|
return getRangeKeys(snapshot, rangeMono, false, false)
|
2022-05-20 10:20:00 +02:00
|
|
|
.doOnNext(Resource::close)
|
2021-09-07 19:32:37 +02:00
|
|
|
.count()
|
|
|
|
.map(count -> count == 0);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, boolean fast) {
|
2021-08-22 18:20:05 +02:00
|
|
|
return Mono.usingWhen(rangeMono,
|
|
|
|
range -> Mono.fromCallable(() -> (long) mapSlice(snapshot, range).size()),
|
2021-08-31 15:50:11 +02:00
|
|
|
range -> Mono.fromRunnable(range::close)
|
2021-08-22 18:20:05 +02:00
|
|
|
);
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<LLEntry> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
|
2022-03-24 23:56:23 +01:00
|
|
|
return getRange(snapshot, rangeMono, false, false)
|
2021-09-07 19:32:37 +02:00
|
|
|
.take(1, true)
|
2022-01-26 14:22:54 +01:00
|
|
|
.singleOrEmpty();
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<Buffer> getOneKey(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
|
2022-03-24 23:56:23 +01:00
|
|
|
return getRangeKeys(snapshot, rangeMono, false, false)
|
2021-09-07 19:32:37 +02:00
|
|
|
.take(1, true)
|
2022-01-26 14:22:54 +01:00
|
|
|
.singleOrEmpty();
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-05-20 10:20:00 +02:00
|
|
|
public Mono<LLEntry> removeOne(Mono<LLRange> rangeMono) {
|
|
|
|
return Mono.usingWhen(rangeMono, range -> {
|
|
|
|
try (range) {
|
2021-09-07 19:32:37 +02:00
|
|
|
if (range.isSingle()) {
|
|
|
|
var singleToReceive = range.getSingle();
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
try (var single = singleToReceive.receive()) {
|
|
|
|
var element = mainDb.remove(k(single.copy().send()));
|
|
|
|
if (element != null) {
|
2022-05-20 10:20:00 +02:00
|
|
|
return LLEntry.of(single, kkB(element));
|
2021-09-07 19:32:37 +02:00
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
return Mono
|
2022-05-20 10:20:00 +02:00
|
|
|
.fromCallable(() -> mapSlice(null, range))
|
2021-09-07 19:32:37 +02:00
|
|
|
.mapNotNull(map -> {
|
|
|
|
var it = map.entrySet().iterator();
|
|
|
|
if (it.hasNext()) {
|
|
|
|
var next = it.next();
|
|
|
|
it.remove();
|
|
|
|
return next;
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
})
|
2022-05-20 10:20:00 +02:00
|
|
|
.map(entry -> LLEntry.of(kkB(entry.getKey()), kkB(entry.getValue())));
|
2021-09-07 19:32:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}, range -> Mono.fromRunnable(range::close));
|
2021-07-10 20:52:01 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getDatabaseName() {
|
|
|
|
return databaseName;
|
|
|
|
}
|
|
|
|
}
|