diff --git a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java new file mode 100644 index 0000000..fc7e6d6 --- /dev/null +++ b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java @@ -0,0 +1,173 @@ +package it.cavallium.dbengine.client; + +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; +import it.cavallium.dbengine.database.collections.SubStageGetterSingle; +import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; +import it.cavallium.dbengine.database.serialization.Codec; +import it.cavallium.dbengine.database.serialization.CodecSerializer; +import it.cavallium.dbengine.database.serialization.Codecs; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.List; +import java.util.StringJoiner; +import java.util.concurrent.CompletionException; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; + +public class CodecsExample { + + public static void main(String[] args) { + var oldCodec = new OldCustomTypeCodec(); + var oldCodecs = new Codecs(); + oldCodecs.registerCodec(1, oldCodec); + var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true); + var oldSsg = new SubStageGetterSingle<>(oldSerializer); + + var newCodec = new NewCustomTypeCodecV2(); + var newCodecs = new Codecs(); + newCodecs.registerCodec(1, new NewCustomTypeCodecV1()); + newCodecs.registerCodec(2, newCodec); + var newSerializer = new CodecSerializer<>(newCodecs, newCodec, 2, true); + var newSsg = new SubStageGetterSingle<>(newSerializer); + + tempDb(true) + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, SerializerFixedBinaryLength.longSerializer(), oldSsg))) + .flatMap(tuple -> { + var oldValue = new OldCustomType(155); + System.out.println("Writing to disk old value with codec id 1: " + oldValue); + + return tuple.getT2().putValue(15L, oldValue).then(tuple.getT1().close()); + }) + .then(tempDb(false)) + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, SerializerFixedBinaryLength.longSerializer(), newSsg))) + .flatMap(tuple -> { + System.out.println("Reading from disk current value with any codec id..."); + return tuple.getT2().getValue(null, 15L).doOnSuccess(s -> { + if (s == null) { + System.err.println("No value found for key 15"); + } else { + System.out.println("Current value read successfully: " + s); + } + }).then(tuple.getT1().close()); + }) + .subscribeOn(Schedulers.parallel()) + .blockOptional(); + } + + private static class OldCustomTypeCodec implements Codec { + + @Override + public @NotNull OldCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException { + return new OldCustomType(serialized.readUTF()); + } + + @Override + public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull OldCustomType deserialized) throws IOException { + outputStream.writeUTF(deserialized.number); + } + } + + private static class NewCustomTypeCodecV1 implements Codec { + + @Override + public @NotNull CurrentCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException { + return new CurrentCustomType(Integer.parseInt(serialized.readUTF())); + } + + @Override + public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull CurrentCustomType deserialized) throws IOException { + throw new UnsupportedOperationException("Can't serialize with an old version"); + } + } + + private static class NewCustomTypeCodecV2 implements Codec { + + @Override + public @NotNull CurrentCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException { + return new CurrentCustomType(serialized.readInt()); + } + + @Override + public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull CurrentCustomType deserialized) throws IOException { + outputStream.writeInt(deserialized.number); + } + } + + public static final class OldCustomType { + + private final String number; + + public OldCustomType(int number) { + this.number = "" + number; + } + + public OldCustomType(String readUTF) { + this.number = readUTF; + } + + public String getNumber() { + return number; + } + + @Override + public String toString() { + return new StringJoiner(", ", OldCustomType.class.getSimpleName() + "[", "]") + .add("number='" + number + "'") + .toString(); + } + } + + public static final class CurrentCustomType { + + private final int number; + + public CurrentCustomType(int number) { + this.number = number; + } + + public int getNumber() { + return number; + } + + @Override + public String toString() { + return new StringJoiner(", ", CurrentCustomType.class.getSimpleName() + "[", "]") + .add("number=" + number) + .toString(); + } + } + + private static Mono tempDb(boolean delete) { + var wrkspcPath = Path.of("/tmp/tempdb/"); + return Mono + .fromCallable(() -> { + if (delete && Files.exists(wrkspcPath)) { + Files.walk(wrkspcPath) + .sorted(Comparator.reverseOrder()) + .forEach(file -> { + try { + Files.delete(file); + } catch (IOException ex) { + throw new CompletionException(ex); + } + }); + } + Files.createDirectories(wrkspcPath); + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + .then(new LLLocalDatabaseConnection(wrkspcPath, true).connect()) + .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false)); + } +} diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java similarity index 97% rename from src/example/java/it.cavallium.dbengine.client/Example.java rename to src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 56ef1db..1e58bb3 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -9,8 +9,8 @@ import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseStageEntry; import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.cavallium.dbengine.database.collections.QueryableBuilder; -import it.cavallium.dbengine.database.collections.Serializer; -import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength; +import it.cavallium.dbengine.database.serialization.Serializer; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.cavallium.dbengine.database.collections.SubStageGetterMap; import it.cavallium.dbengine.database.collections.SubStageGetterMapDeep; import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; @@ -36,7 +36,7 @@ import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; -public class Example { +public class SpeedExample { public static final boolean printPreviousValue = false; private static final int numRepeats = 1000; @@ -81,7 +81,7 @@ public class Example { .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> { var builder = new QueryableBuilder(2); - return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.tail(ssg, ser), builder.serializer())); + return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.serializer(), builder.tail(ssg, ser))); })), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono @@ -109,7 +109,11 @@ public class Example { return test("2 level put", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.deepTail(dict, ssg, k1ser, ssg.getKeyBinaryLength()))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.deepTail(dict, + k1ser, + ssg.getKeyBinaryLength(), + ssg + ))), tuple -> Flux.range(0, batchSize).flatMap(n -> { var itemKey1 = Ints.toByteArray(n / 4); var itemKey2 = Ints.toByteArray(n); @@ -147,7 +151,7 @@ public class Example { .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> { return DatabaseMapDictionaryDeep - .deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength()); + .deepTail(dict, k1ser, ssg2.getKeyBinaryLength(), ssg2); })), tuple -> Flux.range(0, batchSize).flatMap(n -> { var itemKey1 = Ints.toByteArray(n / 4); @@ -188,7 +192,7 @@ public class Example { tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep - .deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength()))), + .deepTail(dict, k1ser, ssg2.getKeyBinaryLength(), ssg2))), tuple -> Flux.range(0, batchSize).flatMap(n -> { var itemKey1 = Ints.toByteArray(n / 4); var itemKey2 = Longs.toByteArray(n); @@ -225,7 +229,7 @@ public class Example { return test("MapDictionaryDeep::at::put (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -252,7 +256,7 @@ public class Example { return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -278,7 +282,7 @@ public class Example { return test("MapDictionaryDeep::putValue (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -303,7 +307,7 @@ public class Example { return test("MapDictionaryDeep::putMulti (batch of " + batchSize + " entries)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), tuple -> Mono.defer(() -> tuple.getT2().putMulti(putMultiFlux)), numRepeats, tuple -> Mono diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 9a684f9..1467132 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -4,6 +4,8 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.serialization.Serializer; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -23,7 +25,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep keySuffixSerializer, Serializer valueSerializer) { - super(dictionary, new SubStageGetterSingle<>(valueSerializer), keySuffixSerializer, prefixKey, 0); + super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0); this.valueSerializer = valueSerializer; } @@ -34,9 +36,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep DatabaseMapDictionary tail(LLDictionary dictionary, + byte[] prefixKey, SerializerFixedBinaryLength keySuffixSerializer, - Serializer valueSerializer, - byte[] prefixKey) { + Serializer valueSerializer) { return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 4cab9e0..47e8deb 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -4,6 +4,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Arrays; import java.util.Map; import java.util.Map.Entry; @@ -82,34 +83,31 @@ public class DatabaseMapDictionaryDeep> implem * Use DatabaseMapDictionaryRange.simple instead */ @Deprecated - public static DatabaseMapDictionaryDeep> simple( - LLDictionary dictionary, - SubStageGetterSingle subStageGetter, - SerializerFixedBinaryLength keySerializer) { - return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0); - } - - public static > DatabaseMapDictionaryDeep deepTail( - LLDictionary dictionary, - SubStageGetter subStageGetter, + public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, - int keyExtLength) { - return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); + SubStageGetterSingle subStageGetter) { + return new DatabaseMapDictionaryDeep<>(dictionary, EMPTY_BYTES, keySerializer, subStageGetter, 0); } - public static > DatabaseMapDictionaryDeep deepIntermediate( - LLDictionary dictionary, - SubStageGetter subStageGetter, - SerializerFixedBinaryLength keySuffixSerializer, + public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary, + SerializerFixedBinaryLength keySerializer, + int keyExtLength, + SubStageGetter subStageGetter) { + return new DatabaseMapDictionaryDeep<>(dictionary, EMPTY_BYTES, keySerializer, subStageGetter, keyExtLength); + } + + public static > DatabaseMapDictionaryDeep deepIntermediate(LLDictionary dictionary, byte[] prefixKey, + SerializerFixedBinaryLength keySuffixSerializer, + SubStageGetter subStageGetter, int keyExtLength) { - return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength); + return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength); } protected DatabaseMapDictionaryDeep(LLDictionary dictionary, - SubStageGetter subStageGetter, - SerializerFixedBinaryLength keySuffixSerializer, byte[] prefixKey, + SerializerFixedBinaryLength keySuffixSerializer, + SubStageGetter subStageGetter, int keyExtLength) { this.dictionary = dictionary; this.subStageGetter = subStageGetter; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index e9f5ca3..bcac399 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.serialization.Serializer; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index b8f8b39..ea50d7b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.serialization.Serializer; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java b/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java index 7f57683..b4f6fe1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.collections; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; + public class QueryableBuilder { public QueryableBuilder(int stagesNumber) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/Serializer.java b/src/main/java/it/cavallium/dbengine/database/collections/Serializer.java deleted file mode 100644 index e4cfd8a..0000000 --- a/src/main/java/it/cavallium/dbengine/database/collections/Serializer.java +++ /dev/null @@ -1,22 +0,0 @@ -package it.cavallium.dbengine.database.collections; - -public interface Serializer { - - A deserialize(B serialized); - - B serialize(A deserialized); - - static Serializer noop() { - return new Serializer<>() { - @Override - public byte[] deserialize(byte[] serialized) { - return serialized; - } - - @Override - public byte[] serialize(byte[] deserialized) { - return deserialized; - } - }; - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/collections/SerializerFixedBinaryLength.java deleted file mode 100644 index e2d09b8..0000000 --- a/src/main/java/it/cavallium/dbengine/database/collections/SerializerFixedBinaryLength.java +++ /dev/null @@ -1,27 +0,0 @@ -package it.cavallium.dbengine.database.collections; - -public interface SerializerFixedBinaryLength extends Serializer { - - int getSerializedBinaryLength(); - - static SerializerFixedBinaryLength noop(int length) { - return new SerializerFixedBinaryLength<>() { - @Override - public byte[] deserialize(byte[] serialized) { - assert serialized.length == getSerializedBinaryLength(); - return serialized; - } - - @Override - public byte[] serialize(byte[] deserialized) { - assert deserialized.length == getSerializedBinaryLength(); - return deserialized; - } - - @Override - public int getSerializedBinaryLength() { - return length; - } - }; - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index ab3e209..bd948a0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.serialization.Serializer; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -32,10 +34,8 @@ public class SubStageGetterMap implements SubStageGetter, Databa @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - Mono> result = Mono.just(DatabaseMapDictionary.tail(dictionary, - keySerializer, - valueSerializer, - prefixKey + Mono> result = Mono.just(DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer, + valueSerializer )); if (assertsEnabled) { return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 392b23e..2786e1e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -48,9 +49,9 @@ public class SubStageGetterMapDeep> implements byte[] prefixKey, Flux keyFlux) { Mono> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary, - subStageGetter, - keySerializer, prefixKey, + keySerializer, + subStageGetter, keyExtLength )); if (assertsEnabled) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 6a90686..5e2f2af 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.serialization.Serializer; import java.util.Arrays; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java index e0dfc35..6fcc4a3 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.collections; +import it.cavallium.dbengine.database.serialization.Serializer; + public class SubStageGetterSingleBytes extends SubStageGetterSingle { public SubStageGetterSingleBytes() { diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java b/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java new file mode 100644 index 0000000..46119e8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java @@ -0,0 +1,13 @@ +package it.cavallium.dbengine.database.serialization; + +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import java.io.IOException; +import org.jetbrains.annotations.NotNull; + +public interface Codec { + + @NotNull A deserialize(@NotNull ByteBufInputStream serialized) throws IOException; + + void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull A deserialized) throws IOException; +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java new file mode 100644 index 0000000..71dcf77 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java @@ -0,0 +1,72 @@ +package it.cavallium.dbengine.database.serialization; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import java.io.IOException; +import org.jetbrains.annotations.NotNull; +import org.warp.commonutils.error.IndexOutOfBoundsException; + +public class CodecSerializer implements Serializer { + + private final Codecs deserializationCodecs; + private final Codec serializationCodec; + private final int serializationCodecId; + private final boolean microCodecs; + + /** + * + * @param microCodecs if true, allow only codecs with a value from 0 to 255 to save disk space + */ + public CodecSerializer(Codecs deserializationCodecs, + Codec serializationCodec, + int serializationCodecId, + boolean microCodecs) { + this.deserializationCodecs = deserializationCodecs; + this.serializationCodec = serializationCodec; + this.serializationCodecId = serializationCodecId; + this.microCodecs = microCodecs; + if (microCodecs && (serializationCodecId > 255 || serializationCodecId < 0)) { + throw new IndexOutOfBoundsException(serializationCodecId, 0, 255); + } + } + + @Override + public @NotNull A deserialize(byte @NotNull [] serialized) { + ByteBuf buf = Unpooled.wrappedBuffer(serialized); + try (var is = new ByteBufInputStream(buf)) { + int codecId; + if (microCodecs) { + codecId = is.readUnsignedByte(); + } else { + codecId = is.readInt(); + } + var serializer = deserializationCodecs.getCodec(codecId); + return serializer.deserialize(is); + } catch (IOException ex) { + // This shouldn't happen + throw new RuntimeException(ex); + } + } + + @Override + public byte @NotNull [] serialize(@NotNull A deserialized) { + ByteBuf buf = Unpooled.buffer(256); + try (var os = new ByteBufOutputStream(buf)) { + if (microCodecs) { + os.writeByte(serializationCodecId); + } else { + os.writeInt(serializationCodecId); + } + serializationCodec.serialize(os, deserialized); + os.flush(); + var bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + return bytes; + } catch (IOException ex) { + // This shouldn't happen + throw new RuntimeException(ex); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Codecs.java b/src/main/java/it/cavallium/dbengine/database/serialization/Codecs.java new file mode 100644 index 0000000..08ffe3f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Codecs.java @@ -0,0 +1,29 @@ +package it.cavallium.dbengine.database.serialization; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.warp.commonutils.concurrency.atomicity.NotAtomic; + +@NotAtomic +public class Codecs { + + private final Int2ObjectMap> codecs; + + public Codecs() { + codecs = new Int2ObjectOpenHashMap<>(); + } + + public void registerCodec(int id, Codec serializer) { + if (codecs.put(id, serializer) != null) { + throw new IllegalArgumentException("Codec " + id + " already registered!"); + } + } + + public Codec getCodec(int id) { + var codec = codecs.get(id); + if (codec == null) { + throw new UnsupportedOperationException("Unsupported codec " + id); + } + return codec; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java new file mode 100644 index 0000000..bbca58f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java @@ -0,0 +1,24 @@ +package it.cavallium.dbengine.database.serialization; + +import org.jetbrains.annotations.NotNull; + +public interface Serializer { + + @NotNull A deserialize(@NotNull B serialized); + + @NotNull B serialize(@NotNull A deserialized); + + static Serializer noop() { + return new Serializer<>() { + @Override + public byte @NotNull [] deserialize(byte @NotNull [] serialized) { + return serialized; + } + + @Override + public byte @NotNull [] serialize(byte @NotNull [] deserialized) { + return deserialized; + } + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java new file mode 100644 index 0000000..a751619 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java @@ -0,0 +1,71 @@ +package it.cavallium.dbengine.database.serialization; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.jetbrains.annotations.NotNull; + +public interface SerializerFixedBinaryLength extends Serializer { + + int getSerializedBinaryLength(); + + static SerializerFixedBinaryLength noop(int length) { + return new SerializerFixedBinaryLength<>() { + @Override + public byte @NotNull [] deserialize(byte @NotNull [] serialized) { + assert serialized.length == getSerializedBinaryLength(); + return serialized; + } + + @Override + public byte @NotNull [] serialize(byte @NotNull [] deserialized) { + assert deserialized.length == getSerializedBinaryLength(); + return deserialized; + } + + @Override + public int getSerializedBinaryLength() { + return length; + } + }; + } + + static SerializerFixedBinaryLength intSerializer() { + return new SerializerFixedBinaryLength<>() { + @Override + public @NotNull Integer deserialize(byte @NotNull [] serialized) { + assert serialized.length == getSerializedBinaryLength(); + return Ints.fromByteArray(serialized); + } + + @Override + public byte @NotNull [] serialize(@NotNull Integer deserialized) { + return Ints.toByteArray(deserialized); + } + + @Override + public int getSerializedBinaryLength() { + return Integer.BYTES; + } + }; + } + + static SerializerFixedBinaryLength longSerializer() { + return new SerializerFixedBinaryLength<>() { + @Override + public @NotNull Long deserialize(byte @NotNull [] serialized) { + assert serialized.length == getSerializedBinaryLength(); + return Longs.fromByteArray(serialized); + } + + @Override + public byte @NotNull [] serialize(@NotNull Long deserialized) { + return Longs.toByteArray(deserialized); + } + + @Override + public int getSerializedBinaryLength() { + return Long.BYTES; + } + }; + } +}