From 57a0206cf1884b82adb918007556b700b9ed190d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 7 Sep 2021 19:32:37 +0200 Subject: [PATCH] Fully implement memory dictionary --- .../database/disk/LLLocalDictionary.java | 5 +- .../dbengine/database/memory/BLRange.java | 47 ++++ .../database/memory/LLMemoryDictionary.java | 259 ++++++++++++++---- 3 files changed, 258 insertions(+), 53 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/memory/BLRange.java diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 4883cb7..2cfdc61 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1111,8 +1111,9 @@ public class LLLocalDictionary implements LLDictionary { public Flux> putMulti(Flux> entries, boolean getOldValues) { return entries .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .flatMapSequential(ew -> this - .>>runOnDb(() -> { + .publishOn(dbScheduler) + .flatMapSequential(ew -> Mono + .>>fromCallable(() -> { var entriesWindow = new ArrayList(ew.size()); for (Send entrySend : ew) { entriesWindow.add(entrySend.receive()); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/BLRange.java b/src/main/java/it/cavallium/dbengine/database/memory/BLRange.java new file mode 100644 index 0000000..e97530e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/memory/BLRange.java @@ -0,0 +1,47 @@ +package it.cavallium.dbengine.database.memory; + +import it.unimi.dsi.fastutil.bytes.ByteList; + +public class BLRange { + + private final ByteList min; + private final ByteList max; + private final ByteList single; + + public BLRange(ByteList min, ByteList max, ByteList single) { + if (single != null && (min != null || max != null)) { + throw new IllegalArgumentException(); + } + this.min = min; + this.max = max; + this.single = single; + } + + public ByteList getMin() { + return min; + } + + public ByteList getMax() { + return max; + } + + public ByteList getSingle() { + return single; + } + + public boolean isSingle() { + return single != null; + } + + public boolean isAll() { + return single == null && min == null && max == null; + } + + public boolean hasMin() { + return min != null; + } + + public boolean hasMax() { + return max != null; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index dd570f1..fef921f 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -20,6 +20,7 @@ import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -103,6 +104,22 @@ public class LLMemoryDictionary implements LLDictionary { } } + private BLRange r(Send send) { + try(var range = send.receive()) { + if (range.isAll()) { + return new BLRange(null, null, null); + } else if (range.isSingle()) { + return new BLRange(null, null, k(range.getSingle())); + } else if (range.hasMin() && range.hasMax()) { + return new BLRange(k(range.getMin()), k(range.getMax()), null); + } else if (range.hasMin()) { + return new BLRange(k(range.getMin()), null, null); + } else { + return new BLRange(k(range.getMax()), null, null); + } + } + } + private Map mapSlice(LLSnapshot snapshot, Send rangeToReceive) { try (var range = rangeToReceive.receive()) { if (range.isAll()) { @@ -186,21 +203,25 @@ public class LLMemoryDictionary implements LLDictionary { if (old != null) { oldRef.set(kk(old)); } - Send v = null; + Send v; try { v = updater.apply(old != null ? kk(old) : null); } catch (SerializationException e) { throw new IllegalStateException(e); } try { - return k(v); + if (v != null) { + return k(v); + } else { + return null; + } } finally { if (v != null) { v.close(); } } }); - return LLDelta.of(oldRef.get(), kk(newValue)); + return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null); }), key -> Mono.fromRunnable(key::close) ); @@ -257,11 +278,9 @@ public class LLMemoryDictionary implements LLDictionary { try (var entry = entryToReceive.receive()) { try (var key = entry.getKey().receive()) { try (var val = entry.getValue().receive()) { - var v = mainDb.put(k(key.copy().send()), k(val.send())); - if (v == null || !getOldValues) { - sink.complete(); - } else { - sink.next(LLEntry.of(key.send(), kk(v)).send()); + var oldValue = mainDb.put(k(key.copy().send()), k(val.send())); + if (oldValue != null && getOldValues) { + sink.next(LLEntry.of(key.send(), kk(oldValue)).send()); } } } @@ -286,7 +305,11 @@ public class LLMemoryDictionary implements LLDictionary { return Mono.fromCallable(() -> { try (var single = singleToReceive.receive()) { var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send())); - return LLEntry.of(single.send(), kk(element)).send(); + if (element != null) { + return LLEntry.of(single.send(), kk(element)).send(); + } else { + return null; + } } }).flux(); } else { @@ -305,7 +328,33 @@ public class LLMemoryDictionary implements LLDictionary { Mono> rangeMono, int prefixLength, boolean existsAlmostCertainly) { - return Flux.error(new UnsupportedOperationException("Not implemented")); + return Flux.usingWhen(rangeMono, rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + return Mono.fromCallable(() -> { + try (var single = singleToReceive.receive()) { + var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send())); + if (element != null) { + return List.of(LLEntry.of(single.send(), kk(element)).send()); + } else { + return List.>of(); + } + } + }).flux(); + } else { + var rangeToReceive2 = range.send(); + return Mono + .fromCallable(() -> mapSlice(snapshot, rangeToReceive2)) + .flatMapMany(map -> Flux.fromIterable(map.entrySet())) + .groupBy(k -> k.getKey().subList(0, prefixLength)) + .flatMap(groupedFlux -> groupedFlux + .map(entry -> LLEntry.of(kk(entry.getKey()), kk(entry.getValue())).send()) + .collectList() + ); + } + } + }, range -> Mono.fromRunnable(range::close)); } @Override @@ -334,54 +383,70 @@ public class LLMemoryDictionary implements LLDictionary { ); } - private static record BufferWithPrefix(Send buffer, Send prefix) {} - @Override public Flux>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { - return getRangeKeys(snapshot, rangeMono) - .map(bufferToReceive -> { - try(var buffer = bufferToReceive.receive()) { - try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) { - return new BufferWithPrefix(buffer.send(), bufferPrefix.send()); + return Flux.usingWhen(rangeMono, rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + return Mono.fromCallable(() -> { + try (var single = singleToReceive.receive()) { + var containsElement = snapshots.get(resolveSnapshot(snapshot)).containsKey(k(single.copy().send())); + if (containsElement) { + return List.of(single.send()); + } else { + return List.>of(); + } } - } - }) - .windowUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), LLUtils::equals) - .flatMapSequential(window -> window.map(tuple -> { - try (var ignored = tuple.prefix()) { - return tuple.buffer(); - } - }).collectList()); + }).flux(); + } else { + var rangeToReceive2 = range.send(); + return Mono + .fromCallable(() -> mapSlice(snapshot, rangeToReceive2)) + .flatMapMany(map -> Flux.fromIterable(map.entrySet())) + .groupBy(k -> k.getKey().subList(0, prefixLength)) + .flatMap(groupedFlux -> groupedFlux + .map(entry -> kk(entry.getKey())) + .collectList() + ); + } + } + }, range -> Mono.fromRunnable(range::close)); } + @SuppressWarnings("RedundantCast") @Override public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { - return getRangeKeys(snapshot, rangeMono) - .map(bufferToReceive -> { - try(var buffer = bufferToReceive.receive()) { - try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) { - return new BufferWithPrefix(buffer.send(), bufferPrefix.send()); + return Flux.usingWhen(rangeMono, rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + return Mono.fromCallable(() -> { + try (var single = singleToReceive.receive()) { + var k = k(single.copy().send()); + var containsElement = snapshots.get(resolveSnapshot(snapshot)).containsKey(k); + if (containsElement) { + return kk(k.subList(0, prefixLength)); + } else { + return null; + } } - } - }) - .distinctUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), (a, b) -> { - if (LLUtils.equals(a, b)) { - b.close(); - return true; - } else { - return false; - } - }) - .map(tuple -> { - try (var ignored = tuple.prefix()) { - return tuple.buffer(); - } - }) - .transform(LLUtils::handleDiscard); + }).flux(); + } else { + var rangeToReceive2 = range.send(); + return Mono + .fromCallable(() -> mapSlice(snapshot, rangeToReceive2)) + .flatMapMany(map -> Flux.fromIterable(map.entrySet())) + .map(k -> (ByteList) k.getKey().subList(0, prefixLength)) + .distinctUntilChanged() + .map(this::kk); + } + } + }, range -> Mono.fromRunnable(range::close)); } @Override @@ -391,12 +456,67 @@ public class LLMemoryDictionary implements LLDictionary { @Override public Mono setRange(Mono> rangeMono, Flux> entries) { - return Mono.error(new UnsupportedOperationException("Not implemented")); + return Mono.usingWhen(rangeMono, rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + Mono clearMono; + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + clearMono = Mono.fromRunnable(() -> { + try (var single = singleToReceive.receive()) { + var k = k(single.copy().send()); + mainDb.remove(k); + } + }); + } else { + var rangeToReceive2 = range.copy().send(); + clearMono = Mono.fromRunnable(() -> mapSlice(null, rangeToReceive2).clear()); + } + + var r = r(range.copy().send()); + + return clearMono + .thenMany(entries) + .doOnNext(entryToReceive -> { + try (var entry = entryToReceive.receive()) { + if (!isInsideRange(r, k(entry.getKey()))) { + throw new IndexOutOfBoundsException("Trying to set a key outside the range!"); + } + mainDb.put(k(entry.getKey()), k(entry.getValue())); + } + }) + .then(); + } + }, range -> Mono.fromRunnable(range::close)); + } + + private boolean isInsideRange(BLRange range, ByteList key) { + if (range.isAll()) { + return true; + } else if (range.isSingle()) { + var single = range.getSingle(); + return Objects.equals(single, key); + } else if (range.hasMin() && range.hasMax()) { + var min = range.getMin(); + var max = range.getMax(); + return min.compareTo(key) <= 0 && max.compareTo(key) > 0; + } else if (range.hasMin()) { + var min = range.getMin(); + return min.compareTo(key) <= 0; + } else { + var max = range.getMax(); + return max.compareTo(key) > 0; + } } @Override public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return Mono.error(new UnsupportedOperationException("Not implemented")); + return getRangeKeys(snapshot, rangeMono) + .map(buf -> { + buf.receive().close(); + return true; + }) + .count() + .map(count -> count == 0); } @Override @@ -409,17 +529,54 @@ public class LLMemoryDictionary implements LLDictionary { @Override public Mono> getOne(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return Mono.error(new UnsupportedOperationException("Not implemented")); + return getRange(snapshot, rangeMono) + .take(1, true) + .singleOrEmpty() + .doOnDiscard(Send.class, Send::close); } @Override public Mono> getOneKey(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return Mono.error(new UnsupportedOperationException("Not implemented")); + return getRangeKeys(snapshot, rangeMono) + .take(1, true) + .singleOrEmpty() + .doOnDiscard(Send.class, Send::close); } @Override public Mono> removeOne(Mono> rangeMono) { - return Mono.error(new UnsupportedOperationException("Not implemented")); + return Mono.usingWhen(rangeMono, rangeToReceive -> { + try (var range = rangeToReceive.receive()) { + if (range.isSingle()) { + var singleToReceive = range.getSingle(); + return Mono.fromCallable(() -> { + try (var single = singleToReceive.receive()) { + var element = mainDb.remove(k(single.copy().send())); + if (element != null) { + return LLEntry.of(single.send(), kk(element)).send(); + } else { + return null; + } + } + }); + } else { + var rangeToReceive2 = range.send(); + return Mono + .fromCallable(() -> mapSlice(null, rangeToReceive2)) + .mapNotNull(map -> { + var it = map.entrySet().iterator(); + if (it.hasNext()) { + var next = it.next(); + it.remove(); + return next; + } else { + return null; + } + }) + .map(entry -> LLEntry.of(kk(entry.getKey()), kk(entry.getValue())).send()); + } + } + }, range -> Mono.fromRunnable(range::close)); } @Override