Update Example.java, SubStageGetterMap.java, and SubStageGetterMapRange.java
This commit is contained in:
parent
4776e4062b
commit
5ae015b701
@ -1,5 +1,7 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.database.Column;
|
||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||
@ -14,9 +16,11 @@ import java.text.DecimalFormat;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Function;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Sinks;
|
||||
import reactor.core.publisher.Sinks.One;
|
||||
@ -26,13 +30,19 @@ import reactor.util.function.Tuples;
|
||||
public class Example {
|
||||
|
||||
private static final boolean printPreviousValue = false;
|
||||
private static final int numRepeats = 100000;
|
||||
private static final int numRepeats = 500;
|
||||
private static final int batchSize = 1000;
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
testAtPut();
|
||||
testPutValueAndGetPrevious();
|
||||
testPutValue();
|
||||
testPutValue()
|
||||
testAtPut()
|
||||
.then(rangeTestAtPut())
|
||||
.then(testPutValue())
|
||||
.then(rangeTestPutValue())
|
||||
.then(testPutMulti())
|
||||
.then(rangeTestPutMulti())
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.blockOptional();
|
||||
}
|
||||
@ -43,7 +53,7 @@ public class Example {
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionaryDeep::at::put (same key, same value)",
|
||||
return test("MapDictionaryDeep::at::put (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
@ -59,7 +69,9 @@ public class Example {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
)
|
||||
.repeat(batchSize)
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
@ -70,7 +82,7 @@ public class Example {
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value)",
|
||||
return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
@ -85,7 +97,9 @@ public class Example {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
)
|
||||
.repeat(batchSize)
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
@ -96,7 +110,7 @@ public class Example {
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionaryDeep::putValue (same key, same value)",
|
||||
return test("MapDictionaryDeep::putValue (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
@ -107,6 +121,28 @@ public class Example {
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().putValue(itemKeyBuffer, newValue))
|
||||
)
|
||||
.repeat(batchSize)
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> testPutMulti() {
|
||||
var ssg = new SubStageGetterSingleBytes();
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
int batchSize = 1000;
|
||||
HashMap<ByteBuf, byte[]> keysToPut = new HashMap<>();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11));
|
||||
}
|
||||
var putMultiFlux = Flux.fromIterable(keysToPut.entrySet());
|
||||
return test("MapDictionaryDeep::putMulti (batch of " + batchSize + " entries)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> tuple.getT2().putMulti(putMultiFlux)
|
||||
),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
@ -118,7 +154,7 @@ public class Example {
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionary::at::put (same key, same value)",
|
||||
return test("MapDictionary::at::put (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
@ -134,7 +170,9 @@ public class Example {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
)
|
||||
.repeat(batchSize)
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
@ -145,7 +183,7 @@ public class Example {
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionary::putValueAndGetPrevious (same key, same value)",
|
||||
return test("MapDictionary::putValueAndGetPrevious (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
@ -160,7 +198,9 @@ public class Example {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
)
|
||||
.repeat(batchSize)
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
@ -171,7 +211,7 @@ public class Example {
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionary::putValue (same key, same value)",
|
||||
return test("MapDictionary::putValue (same key, same value, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
@ -182,6 +222,27 @@ public class Example {
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().putValue(itemKeyBuffer, newValue))
|
||||
)
|
||||
.repeat(batchSize)
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> rangeTestPutMulti() {
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var vser = Serializer.noopBytes();
|
||||
HashMap<ByteBuf, byte[]> keysToPut = new HashMap<>();
|
||||
for (int i = 0; i < batchSize; i++) {
|
||||
keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11));
|
||||
}
|
||||
var putMultiFlux = Flux.fromIterable(keysToPut.entrySet());
|
||||
return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> tuple.getT2().putMulti(putMultiFlux)
|
||||
),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
|
@ -7,12 +7,12 @@ import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterMapRange<T, U> implements SubStageGetter<Map<T, U>, DatabaseStageEntry<Map<T, U>>> {
|
||||
public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseStageEntry<Map<T, U>>> {
|
||||
|
||||
private final FixedLengthSerializer<T> keySerializer;
|
||||
private final Serializer<U> valueSerializer;
|
||||
|
||||
public SubStageGetterMapRange(FixedLengthSerializer<T> keySerializer, Serializer<U> valueSerializer) {
|
||||
public SubStageGetterMap(FixedLengthSerializer<T> keySerializer, Serializer<U> valueSerializer) {
|
||||
this.keySerializer = keySerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
}
|
Loading…
Reference in New Issue
Block a user