From 5ae015b70143c294f5188a231b9d078f369f2420 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 31 Jan 2021 22:20:00 +0100 Subject: [PATCH] Update Example.java, SubStageGetterMap.java, and SubStageGetterMapRange.java --- .../it.cavallium.dbengine.client/Example.java | 87 ++++++++++++++++--- ...erMapRange.java => SubStageGetterMap.java} | 4 +- 2 files changed, 76 insertions(+), 15 deletions(-) rename src/main/java/it/cavallium/dbengine/database/collections/{SubStageGetterMapRange.java => SubStageGetterMap.java} (78%) diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java index 7d6f2c1..1be0893 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/Example.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.client; +import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; @@ -14,9 +16,11 @@ import java.text.DecimalFormat; import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.function.Function; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; @@ -26,13 +30,19 @@ import reactor.util.function.Tuples; public class Example { private static final boolean printPreviousValue = false; - private static final int numRepeats = 100000; + private static final int numRepeats = 500; + private static final int batchSize = 1000; - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { testAtPut(); testPutValueAndGetPrevious(); testPutValue(); - testPutValue() + testAtPut() + .then(rangeTestAtPut()) + .then(testPutValue()) + .then(rangeTestPutValue()) + .then(testPutMulti()) + .then(rangeTestPutMulti()) .subscribeOn(Schedulers.parallel()) .blockOptional(); } @@ -43,7 +53,7 @@ public class Example { var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - return test("MapDictionaryDeep::at::put (same key, same value)", + return test("MapDictionaryDeep::at::put (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), @@ -59,7 +69,9 @@ public class Example { if (printPreviousValue) System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); }) - ), + ) + .repeat(batchSize) + .then(), numRepeats, tuple -> tuple.getT1().close()); } @@ -70,7 +82,7 @@ public class Example { var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value)", + return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), @@ -85,7 +97,9 @@ public class Example { if (printPreviousValue) System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); }) - ), + ) + .repeat(batchSize) + .then(), numRepeats, tuple -> tuple.getT1().close()); } @@ -96,7 +110,7 @@ public class Example { var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - return test("MapDictionaryDeep::putValue (same key, same value)", + return test("MapDictionaryDeep::putValue (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), @@ -107,6 +121,28 @@ public class Example { System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); }) .then(tuple.getT2().putValue(itemKeyBuffer, newValue)) + ) + .repeat(batchSize) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono testPutMulti() { + var ssg = new SubStageGetterSingleBytes(); + var ser = FixedLengthSerializer.noop(4); + int batchSize = 1000; + HashMap keysToPut = new HashMap<>(); + for (int i = 0; i < 1000; i++) { + keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11)); + } + var putMultiFlux = Flux.fromIterable(keysToPut.entrySet()); + return test("MapDictionaryDeep::putMulti (batch of " + batchSize + " entries)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), + tuple -> Mono + .defer(() -> tuple.getT2().putMulti(putMultiFlux) ), numRepeats, tuple -> tuple.getT1().close()); @@ -118,7 +154,7 @@ public class Example { var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - return test("MapDictionary::at::put (same key, same value)", + return test("MapDictionary::at::put (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), @@ -134,7 +170,9 @@ public class Example { if (printPreviousValue) System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); }) - ), + ) + .repeat(batchSize) + .then(), numRepeats, tuple -> tuple.getT1().close()); } @@ -145,7 +183,7 @@ public class Example { var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - return test("MapDictionary::putValueAndGetPrevious (same key, same value)", + return test("MapDictionary::putValueAndGetPrevious (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), @@ -160,7 +198,9 @@ public class Example { if (printPreviousValue) System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); }) - ), + ) + .repeat(batchSize) + .then(), numRepeats, tuple -> tuple.getT1().close()); } @@ -171,7 +211,7 @@ public class Example { var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[]{4, 5, 6, 7}; var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); - return test("MapDictionary::putValue (same key, same value)", + return test("MapDictionary::putValue (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), @@ -182,6 +222,27 @@ public class Example { System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); }) .then(tuple.getT2().putValue(itemKeyBuffer, newValue)) + ) + .repeat(batchSize) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono rangeTestPutMulti() { + var ser = FixedLengthSerializer.noop(4); + var vser = Serializer.noopBytes(); + HashMap keysToPut = new HashMap<>(); + for (int i = 0; i < batchSize; i++) { + keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11)); + } + var putMultiFlux = Flux.fromIterable(keysToPut.entrySet()); + return test("MapDictionary::putMulti (batch of " + batchSize + " entries)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), + tuple -> Mono + .defer(() -> tuple.getT2().putMulti(putMultiFlux) ), numRepeats, tuple -> tuple.getT1().close()); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java similarity index 78% rename from src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java rename to src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index cd0c74c..a8406ec 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -7,12 +7,12 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class SubStageGetterMapRange implements SubStageGetter, DatabaseStageEntry>> { +public class SubStageGetterMap implements SubStageGetter, DatabaseStageEntry>> { private final FixedLengthSerializer keySerializer; private final Serializer valueSerializer; - public SubStageGetterMapRange(FixedLengthSerializer keySerializer, Serializer valueSerializer) { + public SubStageGetterMap(FixedLengthSerializer keySerializer, Serializer valueSerializer) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; }