Update pom.xml, Example.java, and 9 more files...

Andrea Cavalli 2021-02-02 15:36:11 +01:00
parent dbca36b3aa
commit d9187b70a9
11 changed files with 402 additions and 53 deletions

View File: pom.xml

@@ -26,6 +26,11 @@
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.warp</groupId>
<artifactId>common-utils</artifactId>
@@ -138,12 +143,12 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
<version>3.4.1</version>
<version>3.4.2</version>
</dependency>
</dependencies>
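This section adds the SLF4J logging facade (used by the trace logging introduced in LLLocalDictionary.java below) and bumps Reactor from 3.4.1 to 3.4.2. A minimal sketch of what the facade call looks like; note that slf4j-api alone discards log events, so a runtime binding such as logback-classic or slf4j-simple (an assumption, not part of this commit) is needed to see output:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
    private static final Logger logger = LoggerFactory.getLogger(LoggingSketch.class);

    public static void main(String[] args) {
        // Without a binding on the classpath this event is silently dropped.
        logger.trace("Reading {}", java.util.Arrays.toString(new byte[] {0, 1, 2, 3}));
    }
}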

View File: Example.java

@@ -1,12 +1,18 @@
package it.cavallium.dbengine.client;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.cavallium.dbengine.database.collections.QueryableBuilder;
import it.cavallium.dbengine.database.collections.Serializer;
import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.collections.SubStageGetterMapDeep;
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import java.io.IOException;
@@ -20,6 +26,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -32,9 +39,9 @@ import reactor.util.function.Tuples;
public class Example {
private static final boolean printPreviousValue = false;
public static final boolean printPreviousValue = false;
private static final int numRepeats = 1000;
private static final int batchSize = 10000;
private static final int batchSize = 1000;
public static void main(String[] args) throws InterruptedException {
/*
@@ -53,12 +60,169 @@ public class Example {
*/
rangeTestPutMultiProgressive()
.then(rangeTestPutMultiSame())
rangeTestPutMultiSame()
.then(rangeTestPutMultiProgressive())
.then(testPutMulti())
.then(testPutValue())
.then(testAtPut())
.then(test2LevelPut())
.then(test3LevelPut())
.then(test4LevelPut())
.subscribeOn(Schedulers.parallel())
.blockOptional();
}
private static Mono<Void> testCreateQueryable() {
var ssg = new SubStageGetterSingleBytes();
var ser = SerializerFixedBinaryLength.noop(4);
var itemKey = new byte[]{0, 1, 2, 3};
var newValue = new byte[]{4, 5, 6, 7};
return test("Create Queryable",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> {
var builder = new QueryableBuilder(2);
return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.tail(ssg, ser), builder.serializer()));
})),
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
.defer(() -> Mono
.fromRunnable(() -> {
if (printPreviousValue)
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
})
.then(tuple.getT2().at(null, itemKey))
.flatMap(handle -> handle.setAndGetPrevious(newValue))
.doOnSuccess(oldValue -> {
if (printPreviousValue)
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
})
))
.then(),
numRepeats,
tuple -> tuple.getT1().close());
}
private static Mono<Void> test2LevelPut() {
var k1ser = SerializerFixedBinaryLength.noop(4);
var k2ser = SerializerFixedBinaryLength.noop(4);
var vser = SerializerFixedBinaryLength.noop(4);
var ssg = new SubStageGetterMap<byte[], byte[]>(k2ser, vser);
return test("2 level put",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.deepTail(dict, ssg, k1ser, ssg.getKeyBinaryLength()))),
tuple -> Flux.range(0, batchSize).flatMap(n -> {
var itemKey1 = Ints.toByteArray(n / 4);
var itemKey2 = Ints.toByteArray(n);
var newValue = Ints.toByteArray(n);
return Mono
.defer(() -> Mono
.fromRunnable(() -> {
if (printPreviousValue)
System.out.println("Setting new value at key " + Arrays.toString(itemKey1) + "+" + Arrays.toString(itemKey2) + ": " + Arrays.toString(newValue));
})
.then(tuple.getT2().at(null, itemKey1))
.map(handle -> (DatabaseStageMap<byte[], byte[], DatabaseStageEntry<byte[]>>) handle)
.flatMap(handleK1 -> handleK1.at(null, itemKey2))
.flatMap(handleK2 -> handleK2.setAndGetPrevious(newValue))
.doOnSuccess(oldValue -> {
if (printPreviousValue)
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
})
);
})
.then(),
numRepeats,
tuple -> tuple.getT1().close());
}
private static Mono<Void> test3LevelPut() {
var k1ser = SerializerFixedBinaryLength.noop(4);
var k2ser = SerializerFixedBinaryLength.noop(8);
var k3ser = SerializerFixedBinaryLength.noop(4);
var vser = SerializerFixedBinaryLength.noop(4);
var ssg3 = new SubStageGetterMap<byte[], byte[]>(k3ser, vser);
var ssg2 = new SubStageGetterMapDeep<>(ssg3, k2ser, ssg3.getKeyBinaryLength());
return test("3 level put",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> {
return DatabaseMapDictionaryDeep.deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength());
})),
tuple -> Flux.range(0, batchSize).flatMap(n -> {
var itemKey1 = Ints.toByteArray(n / 4);
var itemKey2 = Longs.toByteArray(n);
var itemKey3 = Ints.toByteArray(n);
var newValue = Ints.toByteArray(n);
return Mono
.defer(() -> Mono
.fromRunnable(() -> {
if (printPreviousValue)
System.out.println("Setting new value at key " + Arrays.toString(itemKey1) + "+" + Arrays.toString(itemKey2) + "+" + Arrays.toString(itemKey3) + ": " + Arrays.toString(newValue));
})
.then(tuple.getT2().at(null, itemKey1))
.map(handle -> (DatabaseStageMap<byte[], Map<byte[], byte[]>, DatabaseStageEntry<Map<byte[], byte[]>>>) handle)
.flatMap(handleK1 -> handleK1.at(null, itemKey2))
.map(handle -> (DatabaseStageMap<byte[], byte[], DatabaseStageEntry<byte[]>>) handle)
.flatMap(handleK2 -> handleK2.at(null, itemKey3))
.flatMap(handleK3 -> handleK3.setAndGetPrevious(newValue))
.doOnSuccess(oldValue -> {
if (printPreviousValue)
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
})
);
})
.then(),
numRepeats,
tuple -> tuple.getT1().close());
}
private static Mono<Void> test4LevelPut() {
var k1ser = SerializerFixedBinaryLength.noop(4);
var k2ser = SerializerFixedBinaryLength.noop(8);
var k3ser = SerializerFixedBinaryLength.noop(4);
var k4ser = SerializerFixedBinaryLength.noop(8);
var vser = SerializerFixedBinaryLength.noop(4);
var ssg4 = new SubStageGetterMap<byte[], byte[]>(k4ser, vser);
var ssg3 = new SubStageGetterMapDeep<>(ssg4, k3ser, ssg4.getKeyBinaryLength());
var ssg2 = new SubStageGetterMapDeep<>(ssg3, k2ser, ssg3.getKeyBinaryLength());
return test("4 level put",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> {
return DatabaseMapDictionaryDeep.deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength());
})),
tuple -> Flux.range(0, batchSize).flatMap(n -> {
var itemKey1 = Ints.toByteArray(n / 4);
var itemKey2 = Longs.toByteArray(n);
var itemKey3 = Ints.toByteArray(n * 2);
var itemKey4 = Longs.toByteArray(n * 3L);
var newValue = Ints.toByteArray(n * 4);
return Mono
.defer(() -> Mono
.fromRunnable(() -> {
if (printPreviousValue)
System.out.println("Setting new value at key " + Arrays.toString(itemKey1) + "+" + Arrays.toString(itemKey2) + "+" + Arrays.toString(itemKey3) + "+" + Arrays.toString(itemKey4) + ": " + Arrays.toString(newValue));
})
.then(tuple.getT2().at(null, itemKey1))
.map(handle -> (DatabaseStageMap<byte[], Map<byte[], Map<byte[], byte[]>>, DatabaseStageEntry<Map<byte[], Map<byte[], byte[]>>>>) handle)
.flatMap(handleK1 -> handleK1.at(null, itemKey2))
.map(handle -> (DatabaseStageMap<byte[], Map<byte[], byte[]>, DatabaseStageEntry<Map<byte[], byte[]>>>) handle)
.flatMap(handleK2 -> handleK2.at(null, itemKey3))
.map(handle -> (DatabaseStageMap<byte[], byte[], DatabaseStageEntry<byte[]>>) handle)
.flatMap(handleK3 -> handleK3.at(null, itemKey4))
.flatMap(handleK4 -> handleK4.setAndGetPrevious(newValue))
.doOnSuccess(oldValue -> {
if (printPreviousValue)
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
})
);
})
.then(),
numRepeats,
tuple -> tuple.getT1().close());
}
private static Mono<Void> testAtPut() {
var ssg = new SubStageGetterSingleBytes();
var ser = SerializerFixedBinaryLength.noop(4);
@@ -238,7 +402,7 @@ public class Example {
for (int i = 0; i < batchSize; i++) {
keysToPut.put(Ints.toByteArray(i * 3), Ints.toByteArray(i * 11));
}
return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
return test("MapDictionary::putMulti (same keys, batch of " + batchSize + " entries)",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
@@ -258,7 +422,7 @@ public class Example {
var ser = SerializerFixedBinaryLength.noop(4);
var vser = Serializer.noop();
AtomicInteger ai = new AtomicInteger(0);
return test("MapDictionary::putMulti (batch of " + batchSize + " entries)",
return test("MapDictionary::putMulti (progressive keys, batch of " + batchSize + " entries)",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
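The multi-level tests above compose fixed-width keys. Guava's Ints.toByteArray always returns 4 bytes and Longs.toByteArray 8 bytes, which is why the SerializerFixedBinaryLength.noop widths are 4 and 8. A quick self-contained check of the arithmetic for test4LevelPut:

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

public class KeyWidthCheck {
    public static void main(String[] args) {
        int n = 42;
        int total = Ints.toByteArray(n / 4).length   // k1ser width: 4
                + Longs.toByteArray(n).length        // k2ser width: 8
                + Ints.toByteArray(n * 2).length     // k3ser width: 4
                + Longs.toByteArray(n * 3L).length;  // k4ser width: 8
        System.out.println(total); // 24, the full composed key length
    }
}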

View File: LLDictionary.java

@@ -53,5 +53,9 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast);
Mono<Entry<byte[], byte[]>> getOne(@Nullable LLSnapshot snapshot, LLRange range);
Mono<byte[]> getOneKey(@Nullable LLSnapshot snapshot, LLRange range);
Mono<Entry<byte[], byte[]>> removeOne(LLRange range);
}
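A hedged sketch of how the new removeOne signature might be consumed; the dict variable is hypothetical, and LLRange.all() is the same whole-keyspace range used elsewhere in this commit:

// Hypothetical caller: take and delete one entry anywhere in the keyspace.
Mono<byte[]> poppedKey = dict
        .removeOne(LLRange.all())
        .map(Entry::getKey); // completes empty if the range holds no entries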

View File: DatabaseMapDictionaryDeep.java

@@ -40,7 +40,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
byte fillValue) {
assert prefixKey.length == prefixLength;
assert suffixLength > 0;
assert extLength > 0;
assert extLength >= 0;
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
Arrays.fill(result, prefixLength, result.length, fillValue);
return result;
@@ -71,7 +71,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert prefixKey.length == prefixLength;
assert suffixKey.length == suffixLength;
assert suffixLength > 0;
assert extLength > 0;
assert extLength >= 0;
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength);
Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue);
@@ -120,6 +120,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
assert subStageKeysConsistency(keyPrefix.length + keySuffixLength + keyExtLength);
}
@SuppressWarnings("unused")
@@ -167,6 +168,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert suffixKey.length == keySuffixLength;
byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength);
System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength);
assert result.length == keyPrefix.length + keySuffixLength;
return result;
}
@@ -193,6 +195,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return LLRange.of(first, end);
}
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
byte[] keySuffixData = serializeSuffix(keySuffix);
@@ -200,26 +203,57 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.subStage(dictionary,
snapshot,
toKeyWithoutExt(keySuffixData),
this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData))
this.subStageGetter.needsKeyFlux()
? this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData))
: Flux.empty()
);
}
@Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMap(rangeKeys -> {
//System.out.println(Thread.currentThread() + "\tkReceived range key flux");
byte[] groupKeyWithoutExt = removeExtFromFullKey(rangeKeys.get(0));
byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
return this.subStageGetter
.subStage(dictionary, snapshot, groupKeyWithoutExt, Flux.fromIterable(rangeKeys))
//.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tObtained stage for a key"))
return Flux.defer(() -> {
if (this.subStageGetter.needsKeyFlux()) {
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMap(rangeKeys -> {
byte[] groupKeyWithExt = rangeKeys.get(0);
byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt);
byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
assert subStageKeysConsistency(groupKeyWithExt.length);
return this.subStageGetter
.subStage(dictionary,
snapshot,
groupKeyWithoutExt,
this.subStageGetter.needsKeyFlux() ? Flux.defer(() -> Flux.fromIterable(rangeKeys)) : Flux.empty()
)
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
//.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tMapped stage for a key"));
}
);
//.doOnNext(s -> System.out.println(Thread.currentThread() + "\tNext stage"))
});
} else {
return dictionary
.getOneKey(resolveSnapshot(snapshot), range)
.flatMap(randomKeyWithExt -> {
byte[] keyWithoutExt = removeExtFromFullKey(randomKeyWithExt);
byte[] keySuffix = this.stripPrefix(keyWithoutExt);
assert subStageKeysConsistency(keyWithoutExt.length);
return this.subStageGetter
.subStage(dictionary, snapshot, keyWithoutExt, Mono.just(randomKeyWithExt).flux())
.map(us -> Map.entry(this.deserializeSuffix(keySuffix), us));
});
}
});
}
private boolean subStageKeysConsistency(int totalKeyLength) {
if (subStageGetter instanceof SubStageGetterMapDeep) {
return totalKeyLength
== keyPrefix.length + keySuffixLength + ((SubStageGetterMapDeep<?, ?, ?>) subStageGetter).getKeyBinaryLength();
} else if (subStageGetter instanceof SubStageGetterMap) {
return totalKeyLength
== keyPrefix.length + keySuffixLength + ((SubStageGetterMap<?, ?>) subStageGetter).getKeyBinaryLength();
} else {
return true;
}
}
@Override
@@ -234,11 +268,14 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
//todo: temporary wrapper. convert the whole class to buffers
protected T deserializeSuffix(byte[] keySuffix) {
assert suffixKeyConsistency(keySuffix.length);
return keySuffixSerializer.deserialize(keySuffix);
}
//todo: temporary wrapper. convert the whole class to buffers
protected byte[] serializeSuffix(T keySuffix) {
return keySuffixSerializer.serialize(keySuffix);
byte[] suffixData = keySuffixSerializer.serialize(keySuffix);
assert suffixKeyConsistency(suffixData.length);
return suffixData;
}
}
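The relaxed assertions above (extLength >= 0 instead of > 0) admit sub-stages that add no extra key bytes. A standalone sketch of the firstKey/lastKey padding logic from this file, run with extLength = 0, which is now legal:

import java.util.Arrays;

public class KeyFillSketch {
    // Same copy-and-fill scheme as fillKeySuffixAndExt above.
    static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength,
            int suffixLength, int extLength, byte fillValue) {
        byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
        Arrays.fill(result, prefixLength, result.length, fillValue);
        return result;
    }

    public static void main(String[] args) {
        byte[] prefix = {7, 7};
        // firstKey pads with 0x00, lastKey with 0xFF:
        System.out.println(Arrays.toString(fillKeySuffixAndExt(prefix, 2, 2, 0, (byte) 0x00))); // [7, 7, 0, 0]
        System.out.println(Arrays.toString(fillKeySuffixAndExt(prefix, 2, 2, 0, (byte) 0xFF))); // [7, 7, -1, -1]
    }
}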

View File: DatabaseStageMap.java

@@ -1,14 +1,13 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -26,7 +25,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<Void> putValue(T key, U value) {
return putValueAndGetStatus(key, value).then();
return at(null, key).single().flatMap(v -> v.set(value));
}
default Mono<U> putValueAndGetPrevious(T key, U value) {
@@ -34,7 +33,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Mono<Boolean> putValueAndGetStatus(T key, U value) {
return putValueAndGetPrevious(key, value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
return at(null, key).single().flatMap(v -> v.setAndGetStatus(value));
}
default Mono<Void> remove(T key) {
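putValue and putValueAndGetStatus now route through at(null, key) followed by single(). In Reactor, single() on an empty Mono signals NoSuchElementException instead of completing silently, so a stage that fails to materialize becomes a visible error rather than a skipped write. A minimal demonstration of that operator behavior:

import reactor.core.publisher.Mono;

public class SingleSketch {
    public static void main(String[] args) {
        Mono.empty()
                .single() // errors on empty instead of completing silently
                .subscribe(v -> {}, e -> System.out.println(e.getClass().getSimpleName()));
        // prints: NoSuchElementException
    }
}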

View File: QueryableBuilder.java

@@ -0,0 +1,22 @@
package it.cavallium.dbengine.database.collections;
public class QueryableBuilder {
public QueryableBuilder(int stagesNumber) {
}
public SerializerFixedBinaryLength<byte[], byte[]> serializer() {
return null;
}
public <T, U extends SubStageGetterSingle<T>> SubStageGetterSingleBytes tail(U ssg,
SerializerFixedBinaryLength<T, byte[]> ser) {
return null;
}
public <T, U, US extends DatabaseStage<U>, M extends DatabaseStageMap<T, U, US>> M wrap(M map) {
return null;
}
}
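QueryableBuilder lands as a stub: every method returns null. The intended call shape can be read from testCreateQueryable in Example.java above; repeated here as a fragment for reference, with the caveat that the current stub would make it throw a NullPointerException if executed:

var builder = new QueryableBuilder(2);
var map = builder.wrap(DatabaseMapDictionaryDeep.simple(dict,
        builder.tail(ssg, ser), builder.serializer()));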

View File: SubStageGetter.java

@@ -12,4 +12,6 @@ public interface SubStageGetter<U, US extends DatabaseStage<U>> {
@Nullable CompositeSnapshot snapshot,
byte[] prefixKey,
Flux<byte[]> keyFlux);
boolean needsKeyFlux();
}
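The new needsKeyFlux() method lets implementations opt out of receiving the materialized key flux: when it returns false, callers such as DatabaseMapDictionaryDeep.at(...) above pass Flux.empty() instead of scanning the range. A fragment of a hypothetical implementer honoring that contract:

// Hypothetical implementer that never inspects the keys:
@Override
public boolean needsKeyFlux() {
    return false; // callers may pass Flux.empty() to subStage(...)
}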

View File: SubStageGetterMap.java

@@ -9,6 +9,15 @@ import reactor.core.publisher.Mono;
public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseStageEntry<Map<T, U>>> {
private static final boolean assertsEnabled;
static {
boolean assertsEnabledTmp = false;
//noinspection AssertWithSideEffects
assert assertsEnabledTmp = true;
//noinspection ConstantConditions
assertsEnabled = assertsEnabledTmp;
}
private final SerializerFixedBinaryLength<T, byte[]> keySerializer;
private final Serializer<U, byte[]> valueSerializer;
@@ -23,6 +32,30 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Nullable CompositeSnapshot snapshot,
byte[] prefixKey,
Flux<byte[]> keyFlux) {
return Mono.just(DatabaseMapDictionary.tail(dictionary, keySerializer, valueSerializer, prefixKey));
Mono<DatabaseStageEntry<Map<T, U>>> result = Mono.just(DatabaseMapDictionary.tail(dictionary,
keySerializer,
valueSerializer,
prefixKey
));
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result);
} else {
return result;
}
}
@Override
public boolean needsKeyFlux() {
return assertsEnabled;
}
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {
return keyFlux.doOnNext(key -> {
assert key.length == prefixKey.length + getKeyBinaryLength();
}).then();
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength();
}
}
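The static initializer above is the standard JVM idiom for detecting whether assertions are enabled: the assignment embedded in the assert statement executes only under -ea, so the key-flux consistency check is requested only in runs with assertions on. A standalone sketch of the idiom:

public class AssertsProbe {
    private static final boolean assertsEnabled;
    static {
        boolean tmp = false;
        //noinspection AssertWithSideEffects
        assert tmp = true; // this side effect runs only when -ea is set
        assertsEnabled = tmp;
    }

    public static void main(String[] args) {
        System.out.println(assertsEnabled); // true under `java -ea`, false otherwise
    }
}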

View File: SubStageGetterMapDeep.java

@@ -10,6 +10,15 @@ import reactor.core.publisher.Mono;
public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
SubStageGetter<Map<T, U>, DatabaseStageEntry<Map<T, U>>> {
private static final boolean assertsEnabled;
static {
boolean assertsEnabledTmp = false;
//noinspection AssertWithSideEffects
assert assertsEnabledTmp = true;
//noinspection ConstantConditions
assertsEnabled = assertsEnabledTmp;
}
private final SubStageGetter<U, US> subStageGetter;
private final SerializerFixedBinaryLength<T, byte[]> keySerializer;
private final int keyExtLength;
@@ -20,6 +29,18 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
this.subStageGetter = subStageGetter;
this.keySerializer = keySerializer;
this.keyExtLength = keyExtLength;
assert keyExtConsistency();
}
private boolean keyExtConsistency() {
if (subStageGetter instanceof SubStageGetterMapDeep) {
return keyExtLength == ((SubStageGetterMapDeep<?, ?, ?>) subStageGetter).getKeyBinaryLength();
} else if (subStageGetter instanceof SubStageGetterMap) {
return keyExtLength == ((SubStageGetterMap<?, ?>) subStageGetter).getKeyBinaryLength();
} else {
return true;
}
}
@Override
@@ -27,11 +48,31 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Nullable CompositeSnapshot snapshot,
byte[] prefixKey,
Flux<byte[]> keyFlux) {
return Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
Mono<DatabaseStageEntry<Map<T, U>>> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
subStageGetter,
keySerializer,
prefixKey,
keyExtLength
));
if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result);
} else {
return result;
}
}
@Override
public boolean needsKeyFlux() {
return assertsEnabled;
}
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {
return keyFlux.doOnNext(key -> {
assert key.length == prefixKey.length + getKeyBinaryLength();
}).then();
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength() + keyExtLength;
}
}
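getKeyBinaryLength() composes recursively, which is exactly what the new keyExtConsistency() assertion pins down. Working the numbers for test4LevelPut in Example.java above:

// ssg4 (SubStageGetterMap, k4ser = 8):        getKeyBinaryLength() = 8
// ssg3 (deep, k3ser = 4, keyExtLength = 8):   4 + 8 = 12
// ssg2 (deep, k2ser = 8, keyExtLength = 12):  8 + 12 = 20
// full key: k1ser (4) + ssg2 (20) = 24 bytes, the four serialized key levels end to end.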

View File: SubStageGetterSingle.java

@@ -41,6 +41,11 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
}));
}
@Override
public boolean needsKeyFlux() {
return true;
}
//todo: temporary wrapper. convert the whole class to buffers
private T deserialize(byte[] bytes) {
return serializer.deserialize(bytes);

View File: LLLocalDictionary.java

@@ -25,6 +25,8 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -33,6 +35,7 @@ import reactor.core.scheduler.Scheduler;
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalDictionary.class);
private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = true;
static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
@@ -88,6 +91,7 @@ public class LLLocalDictionary {
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono
.fromCallable(() -> {
logger.trace("Reading {}", key);
Holder<byte[]> data = new Holder<>();
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
if (data.getValue() != null) {
@@ -160,6 +164,7 @@ public class LLLocalDictionary {
return getPrevValue(key, resultType)
.concatWith(Mono
.fromCallable(() -> {
logger.trace("Writing {}: {}", key, value);
db.put(cfh, key, value);
return null;
})
@@ -191,6 +196,7 @@ public class LLLocalDictionary {
case PREVIOUS_VALUE:
return Mono
.fromCallable(() -> {
logger.trace("Reading {}", key);
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, data)) {
if (data.getValue() != null) {
@@ -249,7 +255,6 @@ public class LLLocalDictionary {
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
.publishOn(dbScheduler)
.concatWith(Mono.fromCallable(() -> {
//System.out.println(Thread.currentThread()+"\tTest");
var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
@@ -346,7 +351,6 @@ public class LLLocalDictionary {
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return Flux
.<Entry<byte[], byte[]>>push(sink -> {
//System.out.println(Thread.currentThread() + "\tPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
@@ -359,12 +363,10 @@ public class LLLocalDictionary {
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(Map.entry(key, rocksIterator.value()));
rocksIterator.next();
}
} finally {
//System.out.println(Thread.currentThread() + "\tFinish Read rande item");
sink.complete();
}
})
@@ -374,7 +376,6 @@ public class LLLocalDictionary {
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return Flux
.<List<Entry<byte[], byte[]>>>push(sink -> {
//System.out.println(Thread.currentThread() + "\tPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
@@ -397,7 +398,6 @@ public class LLLocalDictionary {
currentGroupValues.add(Map.entry(key, rocksIterator.value()));
} else {
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
firstGroupKey = key;
@@ -406,11 +406,9 @@ public class LLLocalDictionary {
rocksIterator.next();
}
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
} finally {
//System.out.println(Thread.currentThread() + "\tFinish Read rande item");
sink.complete();
}
})
@@ -421,10 +419,8 @@ public class LLLocalDictionary {
public Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
return Flux.defer(() -> {
if (range.isSingle()) {
//System.out.println(Thread.currentThread() + "getRangeKeys single");
return getRangeKeysSingle(snapshot, range.getMin()).doOnTerminate(() -> {}/*System.out.println(Thread.currentThread() + "getRangeKeys single end")*/);
return getRangeKeysSingle(snapshot, range.getMin());
} else {
//System.out.println(Thread.currentThread() + "getRangeKeys multi");
return getRangeKeysMulti(snapshot, range);
}
});
@@ -434,7 +430,6 @@ public class LLLocalDictionary {
public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return Flux
.<List<byte[]>>push(sink -> {
//System.out.println(Thread.currentThread() + "\tPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
@@ -457,7 +452,6 @@ public class LLLocalDictionary {
currentGroupValues.add(key);
} else {
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
firstGroupKey = key;
@@ -467,11 +461,9 @@ public class LLLocalDictionary {
rocksIterator.next();
}
if (!currentGroupValues.isEmpty()) {
//System.out.println(Thread.currentThread() + "\tRead rande item");
sink.next(currentGroupValues);
}
} finally {
//System.out.println(Thread.currentThread() + "\tFinish Read rande item");
sink.complete();
}
})
@@ -489,7 +481,6 @@ public class LLLocalDictionary {
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return Flux
.<byte[]>push(sink -> {
//System.out.println(Thread.currentThread() + "\tkPreparing Read rande item");
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
@@ -497,21 +488,17 @@ public class LLLocalDictionary {
rocksIterator.seekToFirst();
}
byte[] key;
sink.onRequest(l -> {}/*System.out.println(Thread.currentThread() + "\tkRequested " + l)*/);
while (rocksIterator.isValid()) {
key = rocksIterator.key();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
//System.out.println(Thread.currentThread() + "\tkRead rande item");
sink.next(key);
rocksIterator.next();
}
} finally {
//System.out.println(Thread.currentThread() + "\tkFinish Read rande item");
sink.complete();
}
//System.out.println(Thread.currentThread() + "\tkFinish end Read rande item");
})
.subscribeOn(dbScheduler);
}
@@ -645,6 +632,56 @@ public class LLLocalDictionary {
});
}
@Override
public Mono<Entry<byte[], byte[]>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] key;
if (rocksIterator.isValid()) {
key = rocksIterator.key();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
return null;
}
return Map.entry(key, rocksIterator.value());
} else {
return null;
}
}
})
.subscribeOn(dbScheduler);
}
@Override
public Mono<byte[]> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] key;
if (rocksIterator.isValid()) {
key = rocksIterator.key();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
return null;
}
return key;
} else {
return null;
}
}
})
.subscribeOn(dbScheduler);
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) {
var rocksdbSnapshot = resolveSnapshot(snapshot);
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {