Add example

This commit is contained in:
Andrea Cavalli 2021-01-31 15:47:48 +01:00
parent 345bc81252
commit cef2c796c0
10 changed files with 127 additions and 74 deletions

View File

@ -1,17 +1,50 @@
package it.cavallium.dbengine.client; package it.cavallium.dbengine.client;
import io.netty.buffer.Unpooled;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.FixedLengthSerializer;
import it.cavallium.dbengine.database.collections.SubStageGetterSingle;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List; import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;
public class Example { public class Example {
public static void main(String[] args) { public static void main(String[] args) {
System.out.println("Test"); System.out.println("Test");
var ssg = new SubStageGetterSingle();
var ser = FixedLengthSerializer.noop(4);
var itemKey = new byte[] {0, 1, 2, 3};
var newValue = new byte[] {4, 5, 6, 7};
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
One<Instant> instantFirst = Sinks.one();
One<Instant> instantSecond = Sinks.one();
One<Instant> instantThird = Sinks.one();
new LLLocalDatabaseConnection(Path.of("/tmp/"), true) new LLLocalDatabaseConnection(Path.of("/tmp/"), true)
.connect() .connect()
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.hashMap("testmap")), false)) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false))
.block(); .flatMap(db -> db.getDictionary("testmap"))
.map(dictionary -> new DatabaseMapDictionary<>(dictionary, ssg, ser, 10))
.doOnSuccess(s -> System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)))
.doOnSuccess(s -> instantFirst.tryEmitValue(Instant.now()))
.flatMap(map -> map.at(null, itemKeyBuffer))
.doOnSuccess(s -> instantSecond.tryEmitValue(Instant.now()))
.flatMap(handle -> handle.setAndGetPrevious(newValue))
.doOnSuccess(s -> instantThird.tryEmitValue(Instant.now()))
.doOnSuccess(oldValue -> System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))))
.then(Mono.zip(instantFirst.asMono(), instantSecond.asMono(), instantThird.asMono()))
.doOnSuccess(s -> {
System.out.println("Time to get value reference: " + Duration.between(s.getT1(), s.getT2()).toMillis() + "ms");
System.out.println("Time to set new value and get previous: " + Duration.between(s.getT2(), s.getT3()).toMillis() + "ms");
System.out.println("(Total time) " + Duration.between(s.getT1(), s.getT3()).toMillis() + "ms");
})
.blockOptional();
} }
} }

View File

@ -12,11 +12,12 @@ public class Column {
this.name = name; this.name = name;
} }
public static Column hashMap(String name) { public static Column dictionary(String name) {
return new Column("hash_map_" + name); return new Column("hash_map_" + name);
} }
public static Column fixedSet(String name) { @Deprecated
public static Column deprecatedSet(String name) {
return new Column("hash_set_" + name); return new Column("hash_set_" + name);
} }

View File

@ -3,41 +3,42 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseInt;
import it.cavallium.dbengine.database.collections.DatabaseLong;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import reactor.core.publisher.Mono;
public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure { public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure {
LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue);
throws IOException;
LLDictionary getDictionary(byte[] columnName) throws IOException; Mono<? extends LLDictionary> getDictionary(byte[] columnName);
default LLDictionary getSet(String name) throws IOException { @Deprecated
return getDictionary(Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); default Mono<? extends LLDictionary> getDeprecatedSet(String name) {
return getDictionary(Column.deprecatedSet(name).getName().getBytes(StandardCharsets.US_ASCII));
} }
default LLDictionary getMap(String name) throws IOException { default Mono<? extends LLDictionary> getDictionary(String name) {
return getDictionary(Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII)); return getDictionary(Column.dictionary(name).getName().getBytes(StandardCharsets.US_ASCII));
} }
default DatabaseInt getInteger(String singletonListName, String name, int defaultValue) default Mono<DatabaseInt> getInteger(String singletonListName, String name, int defaultValue) {
throws IOException { return this
LLSingleton singleton = getSingleton( .getSingleton(Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII),
Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII), name.getBytes(StandardCharsets.US_ASCII),
name.getBytes(StandardCharsets.US_ASCII), Ints.toByteArray(defaultValue)); Ints.toByteArray(defaultValue)
return new DatabaseInt(singleton); )
.map(DatabaseInt::new);
} }
default DatabaseLong getLong(String singletonListName, String name, long defaultValue) default Mono<DatabaseInt> getLong(String singletonListName, String name, long defaultValue) {
throws IOException { return this
LLSingleton singleton = getSingleton( .getSingleton(Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII),
Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII), name.getBytes(StandardCharsets.US_ASCII),
name.getBytes(StandardCharsets.US_ASCII), Longs.toByteArray(defaultValue)); Longs.toByteArray(defaultValue)
return new DatabaseLong(singleton); )
.map(DatabaseInt::new);
} }
long getProperty(String propertyName) throws IOException; Mono<Long> getProperty(String propertyName);
} }

