CavalliumDBEngine/src/example/java/it.cavallium.dbengine.client/CodecsExample.java
2022-07-02 11:44:13 +02:00

253 lines
8.3 KiB
Java

package it.cavallium.dbengine.client;
import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.SubStageGetterSingle;
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Codec;
import it.cavallium.dbengine.database.serialization.CodecSerializer;
import it.cavallium.dbengine.database.serialization.CodecsArray;
import it.cavallium.dbengine.database.serialization.CodecsStandard;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletionException;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
public class CodecsExample {
public static void main(String[] args) {
testConversionSpeed();
}
private static void testConversionSpeed() {
SpeedExample.test("No-Op Conversion",
tempDb(true)
.flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> Tuples.of(DatabaseMapDictionaryDeep.simple(dict,
SerializerFixedBinaryLength.longSerializer(),
new SubStageGetterSingleBytes()
),
DatabaseMapDictionaryDeep.simple(dict,
SerializerFixedBinaryLength.longSerializer(),
new SubStageGetterSingleBytes()
)
))),
tuple -> Flux
.range(0, SpeedExample.batchSize)
.flatMap(n -> tuple
.getT2()
.getT1()
.putValue(n * 7L, Ints.toByteArray(n * 3))
.then(tuple.getT2().getT2().getValue(null, n * 7L)))
.then(),
SpeedExample.numRepeats,
tuple -> tuple.getT1().close()
).then(
SpeedExample.test("Conversion",
tempDb(true)
.flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> Tuples.of(DatabaseMapDictionaryDeep.simple(dict,
SerializerFixedBinaryLength.longSerializer(),
getOldSubStageGetter()
),
DatabaseMapDictionaryDeep.simple(dict,
SerializerFixedBinaryLength.longSerializer(),
getNewSubStageGetter()
)
))),
tuple -> Flux
.range(0, SpeedExample.batchSize)
.flatMap(n -> tuple
.getT2()
.getT1()
.putValue(n * 7L, new OldCustomType(n * 3))
.then(tuple.getT2().getT2().getValue(null, n * 7L)))
.then(),
SpeedExample.numRepeats,
tuple -> tuple.getT1().close()
)).transform(LLUtils::handleDiscard).subscribeOn(Schedulers.parallel()).blockOptional();
}
private static void testConversion() {
writeOld()
.then()
.then(readNew())
.subscribeOn(Schedulers.parallel())
.transform(LLUtils::handleDiscard)
.blockOptional();
}
private static SubStageGetterSingle<CurrentCustomType> getNewSubStageGetter() {
var newCodec = new NewCustomTypeCodecV2();
var newCodecs = new CodecsStandard<CurrentCustomType>();
newCodecs.registerCodec(1, new NewCustomTypeCodecV1());
newCodecs.registerCodec(2, newCodec);
var newSerializer = new CodecSerializer<>(newCodecs, newCodec, 2, true);
return new SubStageGetterSingle<>(newSerializer);
}
private static SubStageGetterSingle<OldCustomType> getOldSubStageGetter() {
var oldCodec = new OldCustomTypeCodec();
var oldCodecs = new CodecsArray<OldCustomType>();
oldCodecs.registerCodec(1, oldCodec);
var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true);
return new SubStageGetterSingle<>(oldSerializer);
}
private static Mono<Void> readNew() {
return tempDb(false)
.flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict,
SerializerFixedBinaryLength.longSerializer(),
getNewSubStageGetter()
)))
.flatMap(tuple -> {
System.out.println("Reading from disk current value with any codec id...");
return tuple.getT2().getValue(null, 15L).doOnSuccess(s -> {
if (s == null) {
System.err.println("No value found for key 15");
} else {
System.out.println("Current value read successfully: " + s);
}
}).then(tuple.getT1().close());
});
}
private static Mono<Void> writeOld() {
return tempDb(true)
.flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict,
SerializerFixedBinaryLength.longSerializer(),
getOldSubStageGetter()
)))
.flatMap(tuple -> {
var oldValue = new OldCustomType(155);
System.out.println("Writing to disk old value with codec id 1: " + oldValue);
return tuple.getT2().putValue(15L, oldValue).then(tuple.getT1().close());
});
}
private static class OldCustomTypeCodec implements Codec<OldCustomType> {
@Override
public @NotNull OldCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException {
return new OldCustomType(serialized.readUTF());
}
@Override
public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull OldCustomType deserialized) throws IOException {
outputStream.writeUTF(deserialized.number);
}
}
private static class NewCustomTypeCodecV1 implements Codec<CurrentCustomType> {
@Override
public @NotNull CurrentCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException {
return new CurrentCustomType(Integer.parseInt(serialized.readUTF()));
}
@Override
public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull CurrentCustomType deserialized) throws IOException {
throw new UnsupportedOperationException("Can't serialize with an old version");
}
}
private static class NewCustomTypeCodecV2 implements Codec<CurrentCustomType> {
@Override
public @NotNull CurrentCustomType deserialize(@NotNull ByteBufInputStream serialized) throws IOException {
return new CurrentCustomType(serialized.readInt());
}
@Override
public void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull CurrentCustomType deserialized) throws IOException {
outputStream.writeInt(deserialized.number);
}
}
public static final class OldCustomType {
private final String number;
public OldCustomType(int number) {
this.number = "" + number;
}
public OldCustomType(String readUTF) {
this.number = readUTF;
}
public String getNumber() {
return number;
}
@Override
public String toString() {
return new StringJoiner(", ", OldCustomType.class.getSimpleName() + "[", "]")
.add("number='" + number + "'")
.toString();
}
}
public static final class CurrentCustomType {
private final int number;
public CurrentCustomType(int number) {
this.number = number;
}
public int getNumber() {
return number;
}
@Override
public String toString() {
return new StringJoiner(", ", CurrentCustomType.class.getSimpleName() + "[", "]")
.add("number=" + number)
.toString();
}
}
private static <U> Mono<? extends LLKeyValueDatabase> tempDb(boolean delete) {
var wrkspcPath = Path.of("/tmp/tempdb/");
return Mono
.fromCallable(() -> {
if (delete && Files.exists(wrkspcPath)) {
Files.walk(wrkspcPath)
.sorted(Comparator.reverseOrder())
.forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
Files.createDirectories(wrkspcPath);
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false));
}
}