diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java index 2046506..689ed70 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/Example.java @@ -1,17 +1,50 @@ package it.cavallium.dbengine.client; +import io.netty.buffer.Unpooled; import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; +import it.cavallium.dbengine.database.collections.FixedLengthSerializer; +import it.cavallium.dbengine.database.collections.SubStageGetterSingle; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; import java.util.List; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.One; public class Example { public static void main(String[] args) { System.out.println("Test"); + var ssg = new SubStageGetterSingle(); + var ser = FixedLengthSerializer.noop(4); + var itemKey = new byte[] {0, 1, 2, 3}; + var newValue = new byte[] {4, 5, 6, 7}; + var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); + One instantFirst = Sinks.one(); + One instantSecond = Sinks.one(); + One instantThird = Sinks.one(); new LLLocalDatabaseConnection(Path.of("/tmp/"), true) .connect() - .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.hashMap("testmap")), false)) - .block(); + .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false)) + .flatMap(db -> db.getDictionary("testmap")) + .map(dictionary -> new DatabaseMapDictionary<>(dictionary, ssg, ser, 10)) + .doOnSuccess(s -> System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue))) + .doOnSuccess(s -> instantFirst.tryEmitValue(Instant.now())) + .flatMap(map -> map.at(null, itemKeyBuffer)) + .doOnSuccess(s -> instantSecond.tryEmitValue(Instant.now())) + .flatMap(handle -> handle.setAndGetPrevious(newValue)) + .doOnSuccess(s -> instantThird.tryEmitValue(Instant.now())) + .doOnSuccess(oldValue -> System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)))) + .then(Mono.zip(instantFirst.asMono(), instantSecond.asMono(), instantThird.asMono())) + .doOnSuccess(s -> { + System.out.println("Time to get value reference: " + Duration.between(s.getT1(), s.getT2()).toMillis() + "ms"); + System.out.println("Time to set new value and get previous: " + Duration.between(s.getT2(), s.getT3()).toMillis() + "ms"); + System.out.println("(Total time) " + Duration.between(s.getT1(), s.getT3()).toMillis() + "ms"); + }) + .blockOptional(); } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/Column.java b/src/main/java/it/cavallium/dbengine/database/Column.java index 09943b8..bcf013a 100644 --- a/src/main/java/it/cavallium/dbengine/database/Column.java +++ b/src/main/java/it/cavallium/dbengine/database/Column.java @@ -12,11 +12,12 @@ public class Column { this.name = name; } - public static Column hashMap(String name) { + public static Column dictionary(String name) { return new Column("hash_map_" + name); } - public static Column fixedSet(String name) { + @Deprecated + public static Column deprecatedSet(String name) { return new Column("hash_set_" + name); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index 8191db4..d0ef215 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -3,41 +3,42 @@ package it.cavallium.dbengine.database; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import it.cavallium.dbengine.database.collections.DatabaseInt; -import it.cavallium.dbengine.database.collections.DatabaseLong; import java.io.Closeable; -import java.io.IOException; import java.nio.charset.StandardCharsets; +import reactor.core.publisher.Mono; public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure { - LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) - throws IOException; + Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue); - LLDictionary getDictionary(byte[] columnName) throws IOException; + Mono getDictionary(byte[] columnName); - default LLDictionary getSet(String name) throws IOException { - return getDictionary(Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); + @Deprecated + default Mono getDeprecatedSet(String name) { + return getDictionary(Column.deprecatedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); } - default LLDictionary getMap(String name) throws IOException { - return getDictionary(Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII)); + default Mono getDictionary(String name) { + return getDictionary(Column.dictionary(name).getName().getBytes(StandardCharsets.US_ASCII)); } - default DatabaseInt getInteger(String singletonListName, String name, int defaultValue) - throws IOException { - LLSingleton singleton = getSingleton( - Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII), - name.getBytes(StandardCharsets.US_ASCII), Ints.toByteArray(defaultValue)); - return new DatabaseInt(singleton); + default Mono getInteger(String singletonListName, String name, int defaultValue) { + return this + .getSingleton(Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII), + name.getBytes(StandardCharsets.US_ASCII), + Ints.toByteArray(defaultValue) + ) + .map(DatabaseInt::new); } - default DatabaseLong getLong(String singletonListName, String name, long defaultValue) - throws IOException { - LLSingleton singleton = getSingleton( - Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII), - name.getBytes(StandardCharsets.US_ASCII), Longs.toByteArray(defaultValue)); - return new DatabaseLong(singleton); + default Mono getLong(String singletonListName, String name, long defaultValue) { + return this + .getSingleton(Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII), + name.getBytes(StandardCharsets.US_ASCII), + Longs.toByteArray(defaultValue) + ) + .map(DatabaseInt::new); } - long getProperty(String propertyName) throws IOException; + Mono getProperty(String propertyName); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 6839d76..2d97294 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -12,6 +12,7 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; // todo: implement optimized methods public class DatabaseMapDictionary> implements DatabaseStageMap { @@ -67,16 +68,16 @@ public class DatabaseMapDictionary> implements } @SuppressWarnings("unused") - public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter subStageGetter, FixedLengthSerializer keySerializer, int keyLength, int keyExtLength) { - this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyLength, keyExtLength); + public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter subStageGetter, FixedLengthSerializer keySerializer, int keyExtLength) { + this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); } - public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter subStageGetter, FixedLengthSerializer keySuffixSerializer, byte[] prefixKey, int keySuffixLength, int keyExtLength) { + public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter subStageGetter, FixedLengthSerializer keySuffixSerializer, byte[] prefixKey, int keyExtLength) { this.dictionary = dictionary; this.subStageGetter = subStageGetter; this.keySuffixSerializer = keySuffixSerializer; this.keyPrefix = prefixKey; - this.keySuffixLength = keySuffixLength; + this.keySuffixLength = keySuffixSerializer.getLength(); this.keyExtLength = keyExtLength; byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); @@ -176,17 +177,28 @@ public class DatabaseMapDictionary> implements ); } + @Override + public Flux> setAllValuesAndGetPrevious(Flux> entries) { + var newValues = entries + .flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue()))) + .flatMap(tuple -> tuple.getT1().set(tuple.getT2())); + + return getAllStages(null) + .flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val))) + .concatWith(newValues.then(Mono.empty())); + } + //todo: temporary wrapper. convert the whole class to buffers private T deserializeSuffix(byte[] keySuffix) { var serialized = Unpooled.wrappedBuffer(keySuffix); - return keySuffixSerializer.deserialize(serialized, keySuffixLength); + return keySuffixSerializer.deserialize(serialized); } //todo: temporary wrapper. convert the whole class to buffers private byte[] serializeSuffix(T keySuffix) { var output = Unpooled.buffer(keySuffixLength, keySuffixLength); var outputBytes = new byte[keySuffixLength]; - keySuffixSerializer.serialize(keySuffix, output, keySuffixLength); + keySuffixSerializer.serialize(keySuffix, output); output.getBytes(0, outputBytes, 0, keySuffixLength); return outputBytes; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java index 0e63a24..e635245 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java @@ -121,4 +121,9 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap Map.entry(keySuffix, new DatabaseSingle(dictionary, toKey(keySuffix)))); } + + @Override + public Flux> setAllValuesAndGetPrevious(Flux> entries) { + return dictionary.setRange(range, Flux.empty(), true); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index e01fb36..ffbd3e1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -69,12 +69,7 @@ public interface DatabaseStageMap> extends Dat return setAllValuesAndGetPrevious(entries).then(); } - default Flux> setAllValuesAndGetPrevious(Flux> entries) { - return this - .clear() - .thenMany(entries) - .flatMap(entry -> this.putValue(entry.getKey(), entry.getValue()).thenReturn(entry)); - } + Flux> setAllValuesAndGetPrevious(Flux> entries); default Mono clear() { return setAllValues(Flux.empty()); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java index 29546d8..81b0c36 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/FixedLengthSerializer.java @@ -4,21 +4,28 @@ import io.netty.buffer.ByteBuf; public interface FixedLengthSerializer { - B deserialize(ByteBuf serialized, int length); + B deserialize(ByteBuf serialized); - void serialize(B deserialized, ByteBuf output, int length); + void serialize(B deserialized, ByteBuf output); - static FixedLengthSerializer noop() { + int getLength(); + + static FixedLengthSerializer noop(int length) { return new FixedLengthSerializer<>() { @Override - public ByteBuf deserialize(ByteBuf serialized, int length) { + public ByteBuf deserialize(ByteBuf serialized) { return serialized.readSlice(length); } @Override - public void serialize(ByteBuf deserialized, ByteBuf output, int length) { + public void serialize(ByteBuf deserialized, ByteBuf output) { output.writeBytes(deserialized, length); } + + @Override + public int getLength() { + return length; + } }; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 1861807..db8ea82 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -12,16 +12,13 @@ public class SubStageGetterMapDeep> implements private final SubStageGetter subStageGetter; private final FixedLengthSerializer keySerializer; - private final int keyLength; private final int keyExtLength; public SubStageGetterMapDeep(SubStageGetter subStageGetter, FixedLengthSerializer keySerializer, - int keyLength, int keyExtLength) { this.subStageGetter = subStageGetter; this.keySerializer = keySerializer; - this.keyLength = keyLength; this.keyExtLength = keyExtLength; } @@ -34,7 +31,6 @@ public class SubStageGetterMapDeep> implements subStageGetter, keySerializer, prefixKey, - keyLength, keyExtLength )); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 7638d7c..5fda63c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import java.util.Arrays; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,6 +14,11 @@ public class SubStageGetterSingle implements SubStageGetter keyFlux) { - return keyFlux.single().map(key -> new DatabaseSingle(dictionary, key)); + return keyFlux.singleOrEmpty().flatMap(key -> Mono.fromCallable(() -> { + if (!Arrays.equals(keyPrefix, key)) { + throw new IndexOutOfBoundsException("Found more than one element!"); + } + return null; + })).thenReturn(new DatabaseSingle(dictionary, keyPrefix)); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 268d80d..2d527b7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -1,9 +1,7 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.Column; -import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLKeyValueDatabase; -import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import java.io.File; import java.io.IOException; @@ -293,36 +291,35 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } @Override - public LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) - throws IOException { - try { - return new LLLocalSingleton(db, - handles.get(Column.special(Column.toString(singletonListColumnName))), - (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), - LLLocalKeyValueDatabase.this.name, - name, - defaultValue); - } catch (RocksDBException e) { - throw new IOException(e); - } + public Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) { + return Mono + .fromCallable(() -> new LLLocalSingleton(db, + handles.get(Column.special(Column.toString(singletonListColumnName))), + (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), + LLLocalKeyValueDatabase.this.name, + name, + defaultValue + )) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); } @Override - public LLDictionary getDictionary(byte[] columnName) { - return new LLLocalDictionary(db, - handles.get(Column.special(Column.toString(columnName))), - name, - (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) - ); + public Mono getDictionary(byte[] columnName) { + return Mono + .fromCallable(() -> new LLLocalDictionary(db, + handles.get(Column.special(Column.toString(columnName))), + name, + (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) + )) + .subscribeOn(Schedulers.boundedElastic()); } @Override - public long getProperty(String propertyName) throws IOException { - try { - return db.getAggregatedLongProperty(propertyName); - } catch (RocksDBException exception) { - throw new IOException(exception); - } + public Mono getProperty(String propertyName) { + return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); } @Override