Add codecs
This commit is contained in:
parent
ae5cda2f70
commit
82accc2405
173
src/example/java/it.cavallium.dbengine.client/CodecsExample.java
Normal file
173
src/example/java/it.cavallium.dbengine.client/CodecsExample.java
Normal file
@ -0,0 +1,173 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import io.netty.buffer.ByteBufInputStream;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import it.cavallium.dbengine.database.Column;
|
||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterSingle;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||
import it.cavallium.dbengine.database.serialization.Codec;
|
||||
import it.cavallium.dbengine.database.serialization.CodecSerializer;
|
||||
import it.cavallium.dbengine.database.serialization.Codecs;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public class CodecsExample {
|
||||
|
||||
public static void main(String[] args) {
|
||||
var oldCodec = new OldCustomTypeCodec();
|
||||
var oldCodecs = new Codecs<OldCustomType>();
|
||||
oldCodecs.registerCodec(1, oldCodec);
|
||||
var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true);
|
||||
var oldSsg = new SubStageGetterSingle<>(oldSerializer);
|
||||
|
||||
var newCodec = new NewCustomTypeCodecV2();
|
||||
var newCodecs = new Codecs<CurrentCustomType>();
|
||||
newCodecs.registerCodec(1, new NewCustomTypeCodecV1());
|
||||
newCodecs.registerCodec(2, newCodec);
|
||||
var newSerializer = new CodecSerializer<>(newCodecs, newCodec, 2, true);
|
||||
var newSsg = new SubStageGetterSingle<>(newSerializer);
|
||||
|
||||
tempDb(true)
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, SerializerFixedBinaryLength.longSerializer(), oldSsg)))
|
||||
.flatMap(tuple -> {
|
||||
var oldValue = new OldCustomType(155);
|
||||
System.out.println("Writing to disk old value with codec id 1: " + oldValue);
|
||||
|
||||
return tuple.getT2().putValue(15L, oldValue).then(tuple.getT1().close());
|
||||
})
|
||||
.then(tempDb(false))
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, SerializerFixedBinaryLength.longSerializer(), newSsg)))
|
||||
.flatMap(tuple -> {
|
||||
System.out.println("Reading from disk current value with any codec id...");
|
||||
return tuple.getT2().getValue(null, 15L).doOnSuccess(s -> {
|
||||
if (s == null) {
|
||||
System.err.println("No value found for key 15");
|
||||
} else {
|
||||
System.out.println("Current value read successfully: " + s);
|
||||
}
|
||||
}).then(tuple.getT1().close());
|
||||
})
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.blockOptional();
|
||||
}
|
||||
|
||||
private static class OldCustomTypeCodec implements Codec<OldCustomType> {
|
||||
|
||||
@Override
|
||||
public @NotNull OldCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException {
|
||||
return new OldCustomType(serialized.readUTF());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull OldCustomType deserialized) throws IOException {
|
||||
outputStream.writeUTF(deserialized.number);
|
||||
}
|
||||
}
|
||||
|
||||
private static class NewCustomTypeCodecV1 implements Codec<CurrentCustomType> {
|
||||
|
||||
@Override
|
||||
public @NotNull CurrentCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException {
|
||||
return new CurrentCustomType(Integer.parseInt(serialized.readUTF()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull CurrentCustomType deserialized) throws IOException {
|
||||
throw new UnsupportedOperationException("Can't serialize with an old version");
|
||||
}
|
||||
}
|
||||
|
||||
private static class NewCustomTypeCodecV2 implements Codec<CurrentCustomType> {
|
||||
|
||||
@Override
|
||||
public @NotNull CurrentCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException {
|
||||
return new CurrentCustomType(serialized.readInt());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull CurrentCustomType deserialized) throws IOException {
|
||||
outputStream.writeInt(deserialized.number);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class OldCustomType {
|
||||
|
||||
private final String number;
|
||||
|
||||
public OldCustomType(int number) {
|
||||
this.number = "" + number;
|
||||
}
|
||||
|
||||
public OldCustomType(String readUTF) {
|
||||
this.number = readUTF;
|
||||
}
|
||||
|
||||
public String getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", OldCustomType.class.getSimpleName() + "[", "]")
|
||||
.add("number='" + number + "'")
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public static final class CurrentCustomType {
|
||||
|
||||
private final int number;
|
||||
|
||||
public CurrentCustomType(int number) {
|
||||
this.number = number;
|
||||
}
|
||||
|
||||
public int getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", CurrentCustomType.class.getSimpleName() + "[", "]")
|
||||
.add("number=" + number)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static <U> Mono<? extends LLKeyValueDatabase> tempDb(boolean delete) {
|
||||
var wrkspcPath = Path.of("/tmp/tempdb/");
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
if (delete && Files.exists(wrkspcPath)) {
|
||||
Files.walk(wrkspcPath)
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.forEach(file -> {
|
||||
try {
|
||||
Files.delete(file);
|
||||
} catch (IOException ex) {
|
||||
throw new CompletionException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
Files.createDirectories(wrkspcPath);
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
|
||||
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false));
|
||||
}
|
||||
}
|
@ -9,8 +9,8 @@ import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
|
||||
import it.cavallium.dbengine.database.collections.QueryableBuilder;
|
||||
import it.cavallium.dbengine.database.collections.Serializer;
|
||||
import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterMapDeep;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
|
||||
@ -36,7 +36,7 @@ import reactor.core.publisher.Sinks.One;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public class Example {
|
||||
public class SpeedExample {
|
||||
|
||||
public static final boolean printPreviousValue = false;
|
||||
private static final int numRepeats = 1000;
|
||||
@ -81,7 +81,7 @@ public class Example {
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> {
|
||||
var builder = new QueryableBuilder(2);
|
||||
return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.tail(ssg, ser), builder.serializer()));
|
||||
return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.serializer(), builder.tail(ssg, ser)));
|
||||
})),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
|
||||
.defer(() -> Mono
|
||||
@ -109,7 +109,11 @@ public class Example {
|
||||
return test("2 level put",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.deepTail(dict, ssg, k1ser, ssg.getKeyBinaryLength()))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.deepTail(dict,
|
||||
k1ser,
|
||||
ssg.getKeyBinaryLength(),
|
||||
ssg
|
||||
))),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> {
|
||||
var itemKey1 = Ints.toByteArray(n / 4);
|
||||
var itemKey2 = Ints.toByteArray(n);
|
||||
@ -147,7 +151,7 @@ public class Example {
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> {
|
||||
return DatabaseMapDictionaryDeep
|
||||
.deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength());
|
||||
.deepTail(dict, k1ser, ssg2.getKeyBinaryLength(), ssg2);
|
||||
})),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> {
|
||||
var itemKey1 = Ints.toByteArray(n / 4);
|
||||
@ -188,7 +192,7 @@ public class Example {
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep
|
||||
.deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength()))),
|
||||
.deepTail(dict, k1ser, ssg2.getKeyBinaryLength(), ssg2))),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> {
|
||||
var itemKey1 = Ints.toByteArray(n / 4);
|
||||
var itemKey2 = Longs.toByteArray(n);
|
||||
@ -225,7 +229,7 @@ public class Example {
|
||||
return test("MapDictionaryDeep::at::put (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
@ -252,7 +256,7 @@ public class Example {
|
||||
return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
@ -278,7 +282,7 @@ public class Example {
|
||||
return test("MapDictionaryDeep::putValue (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
@ -303,7 +307,7 @@ public class Example {
|
||||
return test("MapDictionaryDeep::putMulti (batch of " + batchSize + " entries)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))),
|
||||
tuple -> Mono.defer(() -> tuple.getT2().putMulti(putMultiFlux)),
|
||||
numRepeats,
|
||||
tuple -> Mono
|
@ -4,6 +4,8 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -23,7 +25,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
byte[] prefixKey,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer,
|
||||
Serializer<U, byte[]> valueSerializer) {
|
||||
super(dictionary, new SubStageGetterSingle<>(valueSerializer), keySuffixSerializer, prefixKey, 0);
|
||||
super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0);
|
||||
this.valueSerializer = valueSerializer;
|
||||
}
|
||||
|
||||
@ -34,9 +36,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
|
||||
byte[] prefixKey,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer,
|
||||
Serializer<U, byte[]> valueSerializer,
|
||||
byte[] prefixKey) {
|
||||
Serializer<U, byte[]> valueSerializer) {
|
||||
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer);
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -82,34 +83,31 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
||||
* Use DatabaseMapDictionaryRange.simple instead
|
||||
*/
|
||||
@Deprecated
|
||||
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(
|
||||
LLDictionary dictionary,
|
||||
SubStageGetterSingle<U> subStageGetter,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySerializer) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(
|
||||
LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySerializer,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
|
||||
SubStageGetterSingle<U> subStageGetter) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, EMPTY_BYTES, keySerializer, subStageGetter, 0);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(
|
||||
LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer,
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySerializer,
|
||||
int keyExtLength,
|
||||
SubStageGetter<U, US> subStageGetter) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, EMPTY_BYTES, keySerializer, subStageGetter, keyExtLength);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(LLDictionary dictionary,
|
||||
byte[] prefixKey,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength);
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength);
|
||||
}
|
||||
|
||||
protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
int keyExtLength) {
|
||||
this.dictionary = dictionary;
|
||||
this.subStageGetter = subStageGetter;
|
||||
|
@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
|
||||
public class QueryableBuilder {
|
||||
|
||||
public QueryableBuilder(int stagesNumber) {
|
||||
|
@ -1,22 +0,0 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
public interface Serializer<A, B> {
|
||||
|
||||
A deserialize(B serialized);
|
||||
|
||||
B serialize(A deserialized);
|
||||
|
||||
static Serializer<byte[], byte[]> noop() {
|
||||
return new Serializer<>() {
|
||||
@Override
|
||||
public byte[] deserialize(byte[] serialized) {
|
||||
return serialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(byte[] deserialized) {
|
||||
return deserialized;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
|
||||
|
||||
int getSerializedBinaryLength();
|
||||
|
||||
static SerializerFixedBinaryLength<byte[], byte[]> noop(int length) {
|
||||
return new SerializerFixedBinaryLength<>() {
|
||||
@Override
|
||||
public byte[] deserialize(byte[] serialized) {
|
||||
assert serialized.length == getSerializedBinaryLength();
|
||||
return serialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(byte[] deserialized) {
|
||||
assert deserialized.length == getSerializedBinaryLength();
|
||||
return deserialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedBinaryLength() {
|
||||
return length;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -32,10 +34,8 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
Mono<DatabaseMapDictionary<T, U>> result = Mono.just(DatabaseMapDictionary.tail(dictionary,
|
||||
keySerializer,
|
||||
valueSerializer,
|
||||
prefixKey
|
||||
Mono<DatabaseMapDictionary<T, U>> result = Mono.just(DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer,
|
||||
valueSerializer
|
||||
));
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result);
|
||||
|
@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -48,9 +49,9 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
Mono<DatabaseMapDictionaryDeep<T, U, US>> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
|
||||
subStageGetter,
|
||||
keySerializer,
|
||||
prefixKey,
|
||||
keySerializer,
|
||||
subStageGetter,
|
||||
keyExtLength
|
||||
));
|
||||
if (assertsEnabled) {
|
||||
|
@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import java.util.Arrays;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -1,5 +1,7 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
|
||||
public class SubStageGetterSingleBytes extends SubStageGetterSingle<byte[]> {
|
||||
|
||||
public SubStageGetterSingleBytes() {
|
||||
|
@ -0,0 +1,13 @@
|
||||
package it.cavallium.dbengine.database.serialization;
|
||||
|
||||
import io.netty.buffer.ByteBufInputStream;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import java.io.IOException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public interface Codec<A> {
|
||||
|
||||
@NotNull A deserialize(@NotNull ByteBufInputStream serialized) throws IOException;
|
||||
|
||||
void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull A deserialized) throws IOException;
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
package it.cavallium.dbengine.database.serialization;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufInputStream;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import java.io.IOException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.warp.commonutils.error.IndexOutOfBoundsException;
|
||||
|
||||
public class CodecSerializer<A> implements Serializer<A, byte[]> {
|
||||
|
||||
private final Codecs<A> deserializationCodecs;
|
||||
private final Codec<A> serializationCodec;
|
||||
private final int serializationCodecId;
|
||||
private final boolean microCodecs;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param microCodecs if true, allow only codecs with a value from 0 to 255 to save disk space
|
||||
*/
|
||||
public CodecSerializer(Codecs<A> deserializationCodecs,
|
||||
Codec<A> serializationCodec,
|
||||
int serializationCodecId,
|
||||
boolean microCodecs) {
|
||||
this.deserializationCodecs = deserializationCodecs;
|
||||
this.serializationCodec = serializationCodec;
|
||||
this.serializationCodecId = serializationCodecId;
|
||||
this.microCodecs = microCodecs;
|
||||
if (microCodecs && (serializationCodecId > 255 || serializationCodecId < 0)) {
|
||||
throw new IndexOutOfBoundsException(serializationCodecId, 0, 255);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull A deserialize(byte @NotNull [] serialized) {
|
||||
ByteBuf buf = Unpooled.wrappedBuffer(serialized);
|
||||
try (var is = new ByteBufInputStream(buf)) {
|
||||
int codecId;
|
||||
if (microCodecs) {
|
||||
codecId = is.readUnsignedByte();
|
||||
} else {
|
||||
codecId = is.readInt();
|
||||
}
|
||||
var serializer = deserializationCodecs.getCodec(codecId);
|
||||
return serializer.deserialize(is);
|
||||
} catch (IOException ex) {
|
||||
// This shouldn't happen
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte @NotNull [] serialize(@NotNull A deserialized) {
|
||||
ByteBuf buf = Unpooled.buffer(256);
|
||||
try (var os = new ByteBufOutputStream(buf)) {
|
||||
if (microCodecs) {
|
||||
os.writeByte(serializationCodecId);
|
||||
} else {
|
||||
os.writeInt(serializationCodecId);
|
||||
}
|
||||
serializationCodec.serialize(os, deserialized);
|
||||
os.flush();
|
||||
var bytes = new byte[buf.readableBytes()];
|
||||
buf.readBytes(bytes);
|
||||
return bytes;
|
||||
} catch (IOException ex) {
|
||||
// This shouldn't happen
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package it.cavallium.dbengine.database.serialization;
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
|
||||
|
||||
@NotAtomic
|
||||
public class Codecs<A> {
|
||||
|
||||
private final Int2ObjectMap<Codec<A>> codecs;
|
||||
|
||||
public Codecs() {
|
||||
codecs = new Int2ObjectOpenHashMap<>();
|
||||
}
|
||||
|
||||
public void registerCodec(int id, Codec<A> serializer) {
|
||||
if (codecs.put(id, serializer) != null) {
|
||||
throw new IllegalArgumentException("Codec " + id + " already registered!");
|
||||
}
|
||||
}
|
||||
|
||||
public Codec<A> getCodec(int id) {
|
||||
var codec = codecs.get(id);
|
||||
if (codec == null) {
|
||||
throw new UnsupportedOperationException("Unsupported codec " + id);
|
||||
}
|
||||
return codec;
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package it.cavallium.dbengine.database.serialization;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public interface Serializer<A, B> {
|
||||
|
||||
@NotNull A deserialize(@NotNull B serialized);
|
||||
|
||||
@NotNull B serialize(@NotNull A deserialized);
|
||||
|
||||
static Serializer<byte[], byte[]> noop() {
|
||||
return new Serializer<>() {
|
||||
@Override
|
||||
public byte @NotNull [] deserialize(byte @NotNull [] serialized) {
|
||||
return serialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte @NotNull [] serialize(byte @NotNull [] deserialized) {
|
||||
return deserialized;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package it.cavallium.dbengine.database.serialization;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
|
||||
|
||||
int getSerializedBinaryLength();
|
||||
|
||||
static SerializerFixedBinaryLength<byte[], byte[]> noop(int length) {
|
||||
return new SerializerFixedBinaryLength<>() {
|
||||
@Override
|
||||
public byte @NotNull [] deserialize(byte @NotNull [] serialized) {
|
||||
assert serialized.length == getSerializedBinaryLength();
|
||||
return serialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte @NotNull [] serialize(byte @NotNull [] deserialized) {
|
||||
assert deserialized.length == getSerializedBinaryLength();
|
||||
return deserialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedBinaryLength() {
|
||||
return length;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static SerializerFixedBinaryLength<Integer, byte[]> intSerializer() {
|
||||
return new SerializerFixedBinaryLength<>() {
|
||||
@Override
|
||||
public @NotNull Integer deserialize(byte @NotNull [] serialized) {
|
||||
assert serialized.length == getSerializedBinaryLength();
|
||||
return Ints.fromByteArray(serialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte @NotNull [] serialize(@NotNull Integer deserialized) {
|
||||
return Ints.toByteArray(deserialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedBinaryLength() {
|
||||
return Integer.BYTES;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static SerializerFixedBinaryLength<Long, byte[]> longSerializer() {
|
||||
return new SerializerFixedBinaryLength<>() {
|
||||
@Override
|
||||
public @NotNull Long deserialize(byte @NotNull [] serialized) {
|
||||
assert serialized.length == getSerializedBinaryLength();
|
||||
return Longs.fromByteArray(serialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte @NotNull [] serialize(@NotNull Long deserialized) {
|
||||
return Longs.toByteArray(deserialized);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedBinaryLength() {
|
||||
return Long.BYTES;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user