Reenable map dictionary range
This commit is contained in:
parent
aabf925f2a
commit
a97613284c
@ -2,60 +2,160 @@ package it.cavallium.dbengine.client;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.database.Column;
|
||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
|
||||
import it.cavallium.dbengine.database.collections.FixedLengthSerializer;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterSingle;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||
import java.nio.file.Path;
|
||||
import java.text.DecimalFormat;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Function;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Sinks;
|
||||
import reactor.core.publisher.Sinks.One;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public class Example {
|
||||
|
||||
private static final boolean printPreviousValue = false;
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Test");
|
||||
var ssg = new SubStageGetterSingle();
|
||||
testAtPut();
|
||||
testPutValueAndGetPrevious();
|
||||
testPutValue()
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.blockOptional();
|
||||
}
|
||||
|
||||
private static Mono<Void> testAtPut() {
|
||||
var ssg = new SubStageGetterSingleBytes();
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var itemKey = new byte[] {0, 1, 2, 3};
|
||||
var newValue = new byte[] {4, 5, 6, 7};
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
One<Instant> instantFirst = Sinks.one();
|
||||
One<Instant> instantSecond = Sinks.one();
|
||||
One<Instant> instantThird = Sinks.one();
|
||||
AtomicInteger ai = new AtomicInteger(0);
|
||||
new LLLocalDatabaseConnection(Path.of("/tmp/"), true)
|
||||
.connect()
|
||||
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false))
|
||||
.flatMap(db -> db.getDictionary("testmap"))
|
||||
.map(dictionary -> new DatabaseMapDictionary<>(dictionary, ssg, ser, 10))
|
||||
.flatMapMany(map -> Mono
|
||||
return test("MapDictionary::at::put (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)))
|
||||
.doOnSuccess(s -> instantFirst.tryEmitValue(Instant.now()))
|
||||
.flatMap(s -> map.at(null, itemKeyBuffer))
|
||||
.doOnSuccess(s -> instantSecond.tryEmitValue(Instant.now()))
|
||||
.flatMap(handle -> handle.setAndGetPrevious(newValue))
|
||||
.doOnSuccess(s -> instantThird.tryEmitValue(Instant.now()))
|
||||
.doOnSuccess(oldValue -> System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))))
|
||||
.then(Mono.zip(instantFirst.asMono(), instantSecond.asMono(), instantThird.asMono()))
|
||||
.doOnSuccess(s -> {
|
||||
var dur1 = Duration.between(s.getT1(), s.getT2());
|
||||
var dur2 = Duration.between(s.getT2(), s.getT3());
|
||||
var durtot = Duration.between(s.getT1(), s.getT3());
|
||||
System.out.println("Iteration " + ai.incrementAndGet());
|
||||
System.out.println("Time to get value reference: " + dur1.toMillisPart() + "ms " + dur1.toNanosPart() + "ns");
|
||||
System.out.println("Time to set new value and get previous: " + dur2.toMillisPart() + "ms " + dur2.toNanosPart() + "ns");
|
||||
System.out.println("(Total time) " + durtot.toMillisPart() + "ms " + durtot.toNanosPart() + "ns");
|
||||
.fromRunnable(() -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
)
|
||||
.repeat(100)
|
||||
)
|
||||
.blockLast();
|
||||
.then(tuple.getT2().at(null, itemKeyBuffer))
|
||||
.flatMap(handle -> handle.setAndGetPrevious(newValue))
|
||||
.doOnSuccess(oldValue -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
100000,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> testPutValueAndGetPrevious() {
|
||||
var ssg = new SubStageGetterSingleBytes();
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionary::putValueAndGetPrevious (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().putValueAndGetPrevious(itemKeyBuffer, newValue))
|
||||
.doOnSuccess(oldValue -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
10000,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> testPutValue() {
|
||||
var ssg = new SubStageGetterSingleBytes();
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionary::putValue (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().putValue(itemKeyBuffer, newValue))
|
||||
),
|
||||
10000,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
|
||||
return new LLLocalDatabaseConnection(Path.of("/tmp/"), true)
|
||||
.connect()
|
||||
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false));
|
||||
}
|
||||
|
||||
public static <A, B, C> Mono<Void> test(String name, Mono<A> setup, Function<A, Mono<B>> test, long numRepeats, Function<A, Mono<C>> close) {
|
||||
One<Instant> instantInit = Sinks.one();
|
||||
One<Instant> instantInitTest = Sinks.one();
|
||||
One<Instant> instantEndTest = Sinks.one();
|
||||
One<Instant> instantEnd = Sinks.one();
|
||||
return Mono
|
||||
.fromRunnable(() -> instantInit.tryEmitValue(now()))
|
||||
.then(setup)
|
||||
.doOnSuccess(s -> instantInitTest.tryEmitValue(now()))
|
||||
.flatMap(a -> Mono.defer(() -> test.apply(a))
|
||||
.repeat(numRepeats)
|
||||
.then()
|
||||
.doOnSuccess(s -> instantEndTest.tryEmitValue(now()))
|
||||
.then(close.apply(a)))
|
||||
.doOnSuccess(s -> instantEnd.tryEmitValue(now()))
|
||||
.then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono()))
|
||||
.doOnSuccess(tuple -> {
|
||||
System.out.println("----------------------------------------------------------------------");
|
||||
System.out.println(name);
|
||||
System.out.println(
|
||||
"\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format(numRepeats) + " times:");
|
||||
System.out.println("\t - Test time: " + DecimalFormat
|
||||
.getInstance(Locale.ITALY)
|
||||
.format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) numRepeats / (double) 1000000)
|
||||
+ "ms");
|
||||
System.out.println("\t - Test speed: " + DecimalFormat
|
||||
.getInstance(Locale.ITALY)
|
||||
.format(numRepeats / (Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000 / (double) 1000))
|
||||
+ " tests/s");
|
||||
System.out.println("\t - Total time: " + DecimalFormat
|
||||
.getInstance(Locale.ITALY)
|
||||
.format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000) + "ms");
|
||||
System.out.println("\t - Total time (setup+test+end): " + DecimalFormat
|
||||
.getInstance(Locale.ITALY)
|
||||
.format(Duration.between(tuple.getT1(), tuple.getT4()).toNanos() / (double) 1000000) + "ms");
|
||||
System.out.println("----------------------------------------------------------------------");
|
||||
})
|
||||
.then();
|
||||
}
|
||||
|
||||
public static Instant now() {
|
||||
return Instant.ofEpochSecond(0, System.nanoTime());
|
||||
}
|
||||
}
|
@ -3,11 +3,10 @@ package it.cavallium.dbengine.database;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseInt;
|
||||
import java.io.Closeable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure {
|
||||
public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure {
|
||||
|
||||
Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue);
|
||||
|
||||
@ -41,4 +40,6 @@ public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyVal
|
||||
}
|
||||
|
||||
Mono<Long> getProperty(String propertyName);
|
||||
|
||||
Mono<Void> close();
|
||||
}
|
||||
|
@ -67,12 +67,32 @@ public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements
|
||||
return result;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySerializer, int keyExtLength) {
|
||||
this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
|
||||
public static <T, U> DatabaseMapDictionary<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
|
||||
SubStageGetterSingle<U> subStageGetter,
|
||||
FixedLengthSerializer<T> keySerializer) {
|
||||
return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0);
|
||||
}
|
||||
|
||||
public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySuffixSerializer, byte[] prefixKey, int keyExtLength) {
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionary<T, U, US> deep(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySerializer,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionary<T, U, US> deepIntermediate(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength);
|
||||
}
|
||||
|
||||
private DatabaseMapDictionary(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
int keyExtLength) {
|
||||
this.dictionary = dictionary;
|
||||
this.subStageGetter = subStageGetter;
|
||||
this.keySuffixSerializer = keySuffixSerializer;
|
||||
|
@ -1,5 +1,6 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
@ -13,16 +14,16 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @deprecated Use DatabaseMapDictionary with SubStageGetterSingle
|
||||
* Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
|
||||
*/
|
||||
@Deprecated
|
||||
public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte[], DatabaseStageEntry<byte[]>> {
|
||||
public class DatabaseMapDictionaryRange<T, U> implements DatabaseStageMap<T, U, DatabaseStageEntry<U>> {
|
||||
|
||||
public static final byte[] NO_PREFIX = new byte[0];
|
||||
private final LLDictionary dictionary;
|
||||
private final byte[] keyPrefix;
|
||||
private final int keySuffixLength;
|
||||
private final FixedLengthSerializer<T> keySuffixSerializer;
|
||||
private final LLRange range;
|
||||
private final Serializer<U> valueSerializer;
|
||||
|
||||
private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
@ -39,21 +40,22 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public DatabaseMapDictionaryRange(LLDictionary dictionary, int keyLength) {
|
||||
this(dictionary, NO_PREFIX, keyLength);
|
||||
public DatabaseMapDictionaryRange(LLDictionary dictionary, FixedLengthSerializer<T> keySuffixSerializer, Serializer<U> valueSerializer) {
|
||||
this(dictionary, NO_PREFIX, keySuffixSerializer, valueSerializer);
|
||||
}
|
||||
|
||||
public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength) {
|
||||
public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, FixedLengthSerializer<T> keySuffixSerializer, Serializer<U> valueSerializer) {
|
||||
this.dictionary = dictionary;
|
||||
this.keyPrefix = prefixKey;
|
||||
this.keySuffixLength = keySuffixLength;
|
||||
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength);
|
||||
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength);
|
||||
this.keySuffixSerializer = keySuffixSerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength());
|
||||
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength());
|
||||
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
|
||||
}
|
||||
|
||||
private boolean suffixKeyConsistency(int keySuffixLength) {
|
||||
return this.keySuffixLength == keySuffixLength;
|
||||
return this.keySuffixSerializer.getLength() == keySuffixLength;
|
||||
}
|
||||
|
||||
private byte[] toKey(byte[] suffixKey) {
|
||||
@ -76,19 +78,24 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<byte[], byte[]>> get(@Nullable CompositeSnapshot snapshot) {
|
||||
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRange(resolveSnapshot(snapshot), range)
|
||||
.map(this::stripPrefix)
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
|
||||
.collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<byte[], byte[]>> setAndGetPrevious(Map<byte[], byte[]> value) {
|
||||
public Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
|
||||
return dictionary
|
||||
.setRange(range, Flux.fromIterable(value.entrySet()), true)
|
||||
.setRange(range,
|
||||
Flux
|
||||
.fromIterable(value.entrySet())
|
||||
.map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))),
|
||||
true
|
||||
)
|
||||
.map(this::stripPrefix)
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
|
||||
.collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
|
||||
@ -97,11 +104,11 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<byte[], byte[]>> clearAndGetPrevious() {
|
||||
public Mono<Map<T, U>> clearAndGetPrevious() {
|
||||
return dictionary
|
||||
.setRange(range, Flux.empty(), true)
|
||||
.map(this::stripPrefix)
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
|
||||
.collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -110,20 +117,59 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<byte[]>> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) {
|
||||
return Mono.just(new DatabaseSingle(dictionary, toKey(keySuffix)));
|
||||
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<byte[], DatabaseStageEntry<byte[]>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), range)
|
||||
.map(this::stripPrefix)
|
||||
.map(keySuffix -> Map.entry(keySuffix, new DatabaseSingle(dictionary, toKey(keySuffix))));
|
||||
.map(keySuffix -> Map.entry(deserializeSuffix(keySuffix),
|
||||
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(keySuffix), Serializer.noopBytes()),
|
||||
valueSerializer
|
||||
)
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<byte[], byte[]>> setAllValuesAndGetPrevious(Flux<Entry<byte[], byte[]>> entries) {
|
||||
return dictionary.setRange(range, Flux.empty(), true);
|
||||
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
||||
var serializedEntries = entries
|
||||
.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())));
|
||||
return dictionary.setRange(range, serializedEntries, true)
|
||||
.map(entry -> Map.entry(deserializeSuffix(entry.getKey()), deserialize(entry.getValue())));
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private U deserialize(byte[] bytes) {
|
||||
var serialized = Unpooled.wrappedBuffer(bytes);
|
||||
return valueSerializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serialize(U bytes) {
|
||||
var output = Unpooled.buffer();
|
||||
valueSerializer.serialize(bytes, output);
|
||||
output.resetReaderIndex();
|
||||
int length = output.readableBytes();
|
||||
var outputBytes = new byte[length];
|
||||
output.getBytes(0, outputBytes, 0, length);
|
||||
return outputBytes;
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private T deserializeSuffix(byte[] keySuffix) {
|
||||
var serialized = Unpooled.wrappedBuffer(keySuffix);
|
||||
return keySuffixSerializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serializeSuffix(T keySuffix) {
|
||||
var output = Unpooled.buffer(keySuffixSerializer.getLength(), keySuffixSerializer.getLength());
|
||||
var outputBytes = new byte[keySuffixSerializer.getLength()];
|
||||
keySuffixSerializer.serialize(keySuffix, output);
|
||||
output.getBytes(0, outputBytes, 0, keySuffixSerializer.getLength());
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
@ -8,14 +9,16 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class DatabaseSingle implements DatabaseStageEntry<byte[]> {
|
||||
public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
|
||||
|
||||
private final LLDictionary dictionary;
|
||||
private final byte[] key;
|
||||
private final Serializer<U> serializer;
|
||||
|
||||
public DatabaseSingle(LLDictionary dictionary, byte[] key) {
|
||||
public DatabaseSingle(LLDictionary dictionary, byte[] key, Serializer<U> serializer) {
|
||||
this.dictionary = dictionary;
|
||||
this.key = key;
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
|
||||
@ -27,18 +30,18 @@ public class DatabaseSingle implements DatabaseStageEntry<byte[]> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), key);
|
||||
public Mono<U> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), key).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> setAndGetPrevious(byte[] value) {
|
||||
return dictionary.put(key, value, LLDictionaryResultType.PREVIOUS_VALUE);
|
||||
public Mono<U> setAndGetPrevious(U value) {
|
||||
return dictionary.put(key, serialize(value), LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> clearAndGetPrevious() {
|
||||
return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE);
|
||||
public Mono<U> clearAndGetPrevious() {
|
||||
return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -53,4 +56,21 @@ public class DatabaseSingle implements DatabaseStageEntry<byte[]> {
|
||||
return dictionary
|
||||
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key));
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private U deserialize(byte[] bytes) {
|
||||
var serialized = Unpooled.wrappedBuffer(bytes);
|
||||
return serializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serialize(U bytes) {
|
||||
var output = Unpooled.buffer();
|
||||
serializer.serialize(bytes, output);
|
||||
output.resetReaderIndex();
|
||||
int length = output.readableBytes();
|
||||
var outputBytes = new byte[length];
|
||||
output.getBytes(0, outputBytes, 0, length);
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
@ -0,0 +1,94 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class DatabaseSingleMapped<U> implements DatabaseStageEntry<U> {
|
||||
|
||||
private final DatabaseSingle<byte[]> serializedSingle;
|
||||
private final Serializer<U> serializer;
|
||||
|
||||
public DatabaseSingleMapped(DatabaseSingle<byte[]> serializedSingle, Serializer<U> serializer) {
|
||||
this.serializedSingle = serializedSingle;
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return serializedSingle.get(snapshot).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> getOrDefault(@Nullable CompositeSnapshot snapshot, Mono<U> defaultValue) {
|
||||
return serializedSingle.get(snapshot).map(this::deserialize).switchIfEmpty(defaultValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> set(U value) {
|
||||
return serializedSingle.set(serialize(value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> setAndGetPrevious(U value) {
|
||||
return serializedSingle.setAndGetPrevious(serialize(value)).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> setAndGetStatus(U value) {
|
||||
return serializedSingle.setAndGetStatus(serialize(value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> clear() {
|
||||
return serializedSingle.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> clearAndGetPrevious() {
|
||||
return serializedSingle.clearAndGetPrevious().map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> clearAndGetStatus() {
|
||||
return serializedSingle.clearAndGetStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> close() {
|
||||
return serializedSingle.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
||||
return serializedSingle.size(snapshot, fast);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
|
||||
return serializedSingle.isEmpty(snapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatabaseStageEntry<U> entry() {
|
||||
return this;
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private U deserialize(byte[] bytes) {
|
||||
var serialized = Unpooled.wrappedBuffer(bytes);
|
||||
return serializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serialize(U bytes) {
|
||||
var output = Unpooled.buffer();
|
||||
serializer.serialize(bytes, output);
|
||||
output.resetReaderIndex();
|
||||
int length = output.readableBytes();
|
||||
var outputBytes = new byte[length];
|
||||
output.getBytes(0, outputBytes, 0, length);
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
@ -2,11 +2,7 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public interface FixedLengthSerializer<B> {
|
||||
|
||||
B deserialize(ByteBuf serialized);
|
||||
|
||||
void serialize(B deserialized, ByteBuf output);
|
||||
public interface FixedLengthSerializer<B> extends Serializer<B> {
|
||||
|
||||
int getLength();
|
||||
|
||||
@ -19,6 +15,7 @@ public interface FixedLengthSerializer<B> {
|
||||
|
||||
@Override
|
||||
public void serialize(ByteBuf deserialized, ByteBuf output) {
|
||||
deserialized.resetReaderIndex();
|
||||
output.writeBytes(deserialized, length);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,41 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public interface Serializer<B> {
|
||||
|
||||
B deserialize(ByteBuf serialized);
|
||||
|
||||
void serialize(B deserialized, ByteBuf output);
|
||||
|
||||
static Serializer<ByteBuf> noop() {
|
||||
return new Serializer<>() {
|
||||
@Override
|
||||
public ByteBuf deserialize(ByteBuf serialized) {
|
||||
return serialized.readSlice(serialized.readableBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(ByteBuf deserialized, ByteBuf output) {
|
||||
deserialized.resetReaderIndex();
|
||||
output.writeBytes(deserialized, deserialized.readableBytes());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static Serializer<byte[]> noopBytes() {
|
||||
return new Serializer<>() {
|
||||
@Override
|
||||
public byte[] deserialize(ByteBuf serialized) {
|
||||
var result = new byte[serialized.readableBytes()];
|
||||
serialized.readBytes(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(byte[] deserialized, ByteBuf output) {
|
||||
output.writeBytes(deserialized);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -27,7 +27,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return Mono.just(new DatabaseMapDictionary<>(dictionary,
|
||||
return Mono.just(DatabaseMapDictionary.deepIntermediate(dictionary,
|
||||
subStageGetter,
|
||||
keySerializer,
|
||||
prefixKey,
|
||||
|
@ -7,19 +7,21 @@ import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterMapRange implements SubStageGetter<Map<byte[], byte[]>, DatabaseStageEntry<Map<byte[], byte[]>>> {
|
||||
public class SubStageGetterMapRange<T, U> implements SubStageGetter<Map<T, U>, DatabaseStageEntry<Map<T, U>>> {
|
||||
|
||||
private final int keyLength;
|
||||
private final FixedLengthSerializer<T> keySerializer;
|
||||
private final Serializer<U> valueSerializer;
|
||||
|
||||
public SubStageGetterMapRange(int keyLength) {
|
||||
this.keyLength = keyLength;
|
||||
public SubStageGetterMapRange(FixedLengthSerializer<T> keySerializer, Serializer<U> valueSerializer) {
|
||||
this.keySerializer = keySerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<Map<byte[], byte[]>>> subStage(LLDictionary dictionary,
|
||||
public Mono<DatabaseStageEntry<Map<T, U>>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return Mono.just(new DatabaseMapDictionaryRange(dictionary, prefixKey, keyLength));
|
||||
return Mono.just(new DatabaseMapDictionaryRange<>(dictionary, prefixKey, keySerializer, valueSerializer));
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import java.util.Arrays;
|
||||
@ -7,10 +8,16 @@ import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterSingle implements SubStageGetter<byte[], DatabaseStageEntry<byte[]>> {
|
||||
public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>> {
|
||||
|
||||
private final Serializer<T> serializer;
|
||||
|
||||
public SubStageGetterSingle(Serializer<T> serializer) {
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<byte[]>> subStage(LLDictionary dictionary,
|
||||
public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] keyPrefix,
|
||||
Flux<byte[]> keyFlux) {
|
||||
@ -19,6 +26,23 @@ public class SubStageGetterSingle implements SubStageGetter<byte[], DatabaseStag
|
||||
throw new IndexOutOfBoundsException("Found more than one element!");
|
||||
}
|
||||
return null;
|
||||
})).thenReturn(new DatabaseSingle(dictionary, keyPrefix));
|
||||
})).thenReturn(new DatabaseSingle(dictionary, keyPrefix, Serializer.noopBytes()));
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private T deserialize(byte[] bytes) {
|
||||
var serialized = Unpooled.wrappedBuffer(bytes);
|
||||
return serializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serialize(T bytes) {
|
||||
var output = Unpooled.buffer();
|
||||
serializer.serialize(bytes, output);
|
||||
output.resetReaderIndex();
|
||||
int length = output.readableBytes();
|
||||
var outputBytes = new byte[length];
|
||||
output.getBytes(0, outputBytes, 0, length);
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,8 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
public class SubStageGetterSingleBytes extends SubStageGetterSingle<byte[]> {
|
||||
|
||||
public SubStageGetterSingleBytes() {
|
||||
super(Serializer.noopBytes());
|
||||
}
|
||||
}
|
@ -349,13 +349,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
flushAndCloseDb(db, new ArrayList<>(handles.values()));
|
||||
deleteUnusedOldLogFiles();
|
||||
} catch (RocksDBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
public Mono<Void> close() {
|
||||
return Mono
|
||||
.<Void>fromCallable(() -> {
|
||||
try {
|
||||
flushAndCloseDb(db, new ArrayList<>(handles.values()));
|
||||
deleteUnusedOldLogFiles();
|
||||
} catch (RocksDBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(Schedulers.boundedElastic());
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user