From 591b424a76e1c90bfcbd4af5e80f4f8039d93636 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 2 Feb 2021 20:08:22 +0100 Subject: [PATCH] Update CodecsExample.java and SpeedExample.java --- .../CodecsExample.java | 79 ++++++++++++++++--- .../SpeedExample.java | 25 ++---- 2 files changed, 76 insertions(+), 28 deletions(-) diff --git a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java index e71e180..812bb15 100644 --- a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java +++ b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java @@ -1,11 +1,13 @@ package it.cavallium.dbengine.client; +import com.google.common.primitives.Ints; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.SubStageGetterSingle; +import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.serialization.Codec; import it.cavallium.dbengine.database.serialization.CodecSerializer; @@ -19,6 +21,7 @@ import java.util.List; import java.util.StringJoiner; import java.util.concurrent.CompletionException; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; @@ -26,22 +29,85 @@ import reactor.util.function.Tuples; public class CodecsExample { public static void main(String[] args) { + testConversionSpeed(); + } + + private static void testConversionSpeed() { + SpeedExample.test("No-Op Conversion", + tempDb(true) + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> Tuples.of(DatabaseMapDictionaryDeep.simple(dict, + SerializerFixedBinaryLength.longSerializer(), + new SubStageGetterSingleBytes() + ), + DatabaseMapDictionaryDeep.simple(dict, + SerializerFixedBinaryLength.longSerializer(), + new SubStageGetterSingleBytes() + ) + ))), + tuple -> Flux + .range(0, SpeedExample.batchSize) + .flatMap(n -> tuple + .getT2() + .getT1() + .putValue(n * 7L, Ints.toByteArray(n * 3)) + .then(tuple.getT2().getT2().getValue(null, n * 7L))) + .then(), + SpeedExample.numRepeats, + tuple -> tuple.getT1().close() + ).then( + SpeedExample.test("Conversion", + tempDb(true) + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> Tuples.of(DatabaseMapDictionaryDeep.simple(dict, + SerializerFixedBinaryLength.longSerializer(), + getOldSubStageGetter() + ), + DatabaseMapDictionaryDeep.simple(dict, + SerializerFixedBinaryLength.longSerializer(), + getNewSubStageGetter() + ) + ))), + tuple -> Flux + .range(0, SpeedExample.batchSize) + .flatMap(n -> tuple + .getT2() + .getT1() + .putValue(n * 7L, new OldCustomType(n * 3)) + .then(tuple.getT2().getT2().getValue(null, n * 7L))) + .then(), + SpeedExample.numRepeats, + tuple -> tuple.getT1().close() + )).subscribeOn(Schedulers.parallel()).blockOptional(); + } + + private static void testConversion() { writeOld().then().then(readNew()).subscribeOn(Schedulers.parallel()).blockOptional(); } - private static Mono readNew() { + private static SubStageGetterSingle getNewSubStageGetter() { var newCodec = new NewCustomTypeCodecV2(); var newCodecs = new Codecs(); newCodecs.registerCodec(1, new NewCustomTypeCodecV1()); newCodecs.registerCodec(2, newCodec); var newSerializer = new CodecSerializer<>(newCodecs, newCodec, 2, true); - var newSsg = new SubStageGetterSingle<>(newSerializer); + return new SubStageGetterSingle<>(newSerializer); + } + private static SubStageGetterSingle getOldSubStageGetter() { + var oldCodec = new OldCustomTypeCodec(); + var oldCodecs = new Codecs(); + oldCodecs.registerCodec(1, oldCodec); + var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true); + return new SubStageGetterSingle<>(oldSerializer); + } + + private static Mono readNew() { return tempDb(false) .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, SerializerFixedBinaryLength.longSerializer(), - newSsg + getNewSubStageGetter() ))) .flatMap(tuple -> { System.out.println("Reading from disk current value with any codec id..."); @@ -56,17 +122,12 @@ public class CodecsExample { } private static Mono writeOld() { - var oldCodec = new OldCustomTypeCodec(); - var oldCodecs = new Codecs(); - oldCodecs.registerCodec(1, oldCodec); - var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true); - var oldSsg = new SubStageGetterSingle<>(oldSerializer); return tempDb(true) .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, SerializerFixedBinaryLength.longSerializer(), - oldSsg + getOldSubStageGetter() ))) .flatMap(tuple -> { var oldValue = new OldCustomType(155); diff --git a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 1e58bb3..3bc1792 100644 --- a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -39,26 +39,13 @@ import reactor.util.function.Tuples; public class SpeedExample { public static final boolean printPreviousValue = false; - private static final int numRepeats = 1000; - private static final int batchSize = 1000; + public static final int numRepeats = 1000; + public static final int batchSize = 1000; public static void main(String[] args) throws InterruptedException { + testPutValue().block(); + /* - testAtPut(); - testPutValueAndGetPrevious(); - testPutValue(); - testAtPut() - .then(rangeTestAtPut()) - .then(testPutValue()) - .then(rangeTestPutValue()) - .then(testPutMulti()) - .then(rangeTestPutMulti()) - .subscribeOn(Schedulers.parallel()) - .blockOptional(); - - - */ - rangeTestPutMultiSame() .then(rangeTestPutMultiProgressive()) .then(testPutMulti()) @@ -68,7 +55,7 @@ public class SpeedExample { .then(test3LevelPut()) .then(test4LevelPut()) .subscribeOn(Schedulers.parallel()) - .blockOptional(); + .blockOptional();*/ } private static Mono testCreateQueryable() { @@ -445,7 +432,7 @@ public class SpeedExample { ); } - private static Mono tempDb() { + public static Mono tempDb() { var wrkspcPath = Path.of("/tmp/tempdb/"); return Mono .fromCallable(() -> {