From 241b3fbee191b4da8f06830cb6514dca2f85ddc7 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 30 Jan 2021 00:24:55 +0100 Subject: [PATCH] Asynchronous LLDictionary --- .../dbengine/database/LLDeepDictionary.java | 70 -- .../dbengine/database/LLDictionary.java | 50 +- .../dbengine/database/LLKeyValueDatabase.java | 30 +- .../cavallium/dbengine/database/LLRange.java | 87 ++ .../database/disk/LLLocalDeepDictionary.java | 918 ------------------ .../database/disk/LLLocalDictionary.java | 662 ++++++++----- .../disk/LLLocalKeyValueDatabase.java | 33 +- .../database/structures/LLDeepMap.java | 162 ---- .../database/structures/LLFixedDeepSet.java | 210 ---- .../dbengine/database/structures/LLMap.java | 118 --- .../dbengine/database/structures/LLSet.java | 105 -- 11 files changed, 566 insertions(+), 1879 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java create mode 100644 src/main/java/it/cavallium/dbengine/database/LLRange.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/structures/LLMap.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/structures/LLSet.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java deleted file mode 100644 index 6136d4f..0000000 --- a/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java +++ /dev/null @@ -1,70 +0,0 @@ -package it.cavallium.dbengine.database; - -import java.io.IOException; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.function.Consumer; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.concurrency.atomicity.NotAtomic; -import org.warp.commonutils.functional.CancellableBiConsumer; -import org.warp.commonutils.functional.CancellableBiFunction; -import org.warp.commonutils.functional.CancellableTriConsumer; -import org.warp.commonutils.functional.CancellableTriFunction; -import org.warp.commonutils.functional.ConsumerResult; -import org.warp.commonutils.type.Bytes; -import org.warp.commonutils.type.UnmodifiableIterableMap; -import org.warp.commonutils.type.UnmodifiableMap; - -@NotAtomic -public interface LLDeepDictionary extends LLKeyValueDatabaseStructure { - - UnmodifiableIterableMap get(@Nullable LLSnapshot snapshot, byte[] key1) throws IOException; - - Optional get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException; - - - boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1); - - boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException; - - /** - * Note: this will remove previous elements because it replaces the entire map of key - */ - void put(byte[] key1, UnmodifiableIterableMap value) throws IOException; - - Optional put(byte[] key1, byte[] key2, byte[] value, LLDictionaryResultType resultType) throws IOException; - - - void putMulti(byte[][] keys1, UnmodifiableIterableMap[] values) throws IOException; - - void putMulti(byte[] key1, byte[][] keys2, byte[][] values, LLDictionaryResultType resultType, Consumer responses) throws IOException; - - void putMulti(byte[][] keys1, byte[][] keys2, byte[][] values, LLDictionaryResultType resultType, Consumer responses) throws IOException; - - - void clear() throws IOException; - - Optional> clear(byte[] key1, LLDictionaryResultType resultType) throws IOException; - - Optional remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException; - - - ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer consumer); - - ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer); - - ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer consumer); - - - ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction> consumer) throws IOException; - - ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction, Entry>> consumer) throws IOException; - - ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction> consumer) throws IOException; - - - long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException; - - long exactSize(@Nullable LLSnapshot snapshot, byte[] key1); -} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 39195fb..5a3507c 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -1,45 +1,45 @@ package it.cavallium.dbengine.database; -import java.io.IOException; import java.util.Map.Entry; -import java.util.Optional; -import java.util.function.Consumer; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; -import org.warp.commonutils.functional.CancellableBiConsumer; -import org.warp.commonutils.functional.CancellableBiFunction; -import org.warp.commonutils.functional.ConsumerResult; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @NotAtomic public interface LLDictionary extends LLKeyValueDatabaseStructure { - Optional get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException; + Mono get(@Nullable LLSnapshot snapshot, byte[] key); - boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException; + Mono put(byte[] key, byte[] value, LLDictionaryResultType resultType); - Optional put(byte[] key, byte[] value, LLDictionaryResultType resultType) - throws IOException; + Mono remove(byte[] key, LLDictionaryResultType resultType); - void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType, - Consumer responses) throws IOException; + Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys); - Optional remove(byte[] key, LLDictionaryResultType resultType) throws IOException; + Flux> putMulti(Flux> entries, boolean getOldValues); - /** - * This method can call the consumer from different threads in parallel - */ - ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer); + Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range); - /** - * This method can call the consumer from different threads in parallel - */ - ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException; + Flux> setRange(LLRange range, Flux> entries, boolean getOldValues); - void clear() throws IOException; + default Mono replaceRange(LLRange range, boolean canKeysChange, Function, Mono>> entriesReplacer) { + Flux> replacedFlux = this.getRange(null, range).flatMap(entriesReplacer); + if (canKeysChange) { + return this + .setRange(range, replacedFlux, false) + .then(); + } else { + return this + .putMulti(replacedFlux, false) + .then(); + } + } - long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException; + Mono isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range); - boolean isEmpty(@Nullable LLSnapshot snapshot) throws IOException; + Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast); - Optional> removeOne() throws IOException; + Mono> removeOne(LLRange range); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index 8372ae3..94f908e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -2,15 +2,11 @@ package it.cavallium.dbengine.database; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import it.cavallium.dbengine.database.structures.LLDeepMap; import java.io.Closeable; import java.io.IOException; import java.nio.charset.StandardCharsets; -import it.cavallium.dbengine.database.structures.LLFixedDeepSet; import it.cavallium.dbengine.database.structures.LLInt; import it.cavallium.dbengine.database.structures.LLLong; -import it.cavallium.dbengine.database.structures.LLMap; -import it.cavallium.dbengine.database.structures.LLSet; public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure { @@ -19,30 +15,12 @@ public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyVal LLDictionary getDictionary(byte[] columnName) throws IOException; - LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) throws IOException; - - default LLSet getSet(String name) throws IOException { - LLDictionary dictionary = getDictionary( - Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); - return new LLSet(dictionary); + default LLDictionary getSet(String name) throws IOException { + return getDictionary(Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); } - default LLMap getMap(String name) throws IOException { - LLDictionary dictionary = getDictionary( - Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII)); - return new LLMap(dictionary); - } - - default LLFixedDeepSet getDeepSet(String name, int keySize, int key2Size) throws IOException { - LLDeepDictionary deepDictionary = getDeepDictionary( - Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII), keySize, key2Size); - return new LLFixedDeepSet(deepDictionary); - } - - default LLDeepMap getDeepMap(String name, int keySize, int key2Size) throws IOException { - LLDeepDictionary deepDictionary = getDeepDictionary( - Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII), keySize, key2Size); - return new LLDeepMap(deepDictionary); + default LLDictionary getMap(String name) throws IOException { + return getDictionary(Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII)); } default LLInt getInteger(String singletonListName, String name, int defaultValue) diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java new file mode 100644 index 0000000..72f5202 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java @@ -0,0 +1,87 @@ +package it.cavallium.dbengine.database; + +import java.util.Arrays; +import java.util.StringJoiner; + +public class LLRange { + + private static final LLRange RANGE_ALL = new LLRange(null, null); + private final byte[] min; + private final byte[] max; + + private LLRange(byte[] min, byte[] max) { + this.min = min; + this.max = max; + } + + public static LLRange all() { + return RANGE_ALL; + } + + public static LLRange from(byte[] min) { + return new LLRange(min, null); + } + + public static LLRange to(byte[] max) { + return new LLRange(null, max); + } + + public boolean isAll() { + return min == null && max == null; + } + + public boolean isSingle() { + if (min == null || max == null) return false; + return Arrays.equals(min, max); + } + + public boolean hasMin() { + return min != null; + } + + public byte[] getMin() { + assert min != null; + return min; + } + + public boolean hasMax() { + return max != null; + } + + public byte[] getMax() { + assert max != null; + return max; + } + + public byte[] getSingle() { + assert isSingle(); + return min; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LLRange llRange = (LLRange) o; + return Arrays.equals(min, llRange.min) && Arrays.equals(max, llRange.max); + } + + @Override + public int hashCode() { + int result = Arrays.hashCode(min); + result = 31 * result + Arrays.hashCode(max); + return result; + } + + @Override + public String toString() { + return new StringJoiner(", ", LLRange.class.getSimpleName() + "[", "]") + .add("min=" + Arrays.toString(min)) + .add("max=" + Arrays.toString(max)) + .toString(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java deleted file mode 100644 index c81d827..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java +++ /dev/null @@ -1,918 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import it.cavallium.dbengine.database.LLDeepDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLUtils; -import it.unimi.dsi.fastutil.objects.ObjectArrayList; -import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.FlushOptions; -import org.rocksdb.Holder; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.rocksdb.Snapshot; -import org.rocksdb.WriteBatchInterface; -import org.warp.commonutils.concurrency.atomicity.NotAtomic; -import org.warp.commonutils.error.IndexOutOfBoundsException; -import org.warp.commonutils.functional.CancellableBiConsumer; -import org.warp.commonutils.functional.CancellableBiFunction; -import org.warp.commonutils.functional.CancellableTriConsumer; -import org.warp.commonutils.functional.CancellableTriFunction; -import org.warp.commonutils.functional.ConsumerResult; -import org.warp.commonutils.type.Bytes; -import org.warp.commonutils.type.UnmodifiableIterableMap; -import org.warp.commonutils.type.UnmodifiableMap; - -@NotAtomic -public class LLLocalDeepDictionary implements LLDeepDictionary { - - private static final byte[] NO_DATA = new byte[0]; - private static final byte[][] NO_DATA_MAP = new byte[0][0]; - private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); - private final RocksDB db; - private final ColumnFamilyHandle cfh; - private final String databaseName; - private final Function snapshotResolver; - private final int key1Size; - private final int key2Size; - private final int key1Position; - private final int key2Position; - private final int key1EndPosition; - private final int key2EndPosition; - private final int combinedKeySize; - - public LLLocalDeepDictionary(@NotNull RocksDB db, @NotNull ColumnFamilyHandle columnFamilyHandle, - String databaseName, - Function snapshotResolver, int keySize, int key2Size) { - Objects.requireNonNull(db); - this.db = db; - Objects.requireNonNull(columnFamilyHandle); - this.cfh = columnFamilyHandle; - this.databaseName = databaseName; - this.snapshotResolver = snapshotResolver; - this.key1Size = keySize; - this.key2Size = key2Size; - this.key1Position = 0; - this.key2Position = key1Size; - this.key1EndPosition = key1Position + key1Size; - this.key2EndPosition = key2Position + key2Size; - this.combinedKeySize = keySize + key2Size; - } - - @Override - public String getDatabaseName() { - return databaseName; - } - - @SuppressWarnings("BooleanMethodIsAlwaysInverted") - private boolean isSubKey(byte[] key1, byte[] combinedKey) { - if (key1 == null || combinedKey == null || key1.length != key1Size || combinedKey.length != combinedKeySize) { - return false; - } - - return Arrays.equals(key1, 0, key1Size, combinedKey, key1Position, key1EndPosition); - } - - private byte[] getStartSeekKey(byte[] key1) { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - return Arrays.copyOf(key1, combinedKeySize); - } - - private byte[] getEndSeekKey(byte[] key1) { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - byte[] endSeekKey = Arrays.copyOf(key1, combinedKeySize); - Arrays.fill(endSeekKey, key2Position, key2EndPosition, (byte) 0xFF); - return endSeekKey; - } - - @NotNull - private byte[] getKey1(@NotNull byte[] combinedKey) { - if (combinedKey.length != combinedKeySize) { - throw new IndexOutOfBoundsException(combinedKey.length, combinedKeySize, combinedKeySize); - } - return Arrays.copyOfRange(combinedKey, key1Position, key1EndPosition); - } - - @NotNull - private byte[] getKey2(@NotNull byte[] combinedKey) { - return Arrays.copyOfRange(combinedKey, key2Position, key2EndPosition); - } - - @NotNull - private byte[] getCombinedKey(@NotNull byte[] key1, @NotNull byte[] key2) { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - if (key2.length != key2Size) { - throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size); - } - var combinedKey = new byte[combinedKeySize]; - System.arraycopy(key1, 0, combinedKey, key1Position, key1Size); - System.arraycopy(key2, 0, combinedKey, key2Position, key2Size); - return combinedKey; - } - - private ReadOptions resolveSnapshot(LLSnapshot snapshot) { - if (snapshot != null) { - return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); - } else { - return EMPTY_READ_OPTIONS; - } - } - - @Override - public UnmodifiableIterableMap get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - if (key.length != key1Size) { - throw new IndexOutOfBoundsException(key.length, key1Size, key1Size); - } - ObjectArrayList keys = new ObjectArrayList<>(); - ObjectArrayList values = new ObjectArrayList<>(); - try (var iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - iterator.seek(key); - while (iterator.isValid()) { - - byte[] combinedKey = iterator.key(); - - if (!isSubKey(key, combinedKey)) { - break; - } - - byte[] key2 = getKey2(combinedKey); - byte[] value = iterator.value(); - keys.add(key2); - values.add(value); - - iterator.next(); - } - } - - return UnmodifiableIterableMap.of(keys.toArray(byte[][]::new), values.toArray(byte[][]::new)); - } - - @Override - public Optional get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - if (key2.length != key2Size) { - throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size); - } - try { - Holder data = new Holder<>(); - byte[] combinedKey = getCombinedKey(key1, key2); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), combinedKey, data)) { - if (data.getValue() != null) { - return Optional.of(data.getValue()); - } else { - byte[] value = db.get(cfh, resolveSnapshot(snapshot), combinedKey); - return Optional.ofNullable(value); - } - } else { - return Optional.empty(); - } - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - @Override - public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - byte[] startSeekKey = getStartSeekKey(key1); - try (var iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - iterator.seek(startSeekKey); - if (!iterator.isValid()) { - return true; - } - byte[] startKey = iterator.key(); - return !isSubKey(key1, startKey); - } - } - - @Override - public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - if (key2.length != key2Size) { - throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size); - } - try { - var combinedKey = getCombinedKey(key1, key2); - int size = RocksDB.NOT_FOUND; - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), combinedKey, data)) { - if (data.getValue() != null) { - size = data.getValue().length; - } else { - size = db.get(cfh, resolveSnapshot(snapshot), combinedKey, NO_DATA); - } - } - return size != RocksDB.NOT_FOUND; - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - //todo: use WriteBatch to enhance performance - @Override - public void put(byte[] key1, UnmodifiableIterableMap value) throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - try { - var bytesValue = Bytes.ofMap(value); - var alreadyEditedKeys = new ObjectOpenHashSet(); - - // Delete old keys and change keys that are already present - try (var iterator = db.newIterator(cfh)) { - iterator.seek(getStartSeekKey(key1)); - while (iterator.isValid()) { - byte[] combinedKey = iterator.key(); - - if (!isSubKey(key1, combinedKey)) { - // The key is outside of key1: exit from the iteration - break; - } - - byte[] key2 = getKey2(combinedKey); - var valueToSetHere = bytesValue.get(key2); - if (valueToSetHere == null) { - // key not present in the new data: remove it from the database - db.delete(cfh, combinedKey); - } else { - // key present in the new data: replace it on the database - alreadyEditedKeys.add(new Bytes(key2)); - db.put(cfh, combinedKey, valueToSetHere.data); - } - - iterator.next(); - } - } - - // Add new keys, avoiding to add already changed keys - var mapIterator = bytesValue.fastIterator(); - while (mapIterator.hasNext()) { - var mapEntry = mapIterator.next(); - var key2 = mapEntry.getKey(); - if (key2.data.length != key2Size) { - throw new IndexOutOfBoundsException(key2.data.length, key2Size, key2Size); - } - - if (!alreadyEditedKeys.contains(key2)) { - var value2 = mapEntry.getValue(); - db.put(cfh, getCombinedKey(key1, key2.data), value2.data); - } - } - } catch (RocksDBException ex) { - throw new IOException(ex); - } - } - - //todo: use WriteBatch to enhance performance - @Override - public void putMulti(byte[][] keys1, UnmodifiableIterableMap[] values) throws IOException { - if (keys1.length == values.length) { - for (int i = 0; i < keys1.length; i++) { - put(keys1[i], values[i]); - } - } else { - throw new IOException("Wrong parameters count"); - } - } - - @Override - public Optional put(byte[] key1, byte[] key2, byte[] value, LLDictionaryResultType resultType) - throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - if (key2.length != key2Size) { - throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size); - } - try { - byte[] response = null; - var combinedKey = getCombinedKey(key1, key2); - switch (resultType) { - case VALUE_CHANGED: - response = LLUtils.booleanToResponse(!this.contains(null, key1, key2)); - break; - case PREVIOUS_VALUE: - var data = new Holder(); - if (db.keyMayExist(cfh, combinedKey, data)) { - if (data.getValue() != null) { - response = data.getValue(); - } else { - response = db.get(cfh, combinedKey); - } - } else { - response = null; - } - break; - } - db.put(cfh, combinedKey, value); - return Optional.ofNullable(response); - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - //todo: use WriteBatch to enhance performance - @Override - public void putMulti(byte[] key1, - byte[][] keys2, - byte[][] values2, - LLDictionaryResultType resultType, - Consumer responses) throws IOException { - if (keys2.length == values2.length) { - for (int i = 0; i < keys2.length; i++) { - var result = put(key1, keys2[i], values2[i], resultType); - if (resultType != LLDictionaryResultType.VOID) { - responses.accept(result.orElse(NO_DATA)); - } - } - } else { - throw new IOException("Wrong parameters count"); - } - } - - //todo: use WriteBatch to enhance performance - @Override - public void putMulti(byte[][] keys1, - byte[][] keys2, - byte[][] values2, - LLDictionaryResultType resultType, - Consumer responses) throws IOException { - if (keys1.length == keys2.length && keys2.length == values2.length) { - for (int i = 0; i < keys1.length; i++) { - var result = put(keys1[i], keys2[i], values2[i], resultType); - if (resultType != LLDictionaryResultType.VOID) { - responses.accept(result.orElse(NO_DATA)); - } - } - } else { - throw new IOException("Wrong parameters count"); - } - } - - @Override - public Optional remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - if (key2.length != key2Size) { - throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size); - } - try { - byte[] response = null; - var combinedKey = getCombinedKey(key1, key2); - switch (resultType) { - case VALUE_CHANGED: - response = LLUtils.booleanToResponse(this.contains(null, key1, key2)); - break; - case PREVIOUS_VALUE: - var data = new Holder(); - if (db.keyMayExist(cfh, combinedKey, data)) { - if (data.getValue() != null) { - response = data.getValue(); - } else { - response = db.get(cfh, combinedKey); - } - } else { - response = null; - } - break; - } - db.delete(cfh, combinedKey); - return Optional.ofNullable(response); - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - @Override - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer consumer) { - return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); - } - - //todo: implement parallel execution - private ConsumerResult forEach_(CancellableTriConsumer consumer, @Nullable Snapshot snapshot, int parallelism) { - try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)) - : db.newIterator(cfh))) { - iterator.seekToFirst(); - while (iterator.isValid()) { - var combinedKey = iterator.key(); - var key1 = getKey1(combinedKey); - var key2 = getKey2(combinedKey); - - var result = consumer.acceptCancellable(key1, key2, iterator.value()); - if (result.isCancelled()) { - return ConsumerResult.cancelNext(); - } - - iterator.next(); - } - return ConsumerResult.result(); - } - } - - @Override - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer) { - return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); - } - - //todo: implement parallel execution - private ConsumerResult forEach_(CancellableBiConsumer> consumer, @Nullable Snapshot snapshot, int parallelism) { - try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)) - : db.newIterator(cfh))) { - iterator.seekToFirst(); - byte[] currentKey1 = null; - // only append or iterate on this object! byte[].equals() and hash is not trustworthy! - List key2Keys = null; - // only append or iterate on this object! byte[].equals() and hash is not trustworthy! - List key2Values = null; - while (iterator.isValid()) { - var combinedKey = iterator.key(); - var key1 = getKey1(combinedKey); - - if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) { - if (currentKey1 != null && !key2Values.isEmpty()) { - var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new))); - if (result.isCancelled()) { - return ConsumerResult.cancelNext(); - } - } - currentKey1 = key1; - key2Keys = new ArrayList<>(); - key2Values = new ArrayList<>(); - } - - key2Keys.add(getKey2(combinedKey)); - key2Values.add(iterator.value()); - - iterator.next(); - } - if (currentKey1 != null && !key2Values.isEmpty()) { - var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new))); - if (result.isCancelled()) { - return ConsumerResult.cancelNext(); - } - } - return ConsumerResult.result(); - } - } - - @Override - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, CancellableBiConsumer consumer) { - return forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); - } - - //todo: implement parallel execution - private ConsumerResult forEach_(byte[] key1, CancellableBiConsumer consumer, @Nullable Snapshot snapshot, int parallelism) { - try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)) - : db.newIterator(cfh))) { - iterator.seek(getStartSeekKey(key1)); - while (iterator.isValid()) { - byte[] combinedKey = iterator.key(); - - if (!isSubKey(key1, combinedKey)) { - // The key is outside of key1: exit from the iteration - break; - } - - byte[] key2 = getKey2(combinedKey); - byte[] value2 = iterator.value(); - var result = consumer.acceptCancellable(key2, value2); - if (result.isCancelled()) { - return ConsumerResult.cancelNext(); - } - - iterator.next(); - } - return ConsumerResult.result(); - } - } - - //todo: implement parallel execution - //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible - @Override - public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction> consumer) throws IOException { - var snapshot = db.getSnapshot(); - try { - try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)); - CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) { - - iter.seekToFirst(); - - while (iter.isValid()) { - - writeBatch.delete(cfh, iter.key()); - - iter.next(); - } - - iter.seekToFirst(); - - while (iter.isValid()) { - var combinedKey = iter.key(); - var key1 = getKey1(combinedKey); - var key2 = getKey2(combinedKey); - - var result = consumer.applyCancellable(key1, key2, iter.value()); - if (result.getValue().getLeft().length != key1Size) { - throw new IndexOutOfBoundsException(result.getValue().getLeft().length, key1Size, key1Size); - } - if (result.getValue().getMiddle().length != key2Size) { - throw new IndexOutOfBoundsException(result.getValue().getMiddle().length, key2Size, key2Size); - } - - writeBatch.put(cfh, getCombinedKey(result.getValue().getLeft(), result.getValue().getMiddle()), result.getValue().getRight()); - - if (result.isCancelled()) { - // Cancels and discards the write batch - writeBatch.clear(); - return ConsumerResult.cancelNext(); - } - - iter.next(); - } - - writeBatch.writeToDbAndClose(); - - return ConsumerResult.result(); - } - } catch (RocksDBException ex) { - throw new IOException(ex); - } finally { - db.releaseSnapshot(snapshot); - snapshot.close(); - } - } - - //todo: implement parallel execution - //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible - @Override - public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction, Entry>> consumer) - throws IOException { - try { - var snapshot = db.getSnapshot(); - try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)); - CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) { - - iter.seekToFirst(); - - while (iter.isValid()) { - - writeBatch.delete(cfh, iter.key()); - - iter.next(); - } - - iter.seekToFirst(); - - byte[] currentKey1 = null; - // only append or iterate on this object! byte[].equals() and hash is not trustworthy! - ObjectArrayList key2Keys = null; - // only append or iterate on this object! byte[].equals() and hash is not trustworthy! - ObjectArrayList key2Values = null; - while (iter.isValid()) { - var combinedKey = iter.key(); - var key1 = getKey1(combinedKey); - - if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) { - if (currentKey1 != null && !key2Values.isEmpty()) { - var result = replaceAll_(writeBatch, - currentKey1, - key2Keys.toArray(byte[][]::new), - key2Values.toArray(byte[][]::new), - consumer - ); - - if (result.isCancelled()) { - // Cancels and discards the write batch - writeBatch.clear(); - return ConsumerResult.cancelNext(); - } - } - currentKey1 = key1; - key2Keys = new ObjectArrayList<>(); - key2Values = new ObjectArrayList<>(); - } - - key2Keys.add(getKey2(combinedKey)); - key2Values.add(iter.value()); - - iter.next(); - } - if (currentKey1 != null && !key2Values.isEmpty()) { - var result = replaceAll_(writeBatch, - currentKey1, - key2Keys.toArray(byte[][]::new), - key2Values.toArray(byte[][]::new), - consumer - ); - - if (result.isCancelled()) { - // Cancels and discards the write batch - writeBatch.clear(); - return ConsumerResult.cancelNext(); - } - } - - writeBatch.writeToDbAndClose(); - - return ConsumerResult.result(); - } finally { - db.releaseSnapshot(snapshot); - snapshot.close(); - } - } catch (RocksDBException exception) { - throw new IOException(exception); - } - } - - private ConsumerResult replaceAll_(WriteBatchInterface writeBatch, - byte[] key1, - byte[][] key2Keys, - byte[][] key2Values, - CancellableBiFunction, Entry>> consumer) - throws RocksDBException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - var previousValues = UnmodifiableMap.of(key2Keys, key2Values); - var result = consumer.applyCancellable(key1, previousValues); - - var resultKey1 = result.getValue().getKey(); - if (resultKey1.length != key1Size) { - throw new IndexOutOfBoundsException(resultKey1.length, key1Size, key1Size); - } - var resultValues = result.getValue().getValue(); - - var mapIterator = resultValues.fastIterator(); - while (mapIterator.hasNext()) { - var mapEntry = mapIterator.next(); - var key2 = mapEntry.getKey(); - if (key2.data.length != key2Size) { - throw new IndexOutOfBoundsException(key2.data.length, key2Size, key2Size); - } - - var value2 = mapEntry.getValue(); - writeBatch.put(cfh, getCombinedKey(key1, key2.data), value2); - - if (result.isCancelled()) { - // Cancels and discards the write batch - writeBatch.clear(); - return ConsumerResult.cancelNext(); - } - } - return ConsumerResult.result(); - } - - //todo: implement parallel execution - //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible - @Override - public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction> consumer) throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - try { - var snapshot = db.getSnapshot(); - try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)); - CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) { - - iter.seek(getStartSeekKey(key1)); - - while (iter.isValid()) { - byte[] combinedKey = iter.key(); - - if (!isSubKey(key1, combinedKey)) { - // The key is outside of key1: exit from the iteration - break; - } - - writeBatch.delete(cfh, combinedKey); - - iter.next(); - } - - iter.seek(getStartSeekKey(key1)); - - while (iter.isValid()) { - byte[] combinedKey = iter.key(); - - if (!isSubKey(key1, combinedKey)) { - // The key is outside of key1: exit from the iteration - break; - } - - byte[] key2 = getKey2(combinedKey); - byte[] value2 = iter.value(); - - var result = consumer.applyCancellable(key2, value2); - if (result.getValue().getKey().length != key2Size) { - throw new IndexOutOfBoundsException(result.getValue().getKey().length, key2Size, key2Size); - } - - writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue()); - - if (result.isCancelled()) { - // Cancels and discards the write batch - writeBatch.clear(); - return ConsumerResult.cancelNext(); - } - - iter.next(); - } - - writeBatch.writeToDbAndClose(); - - return ConsumerResult.result(); - } finally { - db.releaseSnapshot(snapshot); - snapshot.close(); - } - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - // This method is exactly the same of LLLocalDictionary. Remember to keep the code equal - @Override - public void clear() throws IOException { - try { - List ranges = new ArrayList<>(); - byte[] firstKey = null; - byte[] lastKey = null; - boolean empty = false; - while (!empty) { - // retrieve the range extremities - try (RocksIterator iter = db.newIterator(cfh)) { - iter.seekToFirst(); - if (iter.isValid()) { - firstKey = iter.key(); - iter.seekToLast(); - lastKey = iter.key(); - ranges.add(firstKey); - ranges.add(lastKey); - } else { - empty = true; - } - } - - if (!empty) { - if (Arrays.equals(firstKey, lastKey)) { - // Delete single key - db.delete(cfh, lastKey); - } else { - // Delete all - db.deleteRange(cfh, firstKey, lastKey); - // Delete the end because it's not included in the deleteRange domain - db.delete(cfh, lastKey); - } - } - } - - // Delete files related - db.deleteFilesInRanges(cfh, ranges, true); - - // Compact range - db.compactRange(cfh); - - db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh); - db.flushWal(true); - - var finalSize = exactSize(null); - if (finalSize != 0) { - throw new IllegalStateException("The dictionary is not empty after calling clear()"); - } - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - @Override - public Optional> clear(byte[] key1, LLDictionaryResultType resultType) - throws IOException { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - try { - Optional> result; - switch (resultType) { - case PREVIOUS_VALUE: - List keys = new ArrayList<>(); - List values = new ArrayList<>(); - try (RocksIterator iter = db.newIterator(cfh)) { - iter.seek(getStartSeekKey(key1)); - while (iter.isValid()) { - var combinedKey = iter.key(); - - if (!isSubKey(key1, combinedKey)) { - break; - } - - keys.add(getKey2(combinedKey)); - values.add(iter.value()); - } - } - result = Optional.of(UnmodifiableIterableMap.of(keys.toArray(byte[][]::new), values.toArray(byte[][]::new))); - break; - case VALUE_CHANGED: - if (isEmpty(null, key1)) { - result = Optional.empty(); - } else { - result = Optional.of(UnmodifiableIterableMap.of(NO_DATA_MAP, NO_DATA_MAP)); - } - break; - case VOID: - default: - result = Optional.empty(); - break; - } - db.deleteRange(cfh, getStartSeekKey(key1), getEndSeekKey(key1)); - return result; - } catch (RocksDBException ex) { - throw new IOException(ex); - } - } - - @Override - public long size(@Nullable LLSnapshot snapshot, boolean fast) { - return fast ? fastSize(snapshot) : exactSize(snapshot); - } - - public long fastSize(@Nullable LLSnapshot snapshot) { - try { - if (snapshot != null) { - return this.exactSize(snapshot); - } - return db.getLongProperty(cfh, "rocksdb.estimate-num-keys"); - } catch (RocksDBException e) { - e.printStackTrace(); - return 0; - } - } - - public long exactSize(@Nullable LLSnapshot snapshot) { - long count = 0; - byte[] currentKey1 = null; - try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { - iter.seekToFirst(); - while (iter.isValid()) { - byte[] combinedKey = iter.key(); - - if (!isSubKey(currentKey1, combinedKey)) { - count++; - currentKey1 = getKey1(combinedKey); - } - iter.next(); - } - return count; - } - } - - @Override - public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) { - if (key1.length != key1Size) { - throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); - } - long count = 0; - try (RocksIterator iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { - iterator.seek(getStartSeekKey(key1)); - while (iterator.isValid()) { - byte[] combinedKey = iterator.key(); - - if (!isSubKey(key1, combinedKey)) { - // The key is outside of key1: exit from the iteration - break; - } - - count++; - iterator.next(); - } - } - return count; - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 004cbed..1dabd05 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -2,17 +2,18 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.function.Consumer; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -27,9 +28,12 @@ import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import org.warp.commonutils.concurrency.atomicity.NotAtomic; -import org.warp.commonutils.functional.CancellableBiConsumer; import org.warp.commonutils.functional.CancellableBiFunction; import org.warp.commonutils.functional.ConsumerResult; +import org.warp.commonutils.type.VariableWrapper; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @NotAtomic public class LLLocalDictionary implements LLDictionary { @@ -40,6 +44,7 @@ public class LLLocalDictionary implements LLDictionary { static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); + private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] NO_DATA = new byte[0]; private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); private static final List EMPTY_UNMODIFIABLE_LIST = List.of(); @@ -82,104 +87,208 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Optional get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - try { - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { - if (data.getValue() != null) { - return Optional.of(data.getValue()); - } else { - byte[] value = db.get(cfh, resolveSnapshot(snapshot), key); - return Optional.ofNullable(value); - } - } else { - return Optional.empty(); - } - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - @Override - public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - return contains_(snapshot, key); - } - - private boolean contains_(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - try { - int size = RocksDB.NOT_FOUND; - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { - if (data.getValue() != null) { - size = data.getValue().length; - } else { - size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA); - } - } - return size != RocksDB.NOT_FOUND; - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - @Override - public Optional put(byte[] key, byte[] value, LLDictionaryResultType resultType) throws IOException { - try { - byte[] response = null; - switch (resultType) { - case VALUE_CHANGED: - response = LLUtils.booleanToResponse(!contains_(null, key)); - break; - case PREVIOUS_VALUE: - var data = new Holder(); - if (db.keyMayExist(cfh, key, data)) { + public Mono get(@Nullable LLSnapshot snapshot, byte[] key) { + return Mono + .fromCallable(() -> { + Holder data = new Holder<>(); + if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { if (data.getValue() != null) { - response = data.getValue(); + return data.getValue(); } else { - response = db.get(cfh, key); + return db.get(cfh, resolveSnapshot(snapshot), key); } } else { - response = null; + return null; } - break; - } - db.put(cfh, key, value); - return Optional.ofNullable(response); - } catch (RocksDBException e) { - throw new IOException(e); - } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); } @Override - public void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType, Consumer responsesConsumer) - throws IOException { - if (key.length == value.length) { - List responses; - try (WriteBatch writeBatch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { - - if (resultType == LLDictionaryResultType.VOID) { - responses = EMPTY_UNMODIFIABLE_LIST; - } else { - responses = db.multiGetAsList(newCfhList(cfh, key.length), Arrays.asList(key)); - } - - for (int i = 0; i < key.length; i++) { - writeBatch.put(cfh, key[i], value[i]); - } - - db.write(BATCH_WRITE_OPTIONS, writeBatch); - } catch (RocksDBException e) { - throw new IOException(e); - } - - for (byte[] response : responses) { - responsesConsumer.accept(response); - } + public Mono isEmpty(@Nullable LLSnapshot snapshot, LLRange range) { + if (range.isSingle()) { + return containsKey(snapshot, range.getSingle()).map(contains -> !contains); } else { - throw new IOException("Wrong parameters count"); + return containsRange(snapshot, range).map(contains -> !contains); } } + public Mono containsRange(@Nullable LLSnapshot snapshot, LLRange range) { + return Mono + .fromCallable(() -> { + try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { + if (range.hasMin()) { + iter.seek(range.getMin()); + } else { + iter.seekToFirst(); + } + if (!iter.isValid()) { + return false; + } + + if (range.hasMax()) { + byte[] key1 = iter.key(); + return Arrays.compareUnsigned(key1, range.getMax()) <= 0; + } else { + return true; + } + } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); + } + + private Mono containsKey(@Nullable LLSnapshot snapshot, byte[] key) { + return Mono + .fromCallable(() -> { + int size = RocksDB.NOT_FOUND; + Holder data = new Holder<>(); + if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { + if (data.getValue() != null) { + size = data.getValue().length; + } else { + size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA); + } + } + return size != RocksDB.NOT_FOUND; + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public Mono put(byte[] key, byte[] value, LLDictionaryResultType resultType) { + Mono response = null; + switch (resultType) { + case VALUE_CHANGED: + response = containsKey(null, key).single().map(LLUtils::booleanToResponse); + break; + case PREVIOUS_VALUE: + response = Mono + .fromCallable(() -> { + var data = new Holder(); + if (db.keyMayExist(cfh, key, data)) { + if (data.getValue() != null) { + return data.getValue(); + } else { + return db.get(cfh, key); + } + } else { + return null; + } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); + break; + case VOID: + response = Mono.empty(); + break; + } + + return Mono + .fromCallable(() -> { + db.put(cfh, key, value); + return null; + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()) + .then(response); + } + + @Override + public Mono remove(byte[] key, LLDictionaryResultType resultType) { + Mono response = null; + switch (resultType) { + case VALUE_CHANGED: + response = containsKey(null, key).single().map(LLUtils::booleanToResponse); + break; + case PREVIOUS_VALUE: + response = Mono + .fromCallable(() -> { + var data = new Holder(); + if (db.keyMayExist(cfh, key, data)) { + if (data.getValue() != null) { + return data.getValue(); + } else { + return db.get(cfh, key); + } + } else { + return null; + } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); + break; + case VOID: + response = Mono.empty(); + break; + } + + return Mono + .fromCallable(() -> { + db.delete(cfh, key); + return null; + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()) + .then(response); + } + + @Override + public Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys) { + return keys.flatMap(key -> this.get(snapshot, key).map(value -> Map.entry(key, value))); + } + + @Override + public Flux> putMulti(Flux> entries, boolean getOldValues) { + return Mono + .fromCallable(() -> new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) + .subscribeOn(Schedulers.boundedElastic()) + .flatMapMany(writeBatch -> entries + .flatMap(newEntry -> Mono + .defer(() -> { + if (getOldValues) { + return get(null, newEntry.getKey()); + } else { + return Mono.empty(); + } + }) + .concatWith(Mono + .fromCallable(() -> { + synchronized (writeBatch) { + writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue()); + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + ) + .map(oldValue -> Map.entry(newEntry.getKey(), oldValue)) + ) + .concatWith(Mono + .>fromCallable(() -> { + synchronized (writeBatch) { + writeBatch.writeToDbAndClose(); + writeBatch.close(); + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + ) + .doFinally(signalType -> { + synchronized (writeBatch) { + writeBatch.close(); + } + }) + ) + .onErrorMap(IOException::new); + } + private static List newCfhList(ColumnFamilyHandle cfh, int size) { var list = new ArrayList(size); for (int i = 0; i < size; i++) { @@ -189,139 +298,225 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Optional remove(byte[] key, LLDictionaryResultType resultType) throws IOException { - try { - byte[] response = null; - switch (resultType) { - case VALUE_CHANGED: - response = LLUtils.booleanToResponse(contains_(null, key)); - break; - case PREVIOUS_VALUE: - var data = new Holder(); - if (db.keyMayExist(cfh, key, data)) { - if (data.getValue() != null) { - response = data.getValue(); - } else { - response = db.get(cfh, key); - } + public Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range) { + if (range.isSingle()) { + return getRangeSingle(snapshot, range.getMin()); + } else { + return getRangeMulti(snapshot, range); + } + } + + private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { + return Mono + .fromCallable(() -> { + var iter = db.newIterator(cfh, resolveSnapshot(snapshot)); + if (range.hasMin()) { + iter.seek(range.getMin()); } else { - response = null; + iter.seekToFirst(); } - break; - } - db.delete(cfh, key); - return Optional.ofNullable(response); - } catch (RocksDBException e) { - throw new IOException(e); + return iter; + }) + .subscribeOn(Schedulers.boundedElastic()) + .flatMapMany(rocksIterator -> Flux + .>fromIterable(() -> { + VariableWrapper nextKey = new VariableWrapper<>(null); + VariableWrapper nextValue = new VariableWrapper<>(null); + return new Iterator<>() { + @Override + public boolean hasNext() { + assert nextKey.var == null; + assert nextValue.var == null; + if (!rocksIterator.isValid()) { + nextKey.var = null; + nextValue.var = null; + return false; + } + var key = rocksIterator.key(); + var value = rocksIterator.value(); + if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { + nextKey.var = null; + nextValue.var = null; + return false; + } + nextKey.var = key; + nextValue.var = value; + return true; + } + + @Override + public Entry next() { + var key = nextKey.var; + var val = nextValue.var; + assert key != null; + assert val != null; + nextKey.var = null; + nextValue.var = null; + return Map.entry(key, val); + } + }; + }) + .doFinally(signalType -> rocksIterator.close()) + .subscribeOn(Schedulers.boundedElastic()) + ); + } + + private Flux> getRangeSingle(LLSnapshot snapshot, byte[] key) { + return this + .get(snapshot, key) + .map(value -> Map.entry(key, value)) + .flux(); + } + + @Override + public Flux> setRange(LLRange range, + Flux> entries, + boolean getOldValues) { + if (range.isAll()) { + return clear().thenMany(Flux.empty()); + } else { + return Mono + .fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) + .subscribeOn(Schedulers.boundedElastic()) + .flatMapMany(writeBatch -> Mono + .fromCallable(() -> { + synchronized (writeBatch) { + if (range.hasMin() && range.hasMax()) { + writeBatch.deleteRange(cfh, range.getMin(), range.getMax()); + writeBatch.delete(cfh, range.getMax()); + } else if (range.hasMax()) { + writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax()); + writeBatch.delete(cfh, range.getMax()); + } else { + try (var it = db.newIterator(cfh, getReadOptions(null))) { + it.seekToLast(); + if (it.isValid()) { + writeBatch.deleteRange(cfh, range.getMin(), it.key()); + writeBatch.delete(cfh, it.key()); + } + } + } + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + .thenMany(entries) + .flatMap(newEntry -> Mono + .defer(() -> { + if (getOldValues) { + return get(null, newEntry.getKey()); + } else { + return Mono.empty(); + } + }) + .concatWith(Mono + .fromCallable(() -> { + synchronized (writeBatch) { + writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue()); + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + ) + .map(oldValue -> Map.entry(newEntry.getKey(), oldValue)) + ) + .concatWith(Mono + .>fromCallable(() -> { + synchronized (writeBatch) { + writeBatch.writeToDbAndClose(); + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic()) + ) + .doFinally(signalType -> { + synchronized (writeBatch) { + writeBatch.close(); + } + }) + ) + .onErrorMap(IOException::new); } } - //todo: implement parallel forEach - @Override - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer) { - try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { - iter.seekToFirst(); - while (iter.isValid()) { - if (consumer.acceptCancellable(iter.key(), iter.value()).isCancelled()) { - return ConsumerResult.cancelNext(); - } - iter.next(); - } - } - return ConsumerResult.result(); - } + public Mono clear() { + return Mono + .fromCallable(() -> { + try (RocksIterator iter = db.newIterator(cfh); CappedWriteBatch writeBatch = new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) { - //todo: implement parallel replace - @Override - public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException { - try { - try (var snapshot = replaceKeys ? db.getSnapshot() : null) { - try (RocksIterator iter = db.newIterator(cfh, getReadOptions(snapshot)); - CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) { + iter.seekToFirst(); - iter.seekToFirst(); - - if (replaceKeys) { while (iter.isValid()) { writeBatch.delete(cfh, iter.key()); iter.next(); } + + writeBatch.writeToDbAndClose(); + + // Compact range + db.compactRange(cfh); + + db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh); + db.flushWal(true); + + var finalSize = exactSize(null); + if (finalSize != 0) { + throw new IllegalStateException("The dictionary is not empty after calling clear()"); + } } + return null; + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); - iter.seekToFirst(); - - while (iter.isValid()) { - - var result = consumer.applyCancellable(iter.key(), iter.value()); - boolean keyDiffers = !Arrays.equals(iter.key(), result.getValue().getKey()); - if (!replaceKeys && keyDiffers) { - throw new IOException("Tried to replace a key"); - } - - // put if changed or if keys can be swapped/replaced - if (replaceKeys || !Arrays.equals(iter.value(), result.getValue().getValue())) { - writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue()); - } - - if (result.isCancelled()) { - // Cancels and discards the write batch - writeBatch.clear(); - return ConsumerResult.cancelNext(); - } - - iter.next(); - } - - writeBatch.writeToDbAndClose(); - - return ConsumerResult.result(); - } finally { - db.releaseSnapshot(snapshot); - } - } - } catch (RocksDBException e) { - throw new IOException(e); - } - } - - // This method is exactly the same of LLLocalDictionary. Remember to keep the code equal - @Override - public void clear() throws IOException { - try (RocksIterator iter = db.newIterator(cfh); - CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) { - - iter.seekToFirst(); - - while (iter.isValid()) { - writeBatch.delete(cfh, iter.key()); - - iter.next(); - } - - writeBatch.writeToDbAndClose(); - - // Compact range - db.compactRange(cfh); - - db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh); - db.flushWal(true); - - var finalSize = exactSize(null); - if (finalSize != 0) { - throw new IllegalStateException("The dictionary is not empty after calling clear()"); - } - } catch (RocksDBException e) { - throw new IOException(e); - } } @Override - public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException { - return fast ? fastSize(snapshot) : exactSize(snapshot); + public Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) { + return Mono + .defer(() -> { + if (range.isAll()) { + return Mono + .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); + } else { + return Mono + .fromCallable(() -> { + try (var iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { + if (range.hasMin()) { + iter.seek(range.getMin()); + } else { + iter.seekToFirst(); + } + long i = 0; + while (iter.isValid()) { + if (range.hasMax()) { + byte[] key1 = iter.key(); + if (Arrays.compareUnsigned(key1, range.getMax()) > 0) { + break; + } + } + + iter.next(); + i++; + } + return i; + } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); + } + }); } - public long fastSize(@Nullable LLSnapshot snapshot) { + private long fastSizeAll(@Nullable LLSnapshot snapshot) { var rocksdbSnapshot = resolveSnapshot(snapshot); if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) { try { @@ -344,7 +539,7 @@ public class LLLocalDictionary implements LLDictionary { } } - public long exactSize(@Nullable LLSnapshot snapshot) { + private long exactSizeAll(@Nullable LLSnapshot snapshot) { long count = 0; try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { iter.seekToFirst(); @@ -357,29 +552,48 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public boolean isEmpty(@Nullable LLSnapshot snapshot) { - try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { - iter.seekToFirst(); - if (iter.isValid()) { - return false; - } - } - return true; + public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) { + return Mono + .fromCallable(() -> { + try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { + if (range.hasMin()) { + iter.seek(range.getMin()); + } else { + iter.seekToFirst(); + } + if (!iter.isValid()) { + return true; + } + return range.hasMax() && Arrays.compareUnsigned(iter.key(), range.getMax()) > 0; + } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); } @Override - public Optional> removeOne() throws IOException { - try (RocksIterator iter = db.newIterator(cfh)) { - iter.seekToFirst(); - if (iter.isValid()) { - byte[] key = iter.key(); - byte[] value = iter.value(); - db.delete(cfh, key); - return Optional.of(Map.entry(key, value)); - } - } catch (RocksDBException e) { - throw new IOException(e); - } - return Optional.empty(); + public Mono> removeOne(LLRange range) { + return Mono + .fromCallable(() -> { + try (RocksIterator iter = db.newIterator(cfh)) { + if (range.hasMin()) { + iter.seek(range.getMin()); + } else { + iter.seekToFirst(); + } + if (!iter.isValid()) { + return null; + } + if (range.hasMax() && Arrays.compareUnsigned(iter.key(), range.getMax()) > 0) { + return null; + } + byte[] key = iter.key(); + byte[] value = iter.value(); + db.delete(cfh, key); + return Map.entry(key, value); + } + }) + .onErrorMap(IOException::new) + .subscribeOn(Schedulers.boundedElastic()); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 2b16135..6e68669 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -1,5 +1,10 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.LLSingleton; +import it.cavallium.dbengine.database.LLSnapshot; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -31,12 +36,6 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.WALRecoveryMode; -import it.cavallium.dbengine.database.Column; -import it.cavallium.dbengine.database.LLDeepDictionary; -import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLKeyValueDatabase; -import it.cavallium.dbengine.database.LLSingleton; -import it.cavallium.dbengine.database.LLSnapshot; public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @@ -54,6 +53,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); + @SuppressWarnings("CommentedOutCode") public LLLocalKeyValueDatabase(String name, Path path, List columns, List handles, boolean crashIfWalError, boolean lowMemory) throws IOException { Options options = openRocksDb(path, crashIfWalError, lowMemory); @@ -139,6 +139,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // end force flush } + @SuppressWarnings("CommentedOutCode") private static Options openRocksDb(Path path, boolean crashIfWalError, boolean lowMemory) throws IOException { // Get databases directory path @@ -243,8 +244,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { var handles = new LinkedList(); - /** - * SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns + /* + SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns */ //var dbOptionsFastLoadSlowEdit = new DBOptions(options.setSkipStatsUpdateOnDbOpen(true)); @@ -270,8 +271,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { descriptorsToCreate .removeIf((cf) -> Arrays.equals(cf.getName(), DEFAULT_COLUMN_FAMILY.getName())); - /** - * SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns + /* + SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns */ //var dbOptionsFastLoadSlowEdit = options.setSkipStatsUpdateOnDbOpen(true); @@ -310,17 +311,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { ); } - @Override - public LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) { - return new LLLocalDeepDictionary(db, - handles.get(Column.special(Column.toString(columnName))), - name, - (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), - keySize, - key2Size - ); - } - @Override public long getProperty(String propertyName) throws IOException { try { @@ -360,6 +350,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { /** * Call this method ONLY AFTER flushing completely a db and closing it! */ + @SuppressWarnings("unused") private void deleteUnusedOldLogFiles() { Path basePath = dbPath; try { diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java b/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java deleted file mode 100644 index 7587aad..0000000 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java +++ /dev/null @@ -1,162 +0,0 @@ -package it.cavallium.dbengine.database.structures; - -import it.cavallium.dbengine.database.LLDeepDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; -import it.cavallium.dbengine.database.LLSnapshot; -import java.io.IOException; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; -import java.util.StringJoiner; -import java.util.function.Consumer; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.functional.CancellableBiConsumer; -import org.warp.commonutils.functional.CancellableBiFunction; -import org.warp.commonutils.functional.CancellableTriConsumer; -import org.warp.commonutils.functional.CancellableTriFunction; -import org.warp.commonutils.functional.ConsumerResult; -import org.warp.commonutils.type.Bytes; -import org.warp.commonutils.type.UnmodifiableIterableMap; -import org.warp.commonutils.type.UnmodifiableMap; - -public class LLDeepMap implements LLKeyValueDatabaseStructure { - - private final LLDeepDictionary dictionary; - - public LLDeepMap(LLDeepDictionary dictionary) { - this.dictionary = dictionary; - } - - public UnmodifiableIterableMap get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - return dictionary.get(snapshot, key); - } - - public Optional get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException { - return dictionary.get(snapshot, key1, key2); - } - - public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) { - return dictionary.isEmpty(snapshot, key1); - } - - public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException { - return dictionary.contains(snapshot, key1, key2); - } - - /** - * Note: this will remove previous elements because it replaces the entire map of key - */ - public void put(byte[] key1, UnmodifiableIterableMap value) throws IOException { - dictionary.put(key1, value); - } - - public Optional put(byte[] key1, byte[] key2, byte[] value, LLDeepMapResultType resultType) throws IOException { - return dictionary.put(key1, key2, value, resultType.getDictionaryResultType()); - } - - public void putMulti(byte[][] keys1, UnmodifiableIterableMap[] values) throws IOException { - dictionary.putMulti(keys1, values); - } - - public void putMulti(byte[] key1, byte[][] keys2, byte[][] values, LLDeepMapResultType resultType, Consumer responses) throws IOException { - dictionary.putMulti(key1, keys2, values, resultType.getDictionaryResultType(), responses); - } - - public void putMulti(byte[][] keys1, byte[][] keys2, byte[][] values, LLDeepMapResultType resultType, Consumer responses) throws IOException { - dictionary.putMulti(keys1, keys2, values, resultType.getDictionaryResultType(), responses); - } - - public void clear() throws IOException { - dictionary.clear(); - } - - public Optional> clear(byte[] key1, LLDeepMapResultType resultType) throws IOException { - return dictionary.clear(key1, resultType.getDictionaryResultType()); - } - - public Optional remove(byte[] key1, byte[] key2, LLDeepMapResultType resultType) throws IOException { - return dictionary.remove(key1, key2, resultType.getDictionaryResultType()); - } - - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer) { - return dictionary.forEach(snapshot, parallelism, consumer); - } - - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer consumer) { - return dictionary.forEach(snapshot, parallelism, key1, consumer); - } - - public void replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction, Entry>> consumer) throws IOException { - dictionary.replaceAll(parallelism, replaceKeys, consumer); - } - - public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction> consumer) throws IOException { - dictionary.replaceAll(parallelism, replaceKeys, key1, consumer); - } - - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer consumer) { - return dictionary.forEach(snapshot, parallelism, consumer); - } - - public void replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction> consumer) throws IOException { - dictionary.replaceAll(parallelism, replaceKeys, consumer); - } - - public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException { - return dictionary.size(snapshot, fast); - } - - public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) { - return dictionary.exactSize(snapshot, key1); - } - - @Override - public String getDatabaseName() { - return dictionary.getDatabaseName(); - } - - public enum LLDeepMapResultType { - VOID, - VALUE_CHANGED, - PREVIOUS_VALUE; - - public LLDictionaryResultType getDictionaryResultType() { - switch (this) { - case VOID: - return LLDictionaryResultType.VOID; - case VALUE_CHANGED: - return LLDictionaryResultType.VALUE_CHANGED; - case PREVIOUS_VALUE: - return LLDictionaryResultType.PREVIOUS_VALUE; - } - - return LLDictionaryResultType.VOID; - } - } - - @Override - public String toString() { - return new StringJoiner(", ", LLDeepMap.class.getSimpleName() + "[", "]") - .add("dictionary=" + dictionary) - .toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LLDeepMap llMap = (LLDeepMap) o; - return Objects.equals(dictionary, llMap.dictionary); - } - - @Override - public int hashCode() { - return Objects.hash(dictionary); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java b/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java deleted file mode 100644 index e31b2a5..0000000 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java +++ /dev/null @@ -1,210 +0,0 @@ -package it.cavallium.dbengine.database.structures; - -import it.cavallium.dbengine.database.LLDeepDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; -import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLUtils; -import it.unimi.dsi.fastutil.objects.ObjectSets.UnmodifiableSet; -import java.io.IOException; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; -import java.util.StringJoiner; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.functional.CancellableBiConsumer; -import org.warp.commonutils.functional.CancellableBiFunction; -import org.warp.commonutils.functional.CancellableConsumer; -import org.warp.commonutils.functional.CancellableFunction; -import org.warp.commonutils.functional.ConsumerResult; -import org.warp.commonutils.type.Bytes; -import org.warp.commonutils.type.UnmodifiableIterableMap; -import org.warp.commonutils.type.UnmodifiableIterableSet; -import org.warp.commonutils.type.UnmodifiableMap; - -/** - * A set in which keys and values must have a fixed size - */ -public class LLFixedDeepSet implements LLKeyValueDatabaseStructure { - - private static final byte[] EMPTY_VALUE = new byte[0]; - private static final Bytes EMPTY_VALUE_BYTES = new Bytes(EMPTY_VALUE); - private final LLDeepDictionary dictionary; - - public LLFixedDeepSet(LLDeepDictionary dictionary) { - this.dictionary = dictionary; - } - - private byte[][] generateEmptyArray(int length) { - byte[][] data = new byte[length][]; - for (int i = 0; i < length; i++) { - data[i] = EMPTY_VALUE; - } - return data; - } - - private Bytes[] generateEmptyBytesArray(int length) { - Bytes[] data = new Bytes[length]; - for (int i = 0; i < length; i++) { - data[i] = EMPTY_VALUE_BYTES; - } - return data; - } - - public UnmodifiableIterableSet get(@Nullable LLSnapshot snapshot, byte[] key1) throws IOException { - return dictionary.get(snapshot, key1).toUnmodifiableIterableKeysSet(byte[][]::new); - } - - public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] value) throws IOException { - return dictionary.contains(snapshot, key1, value); - } - - public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) { - return dictionary.isEmpty(snapshot, key1); - } - - public boolean add(byte[] key1, byte[] value, LLDeepSetItemResultType resultType) throws IOException { - Optional response = dictionary.put(key1, value, EMPTY_VALUE, resultType.getDictionaryResultType()); - if (resultType == LLDeepSetItemResultType.VALUE_CHANGED) { - return LLUtils.responseToBoolean(response.orElseThrow()); - } - return false; - } - - public void addMulti(byte[] key1, byte[][] values) throws IOException { - dictionary.putMulti(key1, values, generateEmptyArray(values.length), LLDictionaryResultType.VOID, (x) -> {}); - } - - /** - * Note: this will remove previous elements because it replaces the entire set - */ - public void put(byte[] key1, UnmodifiableIterableSet values) throws IOException { - dictionary.put(key1, values.toUnmodifiableIterableMapSetValues(generateEmptyArray(values.size()))); - } - - public void putMulti(byte[][] keys1, UnmodifiableIterableSet[] values) throws IOException { - var fixedValues = new UnmodifiableIterableMap[values.length]; - for (int i = 0; i < values.length; i++) { - fixedValues[i] = values[i].toUnmodifiableIterableMapSetValues(generateEmptyArray(values[i].size())); - } - //noinspection unchecked - dictionary.putMulti(keys1, fixedValues); - } - - public void clear() throws IOException { - dictionary.clear(); - } - - public Optional> clear(byte[] key1, LLDeepSetResultType resultType) throws IOException { - Optional> response = dictionary.clear(key1, resultType.getDictionaryResultType()); - if (response.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(response.get().toUnmodifiableIterableKeysSet(byte[][]::new)); - } - } - - public boolean remove(byte[] key1, byte[] value, LLDeepSetItemResultType resultType) throws IOException { - Optional response = dictionary.remove(key1, value, resultType.getDictionaryResultType()); - if (resultType == LLDeepSetItemResultType.VALUE_CHANGED) { - return LLUtils.responseToBoolean(response.orElseThrow()); - } - return false; - } - - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer) { - return dictionary.forEach(snapshot, parallelism, (key1, entries) -> consumer.acceptCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new))); - } - - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableConsumer consumer) { - return dictionary.forEach(snapshot, parallelism, key1, (value, empty) -> consumer.acceptCancellable(value)); - } - - public void replaceAll(int parallelism, CancellableBiFunction, Entry>> consumer) throws IOException { - dictionary.replaceAll(parallelism, true, (key1, entries) -> { - var result = consumer.applyCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new)); - var resultItems = result.getValue().getValue().toArray(Bytes[]::new); - return result.copyStatusWith(Map.entry(result.getValue().getKey(), UnmodifiableMap.of(resultItems, generateEmptyArray(resultItems.length)))); - }); - } - - public void replaceAll(int parallelism, byte[] key1, CancellableFunction consumer) throws IOException { - dictionary.replaceAll(parallelism, true, key1, (value, empty) -> { - var changedValue = consumer.applyCancellable(value); - return changedValue.copyStatusWith(Map.entry(changedValue.getValue(), EMPTY_VALUE)); - }); - } - - public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException { - return dictionary.size(snapshot, fast); - } - - public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) { - return dictionary.exactSize(snapshot, key1); - } - - @Override - public String getDatabaseName() { - return dictionary.getDatabaseName(); - } - - public enum LLDeepSetResultType { - VOID, - VALUE_CHANGED, - PREVIOUS_VALUE; - - public LLDictionaryResultType getDictionaryResultType() { - switch (this) { - case VOID: - return LLDictionaryResultType.VOID; - case VALUE_CHANGED: - return LLDictionaryResultType.VALUE_CHANGED; - case PREVIOUS_VALUE: - return LLDictionaryResultType.PREVIOUS_VALUE; - } - - return LLDictionaryResultType.VOID; - } - } - - public enum LLDeepSetItemResultType { - VOID, - VALUE_CHANGED; - - public LLDictionaryResultType getDictionaryResultType() { - switch (this) { - case VOID: - return LLDictionaryResultType.VOID; - case VALUE_CHANGED: - return LLDictionaryResultType.VALUE_CHANGED; - } - - return LLDictionaryResultType.VOID; - } - } - - @Override - public String toString() { - return new StringJoiner(", ", LLFixedDeepSet.class.getSimpleName() + "[", "]") - .add("dictionary=" + dictionary) - .toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LLFixedDeepSet llMap = (LLFixedDeepSet) o; - return Objects.equals(dictionary, llMap.dictionary); - } - - @Override - public int hashCode() { - return Objects.hash(dictionary); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java b/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java deleted file mode 100644 index dae889e..0000000 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java +++ /dev/null @@ -1,118 +0,0 @@ -package it.cavallium.dbengine.database.structures; - -import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; -import it.cavallium.dbengine.database.LLSnapshot; -import java.io.IOException; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; -import java.util.StringJoiner; -import java.util.function.Consumer; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.functional.CancellableBiConsumer; -import org.warp.commonutils.functional.CancellableBiFunction; -import org.warp.commonutils.functional.ConsumerResult; - -public class LLMap implements LLKeyValueDatabaseStructure { - - private final LLDictionary dictionary; - - public LLMap(LLDictionary dictionary) { - this.dictionary = dictionary; - } - - public Optional get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - return dictionary.get(snapshot, key); - } - - public Optional put(byte[] key, byte[] value, LLMapResultType resultType) - throws IOException { - return dictionary.put(key, value, resultType.getDictionaryResultType()); - } - - public void putMulti(byte[][] key, byte[][] value, LLMapResultType resultType, - Consumer> results) throws IOException { - dictionary.putMulti(key, value, resultType.getDictionaryResultType(), - (result) -> results.accept(Optional.ofNullable(result.length == 0 ? null : result))); - } - - public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException { - return dictionary.contains(snapshot, key); - } - - public Optional remove(byte[] key, LLMapResultType resultType) throws IOException { - return dictionary.remove(key, resultType.getDictionaryResultType()); - } - - public void clear() throws IOException { - dictionary.clear(); - } - - public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException { - return dictionary.size(snapshot, fast); - } - - /** - * The consumer can be called from different threads - */ - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer) { - return dictionary.forEach(snapshot, parallelism, consumer); - } - - /** - * The consumer can be called from different threads - */ - public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException { - return dictionary.replaceAll(parallelism, replaceKeys, consumer); - } - - @Override - public String getDatabaseName() { - return dictionary.getDatabaseName(); - } - - public enum LLMapResultType { - VOID, - VALUE_CHANGED, - PREVIOUS_VALUE; - - public LLDictionaryResultType getDictionaryResultType() { - switch (this) { - case VOID: - return LLDictionaryResultType.VOID; - case VALUE_CHANGED: - return LLDictionaryResultType.VALUE_CHANGED; - case PREVIOUS_VALUE: - return LLDictionaryResultType.PREVIOUS_VALUE; - } - - return LLDictionaryResultType.VOID; - } - } - - @Override - public String toString() { - return new StringJoiner(", ", LLMap.class.getSimpleName() + "[", "]") - .add("dictionary=" + dictionary) - .toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LLMap llMap = (LLMap) o; - return Objects.equals(dictionary, llMap.dictionary); - } - - @Override - public int hashCode() { - return Objects.hash(dictionary); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java b/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java deleted file mode 100644 index 50ac2db..0000000 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java +++ /dev/null @@ -1,105 +0,0 @@ -package it.cavallium.dbengine.database.structures; - -import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; -import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLUtils; -import java.io.IOException; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.functional.CancellableConsumer; -import org.warp.commonutils.functional.CancellableFunction; -import org.warp.commonutils.functional.ConsumerResult; - -public class LLSet implements LLKeyValueDatabaseStructure { - - private static final byte[] EMPTY_VALUE = new byte[0]; - private final LLDictionary dictionary; - - public LLSet(LLDictionary dictionary) { - this.dictionary = dictionary; - } - - @Override - public String getDatabaseName() { - return dictionary.getDatabaseName(); - } - - private byte[][] generateEmptyArray(int length) { - byte[][] data = new byte[length][]; - for (int i = 0; i < length; i++) { - data[i] = EMPTY_VALUE; - } - return data; - } - - public boolean contains(@Nullable LLSnapshot snapshot, byte[] value) throws IOException { - return dictionary.contains(snapshot, value); - } - - public boolean add(byte[] value, LLSetResultType resultType) throws IOException { - Optional response = dictionary.put(value, EMPTY_VALUE, resultType.getDictionaryResultType()); - if (resultType == LLSetResultType.VALUE_CHANGED) { - return LLUtils.responseToBoolean(response.orElseThrow()); - } - return false; - } - - public void addMulti(byte[][] values) throws IOException { - dictionary.putMulti(values, generateEmptyArray(values.length), LLDictionaryResultType.VOID, (x) -> {}); - } - - public boolean remove(byte[] value, LLSetResultType resultType) throws IOException { - Optional response = dictionary.remove(value, resultType.getDictionaryResultType()); - if (resultType == LLSetResultType.VALUE_CHANGED) { - return LLUtils.responseToBoolean(response.orElseThrow()); - } - return false; - } - - public void clearUnsafe() throws IOException { - dictionary.clear(); - } - - public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableConsumer consumer) { - return dictionary.forEach(snapshot, parallelism, (key, emptyValue) -> consumer.acceptCancellable(key)); - } - - public ConsumerResult replaceAll(int parallelism, CancellableFunction consumer) throws IOException { - return dictionary.replaceAll(parallelism, true, (key, emptyValue) -> { - var result = consumer.applyCancellable(key); - return result.copyStatusWith(Map.entry(result.getValue(), emptyValue)); - }); - } - - public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException { - return dictionary.size(snapshot, fast); - } - - public boolean isEmptyUnsafe(@Nullable LLSnapshot snapshot) throws IOException { - return dictionary.isEmpty(snapshot); - } - - public Optional removeOneUnsafe() throws IOException { - return dictionary.removeOne().map(Entry::getKey); - } - - public enum LLSetResultType { - VOID, - VALUE_CHANGED; - - public LLDictionaryResultType getDictionaryResultType() { - switch (this) { - case VOID: - return LLDictionaryResultType.VOID; - case VALUE_CHANGED: - return LLDictionaryResultType.VALUE_CHANGED; - } - - return LLDictionaryResultType.VOID; - } - } -}