diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java index 0c917dc..470672d 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/Example.java @@ -2,60 +2,160 @@ package it.cavallium.dbengine.client; import io.netty.buffer.Unpooled; import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.FixedLengthSerializer; -import it.cavallium.dbengine.database.collections.SubStageGetterSingle; +import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import java.nio.file.Path; +import java.text.DecimalFormat; import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Locale; +import java.util.function.Function; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; public class Example { + private static final boolean printPreviousValue = false; + public static void main(String[] args) { System.out.println("Test"); - var ssg = new SubStageGetterSingle(); + testAtPut(); + testPutValueAndGetPrevious(); + testPutValue() + .subscribeOn(Schedulers.parallel()) + .blockOptional(); + } + + private static Mono testAtPut() { + var ssg = new SubStageGetterSingleBytes(); var ser = FixedLengthSerializer.noop(4); - var itemKey = new byte[] {0, 1, 2, 3}; - var newValue = new byte[] {4, 5, 6, 7}; + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - One instantFirst = Sinks.one(); - One instantSecond = Sinks.one(); - One instantThird = Sinks.one(); - AtomicInteger ai = new AtomicInteger(0); - new LLLocalDatabaseConnection(Path.of("/tmp/"), true) - .connect() - .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false)) - .flatMap(db -> db.getDictionary("testmap")) - .map(dictionary -> new DatabaseMapDictionary<>(dictionary, ssg, ser, 10)) - .flatMapMany(map -> Mono + return test("MapDictionary::at::put (same key, same value)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))), + tuple -> Mono .defer(() -> Mono - .fromRunnable(() -> System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue))) - .doOnSuccess(s -> instantFirst.tryEmitValue(Instant.now())) - .flatMap(s -> map.at(null, itemKeyBuffer)) - .doOnSuccess(s -> instantSecond.tryEmitValue(Instant.now())) - .flatMap(handle -> handle.setAndGetPrevious(newValue)) - .doOnSuccess(s -> instantThird.tryEmitValue(Instant.now())) - .doOnSuccess(oldValue -> System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)))) - .then(Mono.zip(instantFirst.asMono(), instantSecond.asMono(), instantThird.asMono())) - .doOnSuccess(s -> { - var dur1 = Duration.between(s.getT1(), s.getT2()); - var dur2 = Duration.between(s.getT2(), s.getT3()); - var durtot = Duration.between(s.getT1(), s.getT3()); - System.out.println("Iteration " + ai.incrementAndGet()); - System.out.println("Time to get value reference: " + dur1.toMillisPart() + "ms " + dur1.toNanosPart() + "ns"); - System.out.println("Time to set new value and get previous: " + dur2.toMillisPart() + "ms " + dur2.toNanosPart() + "ns"); - System.out.println("(Total time) " + durtot.toMillisPart() + "ms " + durtot.toNanosPart() + "ns"); + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); }) - ) - .repeat(100) - ) - .blockLast(); + .then(tuple.getT2().at(null, itemKeyBuffer)) + .flatMap(handle -> handle.setAndGetPrevious(newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ), + 100000, + tuple -> tuple.getT1().close()); + } + + private static Mono testPutValueAndGetPrevious() { + var ssg = new SubStageGetterSingleBytes(); + var ser = FixedLengthSerializer.noop(4); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; + var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); + return test("MapDictionary::putValueAndGetPrevious (same key, same value)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))), + tuple -> Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().putValueAndGetPrevious(itemKeyBuffer, newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ), + 10000, + tuple -> tuple.getT1().close()); + } + + private static Mono testPutValue() { + var ssg = new SubStageGetterSingleBytes(); + var ser = FixedLengthSerializer.noop(4); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; + var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); + return test("MapDictionary::putValue (same key, same value)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))), + tuple -> Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().putValue(itemKeyBuffer, newValue)) + ), + 10000, + tuple -> tuple.getT1().close()); + } + + private static Mono tempDb() { + return new LLLocalDatabaseConnection(Path.of("/tmp/"), true) + .connect() + .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false)); + } + + public static Mono test(String name, Mono setup, Function> test, long numRepeats, Function> close) { + One instantInit = Sinks.one(); + One instantInitTest = Sinks.one(); + One instantEndTest = Sinks.one(); + One instantEnd = Sinks.one(); + return Mono + .fromRunnable(() -> instantInit.tryEmitValue(now())) + .then(setup) + .doOnSuccess(s -> instantInitTest.tryEmitValue(now())) + .flatMap(a -> Mono.defer(() -> test.apply(a)) + .repeat(numRepeats) + .then() + .doOnSuccess(s -> instantEndTest.tryEmitValue(now())) + .then(close.apply(a))) + .doOnSuccess(s -> instantEnd.tryEmitValue(now())) + .then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono())) + .doOnSuccess(tuple -> { + System.out.println("----------------------------------------------------------------------"); + System.out.println(name); + System.out.println( + "\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format(numRepeats) + " times:"); + System.out.println("\t - Test time: " + DecimalFormat + .getInstance(Locale.ITALY) + .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) numRepeats / (double) 1000000) + + "ms"); + System.out.println("\t - Test speed: " + DecimalFormat + .getInstance(Locale.ITALY) + .format(numRepeats / (Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000 / (double) 1000)) + + " tests/s"); + System.out.println("\t - Total time: " + DecimalFormat + .getInstance(Locale.ITALY) + .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000) + "ms"); + System.out.println("\t - Total time (setup+test+end): " + DecimalFormat + .getInstance(Locale.ITALY) + .format(Duration.between(tuple.getT1(), tuple.getT4()).toNanos() / (double) 1000000) + "ms"); + System.out.println("----------------------------------------------------------------------"); + }) + .then(); + } + + public static Instant now() { + return Instant.ofEpochSecond(0, System.nanoTime()); } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index d0ef215..5814f19 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -3,11 +3,10 @@ package it.cavallium.dbengine.database; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import it.cavallium.dbengine.database.collections.DatabaseInt; -import java.io.Closeable; import java.nio.charset.StandardCharsets; import reactor.core.publisher.Mono; -public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure { +public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure { Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue); @@ -41,4 +40,6 @@ public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyVal } Mono getProperty(String propertyName); + + Mono close(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 2d97294..9ecf3fe 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -67,12 +67,32 @@ public class DatabaseMapDictionary> implements return result; } - @SuppressWarnings("unused") - public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter subStageGetter, FixedLengthSerializer keySerializer, int keyExtLength) { - this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); + public static DatabaseMapDictionary> simple(LLDictionary dictionary, + SubStageGetterSingle subStageGetter, + FixedLengthSerializer keySerializer) { + return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0); } - public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter subStageGetter, FixedLengthSerializer keySuffixSerializer, byte[] prefixKey, int keyExtLength) { + public static > DatabaseMapDictionary deep(LLDictionary dictionary, + SubStageGetter subStageGetter, + FixedLengthSerializer keySerializer, + int keyExtLength) { + return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); + } + + public static > DatabaseMapDictionary deepIntermediate(LLDictionary dictionary, + SubStageGetter subStageGetter, + FixedLengthSerializer keySuffixSerializer, + byte[] prefixKey, + int keyExtLength) { + return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength); + } + + private DatabaseMapDictionary(LLDictionary dictionary, + SubStageGetter subStageGetter, + FixedLengthSerializer keySuffixSerializer, + byte[] prefixKey, + int keyExtLength) { this.dictionary = dictionary; this.subStageGetter = subStageGetter; this.keySuffixSerializer = keySuffixSerializer; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java index e635245..be707d4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.netty.buffer.Unpooled; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLRange; @@ -13,16 +14,16 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * @deprecated Use DatabaseMapDictionary with SubStageGetterSingle + * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" */ -@Deprecated -public class DatabaseMapDictionaryRange implements DatabaseStageMap> { +public class DatabaseMapDictionaryRange implements DatabaseStageMap> { public static final byte[] NO_PREFIX = new byte[0]; private final LLDictionary dictionary; private final byte[] keyPrefix; - private final int keySuffixLength; + private final FixedLengthSerializer keySuffixSerializer; private final LLRange range; + private final Serializer valueSerializer; private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength) { assert prefixKey.length == prefixLength; @@ -39,21 +40,22 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap keySuffixSerializer, Serializer valueSerializer) { + this(dictionary, NO_PREFIX, keySuffixSerializer, valueSerializer); } - public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength) { + public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, FixedLengthSerializer keySuffixSerializer, Serializer valueSerializer) { this.dictionary = dictionary; this.keyPrefix = prefixKey; - this.keySuffixLength = keySuffixLength; - byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength); - byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength); + this.keySuffixSerializer = keySuffixSerializer; + this.valueSerializer = valueSerializer; + byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength()); + byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength()); this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); } private boolean suffixKeyConsistency(int keySuffixLength) { - return this.keySuffixLength == keySuffixLength; + return this.keySuffixSerializer.getLength() == keySuffixLength; } private byte[] toKey(byte[] suffixKey) { @@ -76,19 +78,24 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap> get(@Nullable CompositeSnapshot snapshot) { + public Mono> get(@Nullable CompositeSnapshot snapshot) { return dictionary .getRange(resolveSnapshot(snapshot), range) .map(this::stripPrefix) - .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + .collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new); } @Override - public Mono> setAndGetPrevious(Map value) { + public Mono> setAndGetPrevious(Map value) { return dictionary - .setRange(range, Flux.fromIterable(value.entrySet()), true) + .setRange(range, + Flux + .fromIterable(value.entrySet()) + .map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))), + true + ) .map(this::stripPrefix) - .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + .collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new); } private Entry stripPrefix(Entry entry) { @@ -97,11 +104,11 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap> clearAndGetPrevious() { + public Mono> clearAndGetPrevious() { return dictionary .setRange(range, Flux.empty(), true) .map(this::stripPrefix) - .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + .collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new); } @Override @@ -110,20 +117,59 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) { - return Mono.just(new DatabaseSingle(dictionary, toKey(keySuffix))); + public Mono> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { + return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); } @Override - public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { + public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), range) .map(this::stripPrefix) - .map(keySuffix -> Map.entry(keySuffix, new DatabaseSingle(dictionary, toKey(keySuffix)))); + .map(keySuffix -> Map.entry(deserializeSuffix(keySuffix), + new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(keySuffix), Serializer.noopBytes()), + valueSerializer + ) + )); } @Override - public Flux> setAllValuesAndGetPrevious(Flux> entries) { - return dictionary.setRange(range, Flux.empty(), true); + public Flux> setAllValuesAndGetPrevious(Flux> entries) { + var serializedEntries = entries + .map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))); + return dictionary.setRange(range, serializedEntries, true) + .map(entry -> Map.entry(deserializeSuffix(entry.getKey()), deserialize(entry.getValue()))); + } + + //todo: temporary wrapper. convert the whole class to buffers + private U deserialize(byte[] bytes) { + var serialized = Unpooled.wrappedBuffer(bytes); + return valueSerializer.deserialize(serialized); + } + + //todo: temporary wrapper. convert the whole class to buffers + private byte[] serialize(U bytes) { + var output = Unpooled.buffer(); + valueSerializer.serialize(bytes, output); + output.resetReaderIndex(); + int length = output.readableBytes(); + var outputBytes = new byte[length]; + output.getBytes(0, outputBytes, 0, length); + return outputBytes; + } + + //todo: temporary wrapper. convert the whole class to buffers + private T deserializeSuffix(byte[] keySuffix) { + var serialized = Unpooled.wrappedBuffer(keySuffix); + return keySuffixSerializer.deserialize(serialized); + } + + //todo: temporary wrapper. convert the whole class to buffers + private byte[] serializeSuffix(T keySuffix) { + var output = Unpooled.buffer(keySuffixSerializer.getLength(), keySuffixSerializer.getLength()); + var outputBytes = new byte[keySuffixSerializer.getLength()]; + keySuffixSerializer.serialize(keySuffix, output); + output.getBytes(0, outputBytes, 0, keySuffixSerializer.getLength()); + return outputBytes; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index e0da7bf..bec988b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.netty.buffer.Unpooled; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -8,14 +9,16 @@ import it.cavallium.dbengine.database.LLSnapshot; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; -public class DatabaseSingle implements DatabaseStageEntry { +public class DatabaseSingle implements DatabaseStageEntry { private final LLDictionary dictionary; private final byte[] key; + private final Serializer serializer; - public DatabaseSingle(LLDictionary dictionary, byte[] key) { + public DatabaseSingle(LLDictionary dictionary, byte[] key, Serializer serializer) { this.dictionary = dictionary; this.key = key; + this.serializer = serializer; } private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { @@ -27,18 +30,18 @@ public class DatabaseSingle implements DatabaseStageEntry { } @Override - public Mono get(@Nullable CompositeSnapshot snapshot) { - return dictionary.get(resolveSnapshot(snapshot), key); + public Mono get(@Nullable CompositeSnapshot snapshot) { + return dictionary.get(resolveSnapshot(snapshot), key).map(this::deserialize); } @Override - public Mono setAndGetPrevious(byte[] value) { - return dictionary.put(key, value, LLDictionaryResultType.PREVIOUS_VALUE); + public Mono setAndGetPrevious(U value) { + return dictionary.put(key, serialize(value), LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize); } @Override - public Mono clearAndGetPrevious() { - return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE); + public Mono clearAndGetPrevious() { + return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize); } @Override @@ -53,4 +56,21 @@ public class DatabaseSingle implements DatabaseStageEntry { return dictionary .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key)); } + + //todo: temporary wrapper. convert the whole class to buffers + private U deserialize(byte[] bytes) { + var serialized = Unpooled.wrappedBuffer(bytes); + return serializer.deserialize(serialized); + } + + //todo: temporary wrapper. convert the whole class to buffers + private byte[] serialize(U bytes) { + var output = Unpooled.buffer(); + serializer.serialize(bytes, output); + output.resetReaderIndex(); + int length = output.readableBytes(); + var outputBytes = new byte[length]; + output.getBytes(0, outputBytes, 0, length); + return outputBytes; + } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java new file mode 100644 index 0000000..e550945 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -0,0 +1,94 @@ +package it.cavallium.dbengine.database.collections; + +import io.netty.buffer.Unpooled; +import it.cavallium.dbengine.client.CompositeSnapshot; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; + +public class DatabaseSingleMapped implements DatabaseStageEntry { + + private final DatabaseSingle serializedSingle; + private final Serializer serializer; + + public DatabaseSingleMapped(DatabaseSingle serializedSingle, Serializer serializer) { + this.serializedSingle = serializedSingle; + this.serializer = serializer; + } + + @Override + public Mono get(@Nullable CompositeSnapshot snapshot) { + return serializedSingle.get(snapshot).map(this::deserialize); + } + + @Override + public Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { + return serializedSingle.get(snapshot).map(this::deserialize).switchIfEmpty(defaultValue); + } + + @Override + public Mono set(U value) { + return serializedSingle.set(serialize(value)); + } + + @Override + public Mono setAndGetPrevious(U value) { + return serializedSingle.setAndGetPrevious(serialize(value)).map(this::deserialize); + } + + @Override + public Mono setAndGetStatus(U value) { + return serializedSingle.setAndGetStatus(serialize(value)); + } + + @Override + public Mono clear() { + return serializedSingle.clear(); + } + + @Override + public Mono clearAndGetPrevious() { + return serializedSingle.clearAndGetPrevious().map(this::deserialize); + } + + @Override + public Mono clearAndGetStatus() { + return serializedSingle.clearAndGetStatus(); + } + + @Override + public Mono close() { + return serializedSingle.close(); + } + + @Override + public Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { + return serializedSingle.size(snapshot, fast); + } + + @Override + public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { + return serializedSingle.isEmpty(snapshot); + } + + @Override + public DatabaseStageEntry entry() { + return this; + } + + //todo: temporary wrapper. convert the whole class to buffers + private U deserialize(byte[] bytes) { + var serialized = Unpooled.wrappedBuffer(bytes); + return serializer.deserialize(serialized); + } + + //todo: temporary wrapper. convert the whole class to buffers + private byte[] serialize(U bytes) { + var output = Unpooled.buffer(); + serializer.serialize(bytes, output); + output.resetReaderIndex(); + int length = output.readableBytes(); + var outputBytes = new byte[length]; + output.getBytes(0, outputBytes, 0, length); + return outputBytes; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java index 81b0c36..6fd53d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java @@ -2,11 +2,7 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; -public interface FixedLengthSerializer { - - B deserialize(ByteBuf serialized); - - void serialize(B deserialized, ByteBuf output); +public interface FixedLengthSerializer extends Serializer { int getLength(); @@ -19,6 +15,7 @@ public interface FixedLengthSerializer { @Override public void serialize(ByteBuf deserialized, ByteBuf output) { + deserialized.resetReaderIndex(); output.writeBytes(deserialized, length); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/Serializer.java b/src/main/java/it/cavallium/dbengine/database/collections/Serializer.java new file mode 100644 index 0000000..b605581 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/Serializer.java @@ -0,0 +1,41 @@ +package it.cavallium.dbengine.database.collections; + +import io.netty.buffer.ByteBuf; + +public interface Serializer { + + B deserialize(ByteBuf serialized); + + void serialize(B deserialized, ByteBuf output); + + static Serializer noop() { + return new Serializer<>() { + @Override + public ByteBuf deserialize(ByteBuf serialized) { + return serialized.readSlice(serialized.readableBytes()); + } + + @Override + public void serialize(ByteBuf deserialized, ByteBuf output) { + deserialized.resetReaderIndex(); + output.writeBytes(deserialized, deserialized.readableBytes()); + } + }; + } + + static Serializer noopBytes() { + return new Serializer<>() { + @Override + public byte[] deserialize(ByteBuf serialized) { + var result = new byte[serialized.readableBytes()]; + serialized.readBytes(result); + return result; + } + + @Override + public void serialize(byte[] deserialized, ByteBuf output) { + output.writeBytes(deserialized); + } + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index db8ea82..49edfb0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -27,7 +27,7 @@ public class SubStageGetterMapDeep> implements @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - return Mono.just(new DatabaseMapDictionary<>(dictionary, + return Mono.just(DatabaseMapDictionary.deepIntermediate(dictionary, subStageGetter, keySerializer, prefixKey, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java index db4960c..8d94c17 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java @@ -7,19 +7,21 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class SubStageGetterMapRange implements SubStageGetter, DatabaseStageEntry>> { +public class SubStageGetterMapRange implements SubStageGetter, DatabaseStageEntry>> { - private final int keyLength; + private final FixedLengthSerializer keySerializer; + private final Serializer valueSerializer; - public SubStageGetterMapRange(int keyLength) { - this.keyLength = keyLength; + public SubStageGetterMapRange(FixedLengthSerializer keySerializer, Serializer valueSerializer) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; } @Override - public Mono>> subStage(LLDictionary dictionary, + public Mono>> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - return Mono.just(new DatabaseMapDictionaryRange(dictionary, prefixKey, keyLength)); + return Mono.just(new DatabaseMapDictionaryRange<>(dictionary, prefixKey, keySerializer, valueSerializer)); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 5fda63c..8e3759e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.collections; +import io.netty.buffer.Unpooled; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import java.util.Arrays; @@ -7,10 +8,16 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class SubStageGetterSingle implements SubStageGetter> { +public class SubStageGetterSingle implements SubStageGetter> { + + private final Serializer serializer; + + public SubStageGetterSingle(Serializer serializer) { + this.serializer = serializer; + } @Override - public Mono> subStage(LLDictionary dictionary, + public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] keyPrefix, Flux keyFlux) { @@ -19,6 +26,23 @@ public class SubStageGetterSingle implements SubStageGetter { + + public SubStageGetterSingleBytes() { + super(Serializer.noopBytes()); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 2d527b7..5a72172 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -349,13 +349,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } @Override - public void close() throws IOException { - try { - flushAndCloseDb(db, new ArrayList<>(handles.values())); - deleteUnusedOldLogFiles(); - } catch (RocksDBException e) { - throw new IOException(e); - } + public Mono close() { + return Mono + .fromCallable(() -> { + try { + flushAndCloseDb(db, new ArrayList<>(handles.values())); + deleteUnusedOldLogFiles(); + } catch (RocksDBException e) { + throw new IOException(e); + } + return null; + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); } /**