Update CodecsExample.java and SpeedExample.java
This commit is contained in:
parent
0cdb066b19
commit
591b424a76
@ -1,11 +1,13 @@
|
|||||||
package it.cavallium.dbengine.client;
|
package it.cavallium.dbengine.client;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
import io.netty.buffer.ByteBufInputStream;
|
import io.netty.buffer.ByteBufInputStream;
|
||||||
import io.netty.buffer.ByteBufOutputStream;
|
import io.netty.buffer.ByteBufOutputStream;
|
||||||
import it.cavallium.dbengine.database.Column;
|
import it.cavallium.dbengine.database.Column;
|
||||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||||
import it.cavallium.dbengine.database.collections.SubStageGetterSingle;
|
import it.cavallium.dbengine.database.collections.SubStageGetterSingle;
|
||||||
|
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
|
||||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||||
import it.cavallium.dbengine.database.serialization.Codec;
|
import it.cavallium.dbengine.database.serialization.Codec;
|
||||||
import it.cavallium.dbengine.database.serialization.CodecSerializer;
|
import it.cavallium.dbengine.database.serialization.CodecSerializer;
|
||||||
@ -19,6 +21,7 @@ import java.util.List;
|
|||||||
import java.util.StringJoiner;
|
import java.util.StringJoiner;
|
||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
import reactor.util.function.Tuples;
|
import reactor.util.function.Tuples;
|
||||||
@ -26,22 +29,85 @@ import reactor.util.function.Tuples;
|
|||||||
public class CodecsExample {
|
public class CodecsExample {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
testConversionSpeed();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void testConversionSpeed() {
|
||||||
|
SpeedExample.test("No-Op Conversion",
|
||||||
|
tempDb(true)
|
||||||
|
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||||
|
.map(tuple -> tuple.mapT2(dict -> Tuples.of(DatabaseMapDictionaryDeep.simple(dict,
|
||||||
|
SerializerFixedBinaryLength.longSerializer(),
|
||||||
|
new SubStageGetterSingleBytes()
|
||||||
|
),
|
||||||
|
DatabaseMapDictionaryDeep.simple(dict,
|
||||||
|
SerializerFixedBinaryLength.longSerializer(),
|
||||||
|
new SubStageGetterSingleBytes()
|
||||||
|
)
|
||||||
|
))),
|
||||||
|
tuple -> Flux
|
||||||
|
.range(0, SpeedExample.batchSize)
|
||||||
|
.flatMap(n -> tuple
|
||||||
|
.getT2()
|
||||||
|
.getT1()
|
||||||
|
.putValue(n * 7L, Ints.toByteArray(n * 3))
|
||||||
|
.then(tuple.getT2().getT2().getValue(null, n * 7L)))
|
||||||
|
.then(),
|
||||||
|
SpeedExample.numRepeats,
|
||||||
|
tuple -> tuple.getT1().close()
|
||||||
|
).then(
|
||||||
|
SpeedExample.test("Conversion",
|
||||||
|
tempDb(true)
|
||||||
|
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||||
|
.map(tuple -> tuple.mapT2(dict -> Tuples.of(DatabaseMapDictionaryDeep.simple(dict,
|
||||||
|
SerializerFixedBinaryLength.longSerializer(),
|
||||||
|
getOldSubStageGetter()
|
||||||
|
),
|
||||||
|
DatabaseMapDictionaryDeep.simple(dict,
|
||||||
|
SerializerFixedBinaryLength.longSerializer(),
|
||||||
|
getNewSubStageGetter()
|
||||||
|
)
|
||||||
|
))),
|
||||||
|
tuple -> Flux
|
||||||
|
.range(0, SpeedExample.batchSize)
|
||||||
|
.flatMap(n -> tuple
|
||||||
|
.getT2()
|
||||||
|
.getT1()
|
||||||
|
.putValue(n * 7L, new OldCustomType(n * 3))
|
||||||
|
.then(tuple.getT2().getT2().getValue(null, n * 7L)))
|
||||||
|
.then(),
|
||||||
|
SpeedExample.numRepeats,
|
||||||
|
tuple -> tuple.getT1().close()
|
||||||
|
)).subscribeOn(Schedulers.parallel()).blockOptional();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void testConversion() {
|
||||||
writeOld().then().then(readNew()).subscribeOn(Schedulers.parallel()).blockOptional();
|
writeOld().then().then(readNew()).subscribeOn(Schedulers.parallel()).blockOptional();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Mono<Void> readNew() {
|
private static SubStageGetterSingle<CurrentCustomType> getNewSubStageGetter() {
|
||||||
var newCodec = new NewCustomTypeCodecV2();
|
var newCodec = new NewCustomTypeCodecV2();
|
||||||
var newCodecs = new Codecs<CurrentCustomType>();
|
var newCodecs = new Codecs<CurrentCustomType>();
|
||||||
newCodecs.registerCodec(1, new NewCustomTypeCodecV1());
|
newCodecs.registerCodec(1, new NewCustomTypeCodecV1());
|
||||||
newCodecs.registerCodec(2, newCodec);
|
newCodecs.registerCodec(2, newCodec);
|
||||||
var newSerializer = new CodecSerializer<>(newCodecs, newCodec, 2, true);
|
var newSerializer = new CodecSerializer<>(newCodecs, newCodec, 2, true);
|
||||||
var newSsg = new SubStageGetterSingle<>(newSerializer);
|
return new SubStageGetterSingle<>(newSerializer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SubStageGetterSingle<OldCustomType> getOldSubStageGetter() {
|
||||||
|
var oldCodec = new OldCustomTypeCodec();
|
||||||
|
var oldCodecs = new Codecs<OldCustomType>();
|
||||||
|
oldCodecs.registerCodec(1, oldCodec);
|
||||||
|
var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true);
|
||||||
|
return new SubStageGetterSingle<>(oldSerializer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Mono<Void> readNew() {
|
||||||
return tempDb(false)
|
return tempDb(false)
|
||||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict,
|
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict,
|
||||||
SerializerFixedBinaryLength.longSerializer(),
|
SerializerFixedBinaryLength.longSerializer(),
|
||||||
newSsg
|
getNewSubStageGetter()
|
||||||
)))
|
)))
|
||||||
.flatMap(tuple -> {
|
.flatMap(tuple -> {
|
||||||
System.out.println("Reading from disk current value with any codec id...");
|
System.out.println("Reading from disk current value with any codec id...");
|
||||||
@ -56,17 +122,12 @@ public class CodecsExample {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static Mono<Void> writeOld() {
|
private static Mono<Void> writeOld() {
|
||||||
var oldCodec = new OldCustomTypeCodec();
|
|
||||||
var oldCodecs = new Codecs<OldCustomType>();
|
|
||||||
oldCodecs.registerCodec(1, oldCodec);
|
|
||||||
var oldSerializer = new CodecSerializer<>(oldCodecs, oldCodec, 1, true);
|
|
||||||
var oldSsg = new SubStageGetterSingle<>(oldSerializer);
|
|
||||||
|
|
||||||
return tempDb(true)
|
return tempDb(true)
|
||||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict,
|
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict,
|
||||||
SerializerFixedBinaryLength.longSerializer(),
|
SerializerFixedBinaryLength.longSerializer(),
|
||||||
oldSsg
|
getOldSubStageGetter()
|
||||||
)))
|
)))
|
||||||
.flatMap(tuple -> {
|
.flatMap(tuple -> {
|
||||||
var oldValue = new OldCustomType(155);
|
var oldValue = new OldCustomType(155);
|
||||||
|
@ -39,26 +39,13 @@ import reactor.util.function.Tuples;
|
|||||||
public class SpeedExample {
|
public class SpeedExample {
|
||||||
|
|
||||||
public static final boolean printPreviousValue = false;
|
public static final boolean printPreviousValue = false;
|
||||||
private static final int numRepeats = 1000;
|
public static final int numRepeats = 1000;
|
||||||
private static final int batchSize = 1000;
|
public static final int batchSize = 1000;
|
||||||
|
|
||||||
public static void main(String[] args) throws InterruptedException {
|
public static void main(String[] args) throws InterruptedException {
|
||||||
|
testPutValue().block();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
testAtPut();
|
|
||||||
testPutValueAndGetPrevious();
|
|
||||||
testPutValue();
|
|
||||||
testAtPut()
|
|
||||||
.then(rangeTestAtPut())
|
|
||||||
.then(testPutValue())
|
|
||||||
.then(rangeTestPutValue())
|
|
||||||
.then(testPutMulti())
|
|
||||||
.then(rangeTestPutMulti())
|
|
||||||
.subscribeOn(Schedulers.parallel())
|
|
||||||
.blockOptional();
|
|
||||||
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
rangeTestPutMultiSame()
|
rangeTestPutMultiSame()
|
||||||
.then(rangeTestPutMultiProgressive())
|
.then(rangeTestPutMultiProgressive())
|
||||||
.then(testPutMulti())
|
.then(testPutMulti())
|
||||||
@ -68,7 +55,7 @@ public class SpeedExample {
|
|||||||
.then(test3LevelPut())
|
.then(test3LevelPut())
|
||||||
.then(test4LevelPut())
|
.then(test4LevelPut())
|
||||||
.subscribeOn(Schedulers.parallel())
|
.subscribeOn(Schedulers.parallel())
|
||||||
.blockOptional();
|
.blockOptional();*/
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Mono<Void> testCreateQueryable() {
|
private static Mono<Void> testCreateQueryable() {
|
||||||
@ -445,7 +432,7 @@ public class SpeedExample {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
|
public static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
|
||||||
var wrkspcPath = Path.of("/tmp/tempdb/");
|
var wrkspcPath = Path.of("/tmp/tempdb/");
|
||||||
return Mono
|
return Mono
|
||||||
.fromCallable(() -> {
|
.fromCallable(() -> {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user