View File

@ -12,6 +12,7 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
// todo: implement optimized methods // todo: implement optimized methods
public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> { public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
@ -67,16 +68,16 @@ public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySerializer, int keyLength, int keyExtLength) { public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySerializer, int keyExtLength) {
this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyLength, keyExtLength); this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
} }
public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySuffixSerializer, byte[] prefixKey, int keySuffixLength, int keyExtLength) { public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySuffixSerializer, byte[] prefixKey, int keyExtLength) {
this.dictionary = dictionary; this.dictionary = dictionary;
this.subStageGetter = subStageGetter; this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer; this.keySuffixSerializer = keySuffixSerializer;
this.keyPrefix = prefixKey; this.keyPrefix = prefixKey;
this.keySuffixLength = keySuffixLength; this.keySuffixLength = keySuffixSerializer.getLength();
this.keyExtLength = keyExtLength; this.keyExtLength = keyExtLength;
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
@ -176,17 +177,28 @@ public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements
); );
} }
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
var newValues = entries
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()));
return getAllStages(null)
.flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
.concatWith(newValues.then(Mono.empty()));
}
//todo: temporary wrapper. convert the whole class to buffers //todo: temporary wrapper. convert the whole class to buffers
private T deserializeSuffix(byte[] keySuffix) { private T deserializeSuffix(byte[] keySuffix) {
var serialized = Unpooled.wrappedBuffer(keySuffix); var serialized = Unpooled.wrappedBuffer(keySuffix);
return keySuffixSerializer.deserialize(serialized, keySuffixLength); return keySuffixSerializer.deserialize(serialized);
} }
//todo: temporary wrapper. convert the whole class to buffers //todo: temporary wrapper. convert the whole class to buffers
private byte[] serializeSuffix(T keySuffix) { private byte[] serializeSuffix(T keySuffix) {
var output = Unpooled.buffer(keySuffixLength, keySuffixLength); var output = Unpooled.buffer(keySuffixLength, keySuffixLength);
var outputBytes = new byte[keySuffixLength]; var outputBytes = new byte[keySuffixLength];
keySuffixSerializer.serialize(keySuffix, output, keySuffixLength); keySuffixSerializer.serialize(keySuffix, output);
output.getBytes(0, outputBytes, 0, keySuffixLength); output.getBytes(0, outputBytes, 0, keySuffixLength);
return outputBytes; return outputBytes;
} }

View File

@ -121,4 +121,9 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte
.map(this::stripPrefix) .map(this::stripPrefix)
.map(keySuffix -> Map.entry(keySuffix, new DatabaseSingle(dictionary, toKey(keySuffix)))); .map(keySuffix -> Map.entry(keySuffix, new DatabaseSingle(dictionary, toKey(keySuffix))));
} }
@Override
public Flux<Entry<byte[], byte[]>> setAllValuesAndGetPrevious(Flux<Entry<byte[], byte[]>> entries) {
return dictionary.setRange(range, Flux.empty(), true);
}
} }

View File

