Fully implement memory dictionary

This commit is contained in:
Andrea Cavalli 2021-09-07 19:32:37 +02:00
parent a662033228
commit 57a0206cf1
3 changed files with 258 additions and 53 deletions

View File

@ -1111,8 +1111,9 @@ public class LLLocalDictionary implements LLDictionary {
public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) { public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) {
return entries return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> this .publishOn(dbScheduler)
.<List<Send<LLEntry>>>runOnDb(() -> { .flatMapSequential(ew -> Mono
.<List<Send<LLEntry>>>fromCallable(() -> {
var entriesWindow = new ArrayList<LLEntry>(ew.size()); var entriesWindow = new ArrayList<LLEntry>(ew.size());
for (Send<LLEntry> entrySend : ew) { for (Send<LLEntry> entrySend : ew) {
entriesWindow.add(entrySend.receive()); entriesWindow.add(entrySend.receive());

View File

@ -0,0 +1,47 @@
package it.cavallium.dbengine.database.memory;
import it.unimi.dsi.fastutil.bytes.ByteList;
public class BLRange {
private final ByteList min;
private final ByteList max;
private final ByteList single;
public BLRange(ByteList min, ByteList max, ByteList single) {
if (single != null && (min != null || max != null)) {
throw new IllegalArgumentException();
}
this.min = min;
this.max = max;
this.single = single;
}
public ByteList getMin() {
return min;
}
public ByteList getMax() {
return max;
}
public ByteList getSingle() {
return single;
}
public boolean isSingle() {
return single != null;
}
public boolean isAll() {
return single == null && min == null && max == null;
}
public boolean hasMin() {
return min != null;
}
public boolean hasMax() {
return max != null;
}
}

View File

@ -20,6 +20,7 @@ import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -103,6 +104,22 @@ public class LLMemoryDictionary implements LLDictionary {
} }
} }
private BLRange r(Send<LLRange> send) {
try(var range = send.receive()) {
if (range.isAll()) {
return new BLRange(null, null, null);
} else if (range.isSingle()) {
return new BLRange(null, null, k(range.getSingle()));
} else if (range.hasMin() && range.hasMax()) {
return new BLRange(k(range.getMin()), k(range.getMax()), null);
} else if (range.hasMin()) {
return new BLRange(k(range.getMin()), null, null);
} else {
return new BLRange(k(range.getMax()), null, null);
}
}
}
private Map<ByteList, ByteList> mapSlice(LLSnapshot snapshot, Send<LLRange> rangeToReceive) { private Map<ByteList, ByteList> mapSlice(LLSnapshot snapshot, Send<LLRange> rangeToReceive) {
try (var range = rangeToReceive.receive()) { try (var range = rangeToReceive.receive()) {
if (range.isAll()) { if (range.isAll()) {
@ -186,21 +203,25 @@ public class LLMemoryDictionary implements LLDictionary {
if (old != null) { if (old != null) {
oldRef.set(kk(old)); oldRef.set(kk(old));
} }
Send<Buffer> v = null; Send<Buffer> v;
try { try {
v = updater.apply(old != null ? kk(old) : null); v = updater.apply(old != null ? kk(old) : null);
} catch (SerializationException e) { } catch (SerializationException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
try { try {
return k(v); if (v != null) {
return k(v);
} else {
return null;
}
} finally { } finally {
if (v != null) { if (v != null) {
v.close(); v.close();
} }
} }
}); });
return LLDelta.of(oldRef.get(), kk(newValue)); return LLDelta.of(oldRef.get(), newValue != null ? kk(newValue) : null);
}), }),
key -> Mono.fromRunnable(key::close) key -> Mono.fromRunnable(key::close)
); );
@ -257,11 +278,9 @@ public class LLMemoryDictionary implements LLDictionary {
try (var entry = entryToReceive.receive()) { try (var entry = entryToReceive.receive()) {
try (var key = entry.getKey().receive()) { try (var key = entry.getKey().receive()) {
try (var val = entry.getValue().receive()) { try (var val = entry.getValue().receive()) {
var v = mainDb.put(k(key.copy().send()), k(val.send())); var oldValue = mainDb.put(k(key.copy().send()), k(val.send()));
if (v == null || !getOldValues) { if (oldValue != null && getOldValues) {
sink.complete(); sink.next(LLEntry.of(key.send(), kk(oldValue)).send());
} else {
sink.next(LLEntry.of(key.send(), kk(v)).send());
} }
} }
} }
@ -286,7 +305,11 @@ public class LLMemoryDictionary implements LLDictionary {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
try (var single = singleToReceive.receive()) { try (var single = singleToReceive.receive()) {
var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send())); var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send()));
return LLEntry.of(single.send(), kk(element)).send(); if (element != null) {
return LLEntry.of(single.send(), kk(element)).send();
} else {
return null;
}
} }
}).flux(); }).flux();
} else { } else {
@ -305,7 +328,33 @@ public class LLMemoryDictionary implements LLDictionary {
Mono<Send<LLRange>> rangeMono, Mono<Send<LLRange>> rangeMono,
int prefixLength, int prefixLength,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Flux.error(new UnsupportedOperationException("Not implemented")); return Flux.usingWhen(rangeMono, rangeToReceive -> {
try (var range = rangeToReceive.receive()) {
if (range.isSingle()) {
var singleToReceive = range.getSingle();
return Mono.fromCallable(() -> {
try (var single = singleToReceive.receive()) {
var element = snapshots.get(resolveSnapshot(snapshot)).get(k(single.copy().send()));
if (element != null) {
return List.of(LLEntry.of(single.send(), kk(element)).send());
} else {
return List.<Send<LLEntry>>of();
}
}
}).flux();
} else {
var rangeToReceive2 = range.send();
return Mono
.fromCallable(() -> mapSlice(snapshot, rangeToReceive2))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.groupBy(k -> k.getKey().subList(0, prefixLength))
.flatMap(groupedFlux -> groupedFlux
.map(entry -> LLEntry.of(kk(entry.getKey()), kk(entry.getValue())).send())
.collectList()
);
}
}
}, range -> Mono.fromRunnable(range::close));
} }
@Override @Override
@ -334,54 +383,70 @@ public class LLMemoryDictionary implements LLDictionary {
); );
} }
private static record BufferWithPrefix(Send<Buffer> buffer, Send<Buffer> prefix) {}
@Override @Override
public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono, Mono<Send<LLRange>> rangeMono,
int prefixLength) { int prefixLength) {
return getRangeKeys(snapshot, rangeMono) return Flux.usingWhen(rangeMono, rangeToReceive -> {
.map(bufferToReceive -> { try (var range = rangeToReceive.receive()) {
try(var buffer = bufferToReceive.receive()) { if (range.isSingle()) {
try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) { var singleToReceive = range.getSingle();
return new BufferWithPrefix(buffer.send(), bufferPrefix.send()); return Mono.fromCallable(() -> {
try (var single = singleToReceive.receive()) {
var containsElement = snapshots.get(resolveSnapshot(snapshot)).containsKey(k(single.copy().send()));
if (containsElement) {
return List.of(single.send());
} else {
return List.<Send<Buffer>>of();
}
} }
} }).flux();
}) } else {
.windowUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), LLUtils::equals) var rangeToReceive2 = range.send();
.flatMapSequential(window -> window.map(tuple -> { return Mono
try (var ignored = tuple.prefix()) { .fromCallable(() -> mapSlice(snapshot, rangeToReceive2))
return tuple.buffer(); .flatMapMany(map -> Flux.fromIterable(map.entrySet()))
} .groupBy(k -> k.getKey().subList(0, prefixLength))
}).collectList()); .flatMap(groupedFlux -> groupedFlux
.map(entry -> kk(entry.getKey()))
.collectList()
);
}
}
}, range -> Mono.fromRunnable(range::close));
} }
@SuppressWarnings("RedundantCast")
@Override @Override
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono, Mono<Send<LLRange>> rangeMono,
int prefixLength) { int prefixLength) {
return getRangeKeys(snapshot, rangeMono) return Flux.usingWhen(rangeMono, rangeToReceive -> {
.map(bufferToReceive -> { try (var range = rangeToReceive.receive()) {
try(var buffer = bufferToReceive.receive()) { if (range.isSingle()) {
try (var bufferPrefix = buffer.copy(buffer.readerOffset(), prefixLength)) { var singleToReceive = range.getSingle();
return new BufferWithPrefix(buffer.send(), bufferPrefix.send()); return Mono.fromCallable(() -> {
try (var single = singleToReceive.receive()) {
var k = k(single.copy().send());
var containsElement = snapshots.get(resolveSnapshot(snapshot)).containsKey(k);
if (containsElement) {
return kk(k.subList(0, prefixLength));
} else {
return null;
}
} }
} }).flux();
}) } else {
.distinctUntilChanged(bufferTuple -> bufferTuple.prefix().receive(), (a, b) -> { var rangeToReceive2 = range.send();
if (LLUtils.equals(a, b)) { return Mono
b.close(); .fromCallable(() -> mapSlice(snapshot, rangeToReceive2))
return true; .flatMapMany(map -> Flux.fromIterable(map.entrySet()))
} else { .map(k -> (ByteList) k.getKey().subList(0, prefixLength))
return false; .distinctUntilChanged()
} .map(this::kk);
}) }
.map(tuple -> { }
try (var ignored = tuple.prefix()) { }, range -> Mono.fromRunnable(range::close));
return tuple.buffer();
}
})
.transform(LLUtils::handleDiscard);
} }
@Override @Override
@ -391,12 +456,67 @@ public class LLMemoryDictionary implements LLDictionary {
@Override @Override
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) { public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return Mono.usingWhen(rangeMono, rangeToReceive -> {
try (var range = rangeToReceive.receive()) {
Mono<Void> clearMono;
if (range.isSingle()) {
var singleToReceive = range.getSingle();
clearMono = Mono.fromRunnable(() -> {
try (var single = singleToReceive.receive()) {
var k = k(single.copy().send());
mainDb.remove(k);
}
});
} else {
var rangeToReceive2 = range.copy().send();
clearMono = Mono.fromRunnable(() -> mapSlice(null, rangeToReceive2).clear());
}
var r = r(range.copy().send());
return clearMono
.thenMany(entries)
.doOnNext(entryToReceive -> {
try (var entry = entryToReceive.receive()) {
if (!isInsideRange(r, k(entry.getKey()))) {
throw new IndexOutOfBoundsException("Trying to set a key outside the range!");
}
mainDb.put(k(entry.getKey()), k(entry.getValue()));
}
})
.then();
}
}, range -> Mono.fromRunnable(range::close));
}
private boolean isInsideRange(BLRange range, ByteList key) {
if (range.isAll()) {
return true;
} else if (range.isSingle()) {
var single = range.getSingle();
return Objects.equals(single, key);
} else if (range.hasMin() && range.hasMax()) {
var min = range.getMin();
var max = range.getMax();
return min.compareTo(key) <= 0 && max.compareTo(key) > 0;
} else if (range.hasMin()) {
var min = range.getMin();
return min.compareTo(key) <= 0;
} else {
var max = range.getMax();
return max.compareTo(key) > 0;
}
} }
@Override @Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) { public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return getRangeKeys(snapshot, rangeMono)
.map(buf -> {
buf.receive().close();
return true;
})
.count()
.map(count -> count == 0);
} }
@Override @Override
@ -409,17 +529,54 @@ public class LLMemoryDictionary implements LLDictionary {
@Override @Override
public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) { public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return getRange(snapshot, rangeMono)
.take(1, true)
.singleOrEmpty()
.doOnDiscard(Send.class, Send::close);
} }
@Override @Override
public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) { public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return getRangeKeys(snapshot, rangeMono)
.take(1, true)
.singleOrEmpty()
.doOnDiscard(Send.class, Send::close);
} }
@Override @Override
public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) { public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return Mono.usingWhen(rangeMono, rangeToReceive -> {
try (var range = rangeToReceive.receive()) {
if (range.isSingle()) {
var singleToReceive = range.getSingle();
return Mono.fromCallable(() -> {
try (var single = singleToReceive.receive()) {
var element = mainDb.remove(k(single.copy().send()));
if (element != null) {
return LLEntry.of(single.send(), kk(element)).send();
} else {
return null;
}
}
});
} else {
var rangeToReceive2 = range.send();
return Mono
.fromCallable(() -> mapSlice(null, rangeToReceive2))
.mapNotNull(map -> {
var it = map.entrySet().iterator();
if (it.hasNext()) {
var next = it.next();
it.remove();
return next;
} else {
return null;
}
})
.map(entry -> LLEntry.of(kk(entry.getKey()), kk(entry.getValue())).send());
}
}
}, range -> Mono.fromRunnable(range::close));
} }
@Override @Override