@ -69,12 +69,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
return setAllValuesAndGetPrevious(entries).then(); return setAllValuesAndGetPrevious(entries).then();
} }
default Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) { Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries);
return this
.clear()
.thenMany(entries)
.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue()).thenReturn(entry));
}
default Mono<Void> clear() { default Mono<Void> clear() {
return setAllValues(Flux.empty()); return setAllValues(Flux.empty());

View File

@ -4,21 +4,28 @@ import io.netty.buffer.ByteBuf;
public interface FixedLengthSerializer<B> { public interface FixedLengthSerializer<B> {
B deserialize(ByteBuf serialized, int length); B deserialize(ByteBuf serialized);
void serialize(B deserialized, ByteBuf output, int length); void serialize(B deserialized, ByteBuf output);
static FixedLengthSerializer<ByteBuf> noop() { int getLength();
static FixedLengthSerializer<ByteBuf> noop(int length) {
return new FixedLengthSerializer<>() { return new FixedLengthSerializer<>() {
@Override @Override
public ByteBuf deserialize(ByteBuf serialized, int length) { public ByteBuf deserialize(ByteBuf serialized) {
return serialized.readSlice(length); return serialized.readSlice(length);
} }
@Override @Override
public void serialize(ByteBuf deserialized, ByteBuf output, int length) { public void serialize(ByteBuf deserialized, ByteBuf output) {
output.writeBytes(deserialized, length); output.writeBytes(deserialized, length);
} }
@Override
public int getLength() {
return length;
}
}; };
} }
} }

View File

@ -12,16 +12,13 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
private final SubStageGetter<U, US> subStageGetter; private final SubStageGetter<U, US> subStageGetter;
private final FixedLengthSerializer<T> keySerializer; private final FixedLengthSerializer<T> keySerializer;
private final int keyLength;
private final int keyExtLength; private final int keyExtLength;
public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter, public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter,
FixedLengthSerializer<T> keySerializer, FixedLengthSerializer<T> keySerializer,
int keyLength,
int keyExtLength) { int keyExtLength) {
this.subStageGetter = subStageGetter; this.subStageGetter = subStageGetter;
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
this.keyLength = keyLength;
this.keyExtLength = keyExtLength; this.keyExtLength = keyExtLength;
} }
@ -34,7 +31,6 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
subStageGetter, subStageGetter,
keySerializer, keySerializer,
prefixKey, prefixKey,
keyLength,
keyExtLength keyExtLength
)); ));
} }

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
import java.util.Arrays;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -13,6 +14,11 @@ public class SubStageGetterSingle implements SubStageGetter<byte[], DatabaseStag
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] keyPrefix, byte[] keyPrefix,
Flux<byte[]> keyFlux) { Flux<byte[]> keyFlux) {
return keyFlux.single().map(key -> new DatabaseSingle(dictionary, key)); return keyFlux.singleOrEmpty().flatMap(key -> Mono.fromCallable(() -> {
if (!Arrays.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
return null;
})).thenReturn(new DatabaseSingle(dictionary, keyPrefix));
} }
} }

View File

@ -1,9 +1,7 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -293,36 +291,35 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
@Override @Override
public LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) {
throws IOException { return Mono
try { .fromCallable(() -> new LLLocalSingleton(db,
return new LLLocalSingleton(db, handles.get(Column.special(Column.toString(singletonListColumnName))),
handles.get(Column.special(Column.toString(singletonListColumnName))), (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), LLLocalKeyValueDatabase.this.name,
LLLocalKeyValueDatabase.this.name, name,
name, defaultValue
defaultValue); ))
} catch (RocksDBException e) { .onErrorMap(IOException::new)
throw new IOException(e); .subscribeOn(Schedulers.boundedElastic());
}
} }
@Override @Override
public LLDictionary getDictionary(byte[] columnName) { public Mono<LLLocalDictionary> getDictionary(byte[] columnName) {
return new LLLocalDictionary(db, return Mono
handles.get(Column.special(Column.toString(columnName))), .fromCallable(() -> new LLLocalDictionary(db,
name, handles.get(Column.special(Column.toString(columnName))),
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) name,
); (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber())
))
.subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public long getProperty(String propertyName) throws IOException { public Mono<Long> getProperty(String propertyName) {
try { return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
return db.getAggregatedLongProperty(propertyName); .onErrorMap(IOException::new)
} catch (RocksDBException exception) { .subscribeOn(Schedulers.boundedElastic());
throw new IOException(exception);
}
} }
@Override @Override