Asynchronous LLDictionary

This commit is contained in:
Andrea Cavalli 2021-01-30 00:24:55 +01:00
parent dd1fb834b5
commit 241b3fbee1
11 changed files with 566 additions and 1879 deletions

View File

@ -1,70 +0,0 @@
package it.cavallium.dbengine.database;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.CancellableTriFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableMap;
/**
 * A two-level key/value dictionary: every entry is addressed by a primary key
 * (key1) and a secondary key (key2), and each key1 owns a sub-map of
 * key2 -> value pairs. Implementations are not atomic.
 */
@NotAtomic
public interface LLDeepDictionary extends LLKeyValueDatabaseStructure {
	/** Returns all key2 -> value pairs stored under key1, read through the snapshot when not null. */
	UnmodifiableIterableMap<byte[], byte[]> get(@Nullable LLSnapshot snapshot, byte[] key1) throws IOException;
	/** Returns the value stored under (key1, key2), or empty if absent. */
	Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException;
	/** True when key1 has no entries at all. */
	boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1);
	/** True when a value exists under (key1, key2). */
	boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException;
	/**
	 * Note: this will remove previous elements because it replaces the entire map of key
	 */
	void put(byte[] key1, UnmodifiableIterableMap<byte[], byte[]> value) throws IOException;
	/** Stores a single value; the returned Optional depends on resultType. */
	Optional<byte[]> put(byte[] key1, byte[] key2, byte[] value, LLDictionaryResultType resultType) throws IOException;
	/** Bulk variant of {@link #put(byte[], UnmodifiableIterableMap)}; arrays are parallel. */
	void putMulti(byte[][] keys1, UnmodifiableIterableMap<byte[], byte[]>[] values) throws IOException;
	/** Bulk single-value puts under one key1; results are streamed to {@code responses} unless resultType is VOID. */
	void putMulti(byte[] key1, byte[][] keys2, byte[][] values, LLDictionaryResultType resultType, Consumer<byte[]> responses) throws IOException;
	/** Bulk single-value puts across many key1 values; all three arrays are parallel. */
	void putMulti(byte[][] keys1, byte[][] keys2, byte[][] values, LLDictionaryResultType resultType, Consumer<byte[]> responses) throws IOException;
	/** Removes every entry of the whole dictionary. */
	void clear() throws IOException;
	/** Removes the whole sub-map of key1; the returned Optional depends on resultType. */
	Optional<UnmodifiableIterableMap<byte[], byte[]>> clear(byte[] key1, LLDictionaryResultType resultType) throws IOException;
	/** Removes a single (key1, key2) entry; the returned Optional depends on resultType. */
	Optional<byte[]> remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException;
	/** Iterates every (key1, key2, value) triple; may be cancelled by the consumer. */
	ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer<byte[], byte[], byte[]> consumer);
	/** Iterates (key1, sub-map) pairs; may be cancelled by the consumer. */
	ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer);
	/** Iterates the (key2, value) pairs of one key1; may be cancelled by the consumer. */
	ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer<byte[], byte[]> consumer);
	/** Replaces every (key1, key2, value) triple with the triple returned by the consumer. */
	ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException;
	/** Replaces each key1 sub-map with the map returned by the consumer. */
	ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer) throws IOException;
	/** Replaces every (key2, value) entry of one key1 with the entry returned by the consumer. */
	ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException;
	/** Entry count; {@code fast} may allow an approximate answer. */
	long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException;
	/** Exact number of entries under key1. */
	long exactSize(@Nullable LLSnapshot snapshot, byte[] key1);
}

View File

@ -1,45 +1,45 @@
package it.cavallium.dbengine.database;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// NOTE(review): this span is a rendered diff hunk — it appears to interleave
// the removed blocking signatures (IOException/Optional based) with the added
// reactive ones (Mono/Flux), so it is not compilable as shown here. Verify
// against the actual post-commit file.
@NotAtomic
public interface LLDictionary extends LLKeyValueDatabaseStructure {
	// Presumably the removed blocking lookup — duplicates the reactive get below.
	Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException;
	/** Reactive lookup of a single key; reads through the snapshot when not null. */
	Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key);
	boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException;
	/** Reactive put; the emitted value depends on resultType. */
	Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType);
	Optional<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType)
			throws IOException;
	Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType);
	void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType,
			Consumer<byte[]> responses) throws IOException;
	/** Streams the (key, value) entries found for the given keys. */
	Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys);
	Optional<byte[]> remove(byte[] key, LLDictionaryResultType resultType) throws IOException;
	/** Writes the given entries; emits the old values when getOldValues is true. */
	Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);
	/**
	 * This method can call the consumer from different threads in parallel
	 */
	ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer);
	/** Streams every entry whose key falls inside the range. */
	Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range);
	/**
	 * This method can call the consumer from different threads in parallel
	 */
	ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException;
	/** Replaces the content of the range with the given entries. */
	Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);
	void clear() throws IOException;
	long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException;
	boolean isEmpty(@Nullable LLSnapshot snapshot) throws IOException;
	Optional<Entry<byte[], byte[]>> removeOne() throws IOException;
	/**
	 * Rewrites every entry of the range through entriesReplacer. When keys can
	 * change, the whole range must be cleared and rewritten (setRange);
	 * otherwise the replaced entries can simply overwrite the old ones in
	 * place (putMulti).
	 */
	default Mono<Void> replaceRange(LLRange range, boolean canKeysChange, Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) {
		Flux<Entry<byte[], byte[]>> replacedFlux = this.getRange(null, range).flatMap(entriesReplacer);
		if (canKeysChange) {
			return this
					.setRange(range, replacedFlux, false)
					.then();
		} else {
			return this
					.putMulti(replacedFlux, false)
					.then();
		}
	}
	Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range);
	Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast);
	Mono<Entry<byte[], byte[]>> removeOne(LLRange range);
}

View File

@ -2,15 +2,11 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import it.cavallium.dbengine.database.structures.LLDeepMap;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import it.cavallium.dbengine.database.structures.LLFixedDeepSet;
import it.cavallium.dbengine.database.structures.LLInt;
import it.cavallium.dbengine.database.structures.LLLong;
import it.cavallium.dbengine.database.structures.LLMap;
import it.cavallium.dbengine.database.structures.LLSet;
public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure {
@ -19,30 +15,12 @@ public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyVal
LLDictionary getDictionary(byte[] columnName) throws IOException;
LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) throws IOException;
default LLSet getSet(String name) throws IOException {
LLDictionary dictionary = getDictionary(
Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII));
return new LLSet(dictionary);
default LLDictionary getSet(String name) throws IOException {
return getDictionary(Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII));
}
default LLMap getMap(String name) throws IOException {
LLDictionary dictionary = getDictionary(
Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII));
return new LLMap(dictionary);
}
default LLFixedDeepSet getDeepSet(String name, int keySize, int key2Size) throws IOException {
LLDeepDictionary deepDictionary = getDeepDictionary(
Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII), keySize, key2Size);
return new LLFixedDeepSet(deepDictionary);
}
default LLDeepMap getDeepMap(String name, int keySize, int key2Size) throws IOException {
LLDeepDictionary deepDictionary = getDeepDictionary(
Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII), keySize, key2Size);
return new LLDeepMap(deepDictionary);
default LLDictionary getMap(String name) throws IOException {
return getDictionary(Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII));
}
default LLInt getInteger(String singletonListName, String name, int defaultValue)

View File

@ -0,0 +1,87 @@
package it.cavallium.dbengine.database;
import java.util.Arrays;
import java.util.StringJoiner;
/**
 * An immutable range of byte-array keys, delimited by an optional minimum and
 * an optional maximum bound. A {@code null} bound means "unbounded" on that
 * side; both bounds null means "every key".
 */
public class LLRange {
	private static final LLRange RANGE_ALL = new LLRange(null, null);
	private final byte[] min;
	private final byte[] max;
	private LLRange(byte[] min, byte[] max) {
		this.min = min;
		this.max = max;
	}
	/** The range covering every key. Always returns the same shared instance. */
	public static LLRange all() {
		return RANGE_ALL;
	}
	/** A range bounded only from below. */
	public static LLRange from(byte[] min) {
		return new LLRange(min, null);
	}
	/** A range bounded only from above. */
	public static LLRange to(byte[] max) {
		return new LLRange(null, max);
	}
	/** True when neither bound is set. */
	public boolean isAll() {
		return min == null && max == null;
	}
	/** True when both bounds are set and equal, i.e. the range selects exactly one key. */
	public boolean isSingle() {
		return min != null && max != null && Arrays.equals(min, max);
	}
	public boolean hasMin() {
		return min != null;
	}
	/** The lower bound; only call when {@link #hasMin()} is true. */
	public byte[] getMin() {
		assert min != null;
		return min;
	}
	public boolean hasMax() {
		return max != null;
	}
	/** The upper bound; only call when {@link #hasMax()} is true. */
	public byte[] getMax() {
		assert max != null;
		return max;
	}
	/** The single key selected by this range; only call when {@link #isSingle()} is true. */
	public byte[] getSingle() {
		assert isSingle();
		return min;
	}
	@Override
	public boolean equals(Object o) {
		if (o == this) {
			return true;
		}
		if (o == null || o.getClass() != getClass()) {
			return false;
		}
		LLRange other = (LLRange) o;
		return Arrays.equals(this.min, other.min) && Arrays.equals(this.max, other.max);
	}
	@Override
	public int hashCode() {
		// Same combination as Arrays-based 31*h(min)+h(max).
		return 31 * Arrays.hashCode(min) + Arrays.hashCode(max);
	}
	@Override
	public String toString() {
		// Produces "LLRange[min=..., max=...]", identical to the StringJoiner form.
		return LLRange.class.getSimpleName()
				+ "[min=" + Arrays.toString(min)
				+ ", max=" + Arrays.toString(max)
				+ "]";
	}
}

View File

@ -1,918 +0,0 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatchInterface;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.error.IndexOutOfBoundsException;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.CancellableTriFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableMap;
@NotAtomic
public class LLLocalDeepDictionary implements LLDeepDictionary {
private static final byte[] NO_DATA = new byte[0];
private static final byte[][] NO_DATA_MAP = new byte[0][0];
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final String databaseName;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final int key1Size;
private final int key2Size;
private final int key1Position;
private final int key2Position;
private final int key1EndPosition;
private final int key2EndPosition;
private final int combinedKeySize;
/**
 * Creates a deep dictionary over a RocksDB column family. Every stored key is
 * the fixed-size concatenation of key1 (keySize bytes) immediately followed
 * by key2 (key2Size bytes).
 *
 * @param snapshotResolver maps an LLSnapshot handle to the RocksDB Snapshot to read from
 */
public LLLocalDeepDictionary(@NotNull RocksDB db, @NotNull ColumnFamilyHandle columnFamilyHandle,
		String databaseName,
		Function<LLSnapshot, Snapshot> snapshotResolver, int keySize, int key2Size) {
	Objects.requireNonNull(db);
	this.db = db;
	Objects.requireNonNull(columnFamilyHandle);
	this.cfh = columnFamilyHandle;
	this.databaseName = databaseName;
	this.snapshotResolver = snapshotResolver;
	this.key1Size = keySize;
	this.key2Size = key2Size;
	// key1 occupies [0, key1Size), key2 occupies [key1Size, key1Size + key2Size)
	this.key1Position = 0;
	this.key2Position = key1Size;
	this.key1EndPosition = key1Position + key1Size;
	this.key2EndPosition = key2Position + key2Size;
	this.combinedKeySize = keySize + key2Size;
}
/** Name of the database this structure belongs to. */
@Override
public String getDatabaseName() {
	return databaseName;
}
/** True when {@code combinedKey} belongs to the sub-map of {@code key1}, i.e. starts with key1. */
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
private boolean isSubKey(byte[] key1, byte[] combinedKey) {
	// Both arrays must be non-null and exactly the expected sizes.
	boolean sizesOk = key1 != null
			&& combinedKey != null
			&& key1.length == key1Size
			&& combinedKey.length == combinedKeySize;
	return sizesOk && Arrays.equals(key1, 0, key1Size, combinedKey, key1Position, key1EndPosition);
}
/** Lowest possible combined key under {@code key1}: key1 padded with key2Size zero bytes. */
private byte[] getStartSeekKey(byte[] key1) {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	byte[] startSeekKey = Arrays.copyOf(key1, combinedKeySize);
	return startSeekKey;
}
/** Highest possible combined key under {@code key1}: key1 followed by key2Size 0xFF bytes. */
private byte[] getEndSeekKey(byte[] key1) {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	byte[] endSeekKey = new byte[combinedKeySize];
	System.arraycopy(key1, 0, endSeekKey, 0, key1Size);
	Arrays.fill(endSeekKey, key2Position, key2EndPosition, (byte) 0xFF);
	return endSeekKey;
}
/** Extracts key1 (the leading key1Size bytes) from a combined key. */
@NotNull
private byte[] getKey1(@NotNull byte[] combinedKey) {
	if (combinedKey.length != combinedKeySize) {
		throw new IndexOutOfBoundsException(combinedKey.length, combinedKeySize, combinedKeySize);
	}
	byte[] key1 = Arrays.copyOfRange(combinedKey, key1Position, key1EndPosition);
	return key1;
}
/** Extracts key2 (the trailing key2Size bytes) from a combined key. */
@NotNull
private byte[] getKey2(@NotNull byte[] combinedKey) {
	byte[] key2 = Arrays.copyOfRange(combinedKey, key2Position, key2EndPosition);
	return key2;
}
/** Concatenates key1 and key2 into the single fixed-size database key. */
@NotNull
private byte[] getCombinedKey(@NotNull byte[] key1, @NotNull byte[] key2) {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	if (key2.length != key2Size) {
		throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
	}
	byte[] combinedKey = new byte[combinedKeySize];
	System.arraycopy(key1, 0, combinedKey, key1Position, key1Size);
	System.arraycopy(key2, 0, combinedKey, key2Position, key2Size);
	return combinedKey;
}
// Builds the ReadOptions for a read: snapshot-pinned when a snapshot handle is
// given, otherwise the shared default options.
// NOTE(review): the ReadOptions allocated in the snapshot branch is a native
// resource that callers never close — confirm whether this leak is acceptable.
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
	if (snapshot != null) {
		return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot));
	} else {
		return EMPTY_READ_OPTIONS;
	}
}
/**
 * Returns every key2 -> value pair stored under {@code key}, read through the
 * given snapshot (or the live database when the snapshot is null).
 */
@Override
public UnmodifiableIterableMap<byte[], byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
	if (key.length != key1Size) {
		throw new IndexOutOfBoundsException(key.length, key1Size, key1Size);
	}
	ObjectArrayList<byte[]> keys = new ObjectArrayList<>();
	ObjectArrayList<byte[]> values = new ObjectArrayList<>();
	try (var iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
		// key1 is a strict prefix of every combined key under it, so seeking to
		// key1 positions the iterator on the first entry of this sub-map.
		iterator.seek(key);
		while (iterator.isValid()) {
			byte[] combinedKey = iterator.key();
			if (!isSubKey(key, combinedKey)) {
				// Left key1's sub-map: stop.
				break;
			}
			byte[] key2 = getKey2(combinedKey);
			byte[] value = iterator.value();
			keys.add(key2);
			values.add(value);
			iterator.next();
		}
	}
	return UnmodifiableIterableMap.of(keys.toArray(byte[][]::new), values.toArray(byte[][]::new));
}
/**
 * Returns the value stored under (key1, key2), or empty if absent.
 */
@Override
public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	if (key2.length != key2Size) {
		throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
	}
	try {
		Holder<byte[]> data = new Holder<>();
		byte[] combinedKey = getCombinedKey(key1, key2);
		// keyMayExist is a fast check that may return the value directly via
		// the holder, but it can also report false positives.
		if (db.keyMayExist(cfh, resolveSnapshot(snapshot), combinedKey, data)) {
			if (data.getValue() != null) {
				return Optional.of(data.getValue());
			} else {
				// Possible false positive: do the real lookup.
				byte[] value = db.get(cfh, resolveSnapshot(snapshot), combinedKey);
				return Optional.ofNullable(value);
			}
		} else {
			return Optional.empty();
		}
	} catch (RocksDBException e) {
		throw new IOException(e);
	}
}
/** True when key1 has no entries at all (read through the snapshot if given). */
@Override
public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	// Empty iff seeking to the first possible combined key under key1 lands
	// past the end of the data, or on a key outside key1's sub-map.
	try (var iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
		iterator.seek(getStartSeekKey(key1));
		return !iterator.isValid() || !isSubKey(key1, iterator.key());
	}
}
/**
 * True if a value exists under (key1, key2).
 */
@Override
public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	if (key2.length != key2Size) {
		throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
	}
	try {
		var combinedKey = getCombinedKey(key1, key2);
		int size = RocksDB.NOT_FOUND;
		Holder<byte[]> data = new Holder<>();
		// keyMayExist can report false positives, so fall back to a real get
		// that only retrieves the value size (NO_DATA buffer).
		if (db.keyMayExist(cfh, resolveSnapshot(snapshot), combinedKey, data)) {
			if (data.getValue() != null) {
				size = data.getValue().length;
			} else {
				size = db.get(cfh, resolveSnapshot(snapshot), combinedKey, NO_DATA);
			}
		}
		return size != RocksDB.NOT_FOUND;
	} catch (RocksDBException e) {
		throw new IOException(e);
	}
}
//todo: use WriteBatch to enhance performance
/**
 * Replaces the entire sub-map of key1 with {@code value}: existing entries not
 * present in the new map are deleted, existing entries also present in it are
 * overwritten in place, and the remaining new entries are inserted afterwards.
 */
@Override
public void put(byte[] key1, UnmodifiableIterableMap<byte[], byte[]> value) throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	try {
		var bytesValue = Bytes.ofMap(value);
		var alreadyEditedKeys = new ObjectOpenHashSet<Bytes>();
		// Delete old keys and change keys that are already present
		try (var iterator = db.newIterator(cfh)) {
			iterator.seek(getStartSeekKey(key1));
			while (iterator.isValid()) {
				byte[] combinedKey = iterator.key();
				if (!isSubKey(key1, combinedKey)) {
					// The key is outside of key1: exit from the iteration
					break;
				}
				byte[] key2 = getKey2(combinedKey);
				var valueToSetHere = bytesValue.get(key2);
				if (valueToSetHere == null) {
					// key not present in the new data: remove it from the database
					db.delete(cfh, combinedKey);
				} else {
					// key present in the new data: replace it on the database
					alreadyEditedKeys.add(new Bytes(key2));
					db.put(cfh, combinedKey, valueToSetHere.data);
				}
				iterator.next();
			}
		}
		// Add new keys, avoiding to add already changed keys
		var mapIterator = bytesValue.fastIterator();
		while (mapIterator.hasNext()) {
			var mapEntry = mapIterator.next();
			var key2 = mapEntry.getKey();
			if (key2.data.length != key2Size) {
				throw new IndexOutOfBoundsException(key2.data.length, key2Size, key2Size);
			}
			if (!alreadyEditedKeys.contains(key2)) {
				var value2 = mapEntry.getValue();
				db.put(cfh, getCombinedKey(key1, key2.data), value2.data);
			}
		}
	} catch (RocksDBException ex) {
		throw new IOException(ex);
	}
}
//todo: use WriteBatch to enhance performance
/** Bulk variant of put: keys1[i] is stored with values[i]; the arrays must be parallel. */
@Override
public void putMulti(byte[][] keys1, UnmodifiableIterableMap<byte[], byte[]>[] values) throws IOException {
	if (keys1.length != values.length) {
		throw new IOException("Wrong parameters count");
	}
	for (int i = 0; i < keys1.length; i++) {
		put(keys1[i], values[i]);
	}
}
/**
 * Stores {@code value} under (key1, key2). The returned Optional depends on
 * resultType:
 * - VALUE_CHANGED: boolean response, true when the key was previously absent
 *   (overwriting an existing key reports false even if the value differs);
 * - PREVIOUS_VALUE: the old value, empty when there was none;
 * - otherwise empty.
 */
@Override
public Optional<byte[]> put(byte[] key1, byte[] key2, byte[] value, LLDictionaryResultType resultType)
		throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	if (key2.length != key2Size) {
		throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
	}
	try {
		byte[] response = null;
		var combinedKey = getCombinedKey(key1, key2);
		switch (resultType) {
			case VALUE_CHANGED:
				// Must be computed before the write below overwrites the key.
				response = LLUtils.booleanToResponse(!this.contains(null, key1, key2));
				break;
			case PREVIOUS_VALUE:
				// keyMayExist fast path; possible false positives need a real get.
				var data = new Holder<byte[]>();
				if (db.keyMayExist(cfh, combinedKey, data)) {
					if (data.getValue() != null) {
						response = data.getValue();
					} else {
						response = db.get(cfh, combinedKey);
					}
				} else {
					response = null;
				}
				break;
		}
		db.put(cfh, combinedKey, value);
		return Optional.ofNullable(response);
	} catch (RocksDBException e) {
		throw new IOException(e);
	}
}
//todo: use WriteBatch to enhance performance
/** Bulk single-value puts under one key1; results go to {@code responses} unless resultType is VOID. */
@Override
public void putMulti(byte[] key1,
		byte[][] keys2,
		byte[][] values2,
		LLDictionaryResultType resultType,
		Consumer<byte[]> responses) throws IOException {
	if (keys2.length != values2.length) {
		throw new IOException("Wrong parameters count");
	}
	for (int i = 0; i < keys2.length; i++) {
		Optional<byte[]> result = put(key1, keys2[i], values2[i], resultType);
		if (resultType != LLDictionaryResultType.VOID) {
			responses.accept(result.orElse(NO_DATA));
		}
	}
}
//todo: use WriteBatch to enhance performance
/** Bulk single-value puts across many key1 values; all three arrays must be parallel. */
@Override
public void putMulti(byte[][] keys1,
		byte[][] keys2,
		byte[][] values2,
		LLDictionaryResultType resultType,
		Consumer<byte[]> responses) throws IOException {
	if (keys1.length != keys2.length || keys2.length != values2.length) {
		throw new IOException("Wrong parameters count");
	}
	for (int i = 0; i < keys1.length; i++) {
		Optional<byte[]> result = put(keys1[i], keys2[i], values2[i], resultType);
		if (resultType != LLDictionaryResultType.VOID) {
			responses.accept(result.orElse(NO_DATA));
		}
	}
}
/**
 * Deletes the value under (key1, key2). The returned Optional depends on
 * resultType:
 * - VALUE_CHANGED: boolean response, true when the key existed;
 * - PREVIOUS_VALUE: the removed value, empty when there was none;
 * - otherwise empty.
 */
@Override
public Optional<byte[]> remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	if (key2.length != key2Size) {
		throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
	}
	try {
		byte[] response = null;
		var combinedKey = getCombinedKey(key1, key2);
		switch (resultType) {
			case VALUE_CHANGED:
				// Must be computed before the delete below.
				response = LLUtils.booleanToResponse(this.contains(null, key1, key2));
				break;
			case PREVIOUS_VALUE:
				// keyMayExist fast path; possible false positives need a real get.
				var data = new Holder<byte[]>();
				if (db.keyMayExist(cfh, combinedKey, data)) {
					if (data.getValue() != null) {
						response = data.getValue();
					} else {
						response = db.get(cfh, combinedKey);
					}
				} else {
					response = null;
				}
				break;
		}
		db.delete(cfh, combinedKey);
		return Optional.ofNullable(response);
	} catch (RocksDBException e) {
		throw new IOException(e);
	}
}
/** Iterates every (key1, key2, value) triple. The parallelism hint is currently unused (see todo in forEach_). */
@Override
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer<byte[], byte[], byte[]> consumer) {
	return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
}
//todo: implement parallel execution
// Sequential scan of the whole column family; parallelism is ignored.
// NOTE(review): the ReadOptions created in the snapshot branch is a native
// resource that is never closed.
private ConsumerResult forEach_(CancellableTriConsumer<byte[], byte[], byte[]> consumer, @Nullable Snapshot snapshot, int parallelism) {
	try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
			: db.newIterator(cfh))) {
		iterator.seekToFirst();
		while (iterator.isValid()) {
			var combinedKey = iterator.key();
			var key1 = getKey1(combinedKey);
			var key2 = getKey2(combinedKey);
			var result = consumer.acceptCancellable(key1, key2, iterator.value());
			if (result.isCancelled()) {
				// Stop as soon as the consumer asks to cancel.
				return ConsumerResult.cancelNext();
			}
			iterator.next();
		}
		return ConsumerResult.result();
	}
}
/** Iterates (key1, sub-map) pairs. The parallelism hint is currently unused (see todo in forEach_). */
@Override
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer) {
	return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
}
//todo: implement parallel execution
// Groups consecutive combined keys by key1 and hands each complete sub-map to
// the consumer. Relies on the iterator returning keys in sorted order, so all
// entries of one key1 are contiguous.
// NOTE(review): the ReadOptions created in the snapshot branch is never closed.
private ConsumerResult forEach_(CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer, @Nullable Snapshot snapshot, int parallelism) {
	try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
			: db.newIterator(cfh))) {
		iterator.seekToFirst();
		byte[] currentKey1 = null;
		// only append or iterate on this object! byte[].equals() and hash is not trustworthy!
		List<byte[]> key2Keys = null;
		// only append or iterate on this object! byte[].equals() and hash is not trustworthy!
		List<byte[]> key2Values = null;
		while (iterator.isValid()) {
			var combinedKey = iterator.key();
			var key1 = getKey1(combinedKey);
			if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) {
				// key1 changed: flush the group accumulated so far.
				if (currentKey1 != null && !key2Values.isEmpty()) {
					var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
					if (result.isCancelled()) {
						return ConsumerResult.cancelNext();
					}
				}
				currentKey1 = key1;
				key2Keys = new ArrayList<>();
				key2Values = new ArrayList<>();
			}
			key2Keys.add(getKey2(combinedKey));
			key2Values.add(iterator.value());
			iterator.next();
		}
		// Flush the trailing group.
		if (currentKey1 != null && !key2Values.isEmpty()) {
			var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
			if (result.isCancelled()) {
				return ConsumerResult.cancelNext();
			}
		}
		return ConsumerResult.result();
	}
}
/** Iterates the (key2, value) entries of one key1 sub-map. The parallelism hint is currently unused. */
@Override
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, CancellableBiConsumer<byte[], byte[]> consumer) {
	return forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
}
//todo: implement parallel execution
// Sequential scan of one key1 sub-map.
// NOTE(review): the ReadOptions created in the snapshot branch is never closed.
private ConsumerResult forEach_(byte[] key1, CancellableBiConsumer<byte[], byte[]> consumer, @Nullable Snapshot snapshot, int parallelism) {
	try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
			: db.newIterator(cfh))) {
		iterator.seek(getStartSeekKey(key1));
		while (iterator.isValid()) {
			byte[] combinedKey = iterator.key();
			if (!isSubKey(key1, combinedKey)) {
				// The key is outside of key1: exit from the iteration
				break;
			}
			byte[] key2 = getKey2(combinedKey);
			byte[] value2 = iterator.value();
			var result = consumer.acceptCancellable(key2, value2);
			if (result.isCancelled()) {
				return ConsumerResult.cancelNext();
			}
			iterator.next();
		}
		return ConsumerResult.result();
	}
}
//todo: implement parallel execution
//todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
/**
 * Replaces every (key1, key2, value) triple with the triple returned by the
 * consumer. All existing keys are first scheduled for deletion in the batch,
 * then the replacement triples are written; the batch is committed on success
 * or discarded when the consumer cancels.
 * Fix: the {@code ReadOptions} created for the snapshot-pinned iterator is a
 * native resource and was previously leaked — it is now closed via
 * try-with-resources.
 *
 * @throws IOException if RocksDB fails or a replacement key has the wrong size
 */
@Override
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
	var snapshot = db.getSnapshot();
	try {
		try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
				RocksIterator iter = db.newIterator(cfh, readOptions);
				CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) {
			// Pass 1: schedule deletion of every existing key.
			iter.seekToFirst();
			while (iter.isValid()) {
				writeBatch.delete(cfh, iter.key());
				iter.next();
			}
			// Pass 2: ask the consumer for the replacement of each triple.
			iter.seekToFirst();
			while (iter.isValid()) {
				var combinedKey = iter.key();
				var key1 = getKey1(combinedKey);
				var key2 = getKey2(combinedKey);
				var result = consumer.applyCancellable(key1, key2, iter.value());
				if (result.getValue().getLeft().length != key1Size) {
					throw new IndexOutOfBoundsException(result.getValue().getLeft().length, key1Size, key1Size);
				}
				if (result.getValue().getMiddle().length != key2Size) {
					throw new IndexOutOfBoundsException(result.getValue().getMiddle().length, key2Size, key2Size);
				}
				writeBatch.put(cfh, getCombinedKey(result.getValue().getLeft(), result.getValue().getMiddle()), result.getValue().getRight());
				if (result.isCancelled()) {
					// Cancels and discards the write batch
					writeBatch.clear();
					return ConsumerResult.cancelNext();
				}
				iter.next();
			}
			writeBatch.writeToDbAndClose();
			return ConsumerResult.result();
		}
	} catch (RocksDBException ex) {
		throw new IOException(ex);
	} finally {
		db.releaseSnapshot(snapshot);
		snapshot.close();
	}
}
//todo: implement parallel execution
//todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
/**
 * Replaces each key1 sub-map with the map returned by the consumer. Existing
 * keys are first scheduled for deletion, then consecutive combined keys are
 * grouped by key1 (groups are contiguous because keys iterate in sorted
 * order) and each group is handed to {@code replaceAll_}. The batch is
 * committed on success or discarded on cancellation.
 * Fix: the {@code ReadOptions} created for the snapshot-pinned iterator was
 * previously leaked — it is now closed via try-with-resources.
 */
@Override
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
		throws IOException {
	try {
		var snapshot = db.getSnapshot();
		try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
				RocksIterator iter = db.newIterator(cfh, readOptions);
				CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) {
			// Pass 1: schedule deletion of every existing key.
			iter.seekToFirst();
			while (iter.isValid()) {
				writeBatch.delete(cfh, iter.key());
				iter.next();
			}
			// Pass 2: group by key1 and replace each group.
			iter.seekToFirst();
			byte[] currentKey1 = null;
			// only append or iterate on this object! byte[].equals() and hash is not trustworthy!
			ObjectArrayList<byte[]> key2Keys = null;
			// only append or iterate on this object! byte[].equals() and hash is not trustworthy!
			ObjectArrayList<byte[]> key2Values = null;
			while (iter.isValid()) {
				var combinedKey = iter.key();
				var key1 = getKey1(combinedKey);
				if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) {
					// key1 changed: flush the accumulated group.
					if (currentKey1 != null && !key2Values.isEmpty()) {
						var result = replaceAll_(writeBatch,
								currentKey1,
								key2Keys.toArray(byte[][]::new),
								key2Values.toArray(byte[][]::new),
								consumer
						);
						if (result.isCancelled()) {
							// Cancels and discards the write batch
							writeBatch.clear();
							return ConsumerResult.cancelNext();
						}
					}
					currentKey1 = key1;
					key2Keys = new ObjectArrayList<>();
					key2Values = new ObjectArrayList<>();
				}
				key2Keys.add(getKey2(combinedKey));
				key2Values.add(iter.value());
				iter.next();
			}
			// Flush the trailing group.
			if (currentKey1 != null && !key2Values.isEmpty()) {
				var result = replaceAll_(writeBatch,
						currentKey1,
						key2Keys.toArray(byte[][]::new),
						key2Values.toArray(byte[][]::new),
						consumer
				);
				if (result.isCancelled()) {
					// Cancels and discards the write batch
					writeBatch.clear();
					return ConsumerResult.cancelNext();
				}
			}
			writeBatch.writeToDbAndClose();
			return ConsumerResult.result();
		} finally {
			db.releaseSnapshot(snapshot);
			snapshot.close();
		}
	} catch (RocksDBException exception) {
		throw new IOException(exception);
	}
}
/**
 * Writes into the batch the replacement for one key1 sub-map: the consumer
 * receives the previous (key2 -> value) map and returns the new key1 plus the
 * new map, which is written entry by entry.
 * Fixes: (1) the consumer's cancellation flag is fixed once it returns, so it
 * is now checked once up front — previously it was only checked inside the
 * entry loop, silently ignoring cancellation when the replacement map was
 * empty; (2) the replacement key1 returned by the consumer was validated but
 * then discarded in favor of the old key1 — it is now actually used.
 */
private ConsumerResult replaceAll_(WriteBatchInterface writeBatch,
		byte[] key1,
		byte[][] key2Keys,
		byte[][] key2Values,
		CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
		throws RocksDBException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	var previousValues = UnmodifiableMap.of(key2Keys, key2Values);
	var result = consumer.applyCancellable(key1, previousValues);
	var resultKey1 = result.getValue().getKey();
	if (resultKey1.length != key1Size) {
		throw new IndexOutOfBoundsException(resultKey1.length, key1Size, key1Size);
	}
	if (result.isCancelled()) {
		// Cancels and discards the write batch
		writeBatch.clear();
		return ConsumerResult.cancelNext();
	}
	var resultValues = result.getValue().getValue();
	var mapIterator = resultValues.fastIterator();
	while (mapIterator.hasNext()) {
		var mapEntry = mapIterator.next();
		var key2 = mapEntry.getKey();
		if (key2.data.length != key2Size) {
			throw new IndexOutOfBoundsException(key2.data.length, key2Size, key2Size);
		}
		var value2 = mapEntry.getValue();
		// Store the replacement entries under the key1 returned by the consumer.
		writeBatch.put(cfh, getCombinedKey(resultKey1, key2.data), value2);
	}
	return ConsumerResult.result();
}
//todo: implement parallel execution
//todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
/**
 * Replaces every (key2, value) entry under {@code key1} with the entry
 * returned by the consumer. The existing entries of key1's sub-map are first
 * scheduled for deletion, then the replacements are written; the batch is
 * committed on success or discarded on cancellation.
 * Fixes: (1) the replacement entry was previously written under the bare
 * key2 instead of the combined (key1, key2) key, corrupting the key layout —
 * it is now recombined with {@code getCombinedKey} like the other replaceAll
 * overloads; (2) the {@code ReadOptions} created for the snapshot-pinned
 * iterator was leaked — it is now closed via try-with-resources.
 */
@Override
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	try {
		var snapshot = db.getSnapshot();
		try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
				RocksIterator iter = db.newIterator(cfh, readOptions);
				CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) {
			// Pass 1: schedule deletion of every entry of key1's sub-map.
			iter.seek(getStartSeekKey(key1));
			while (iter.isValid()) {
				byte[] combinedKey = iter.key();
				if (!isSubKey(key1, combinedKey)) {
					// The key is outside of key1: exit from the iteration
					break;
				}
				writeBatch.delete(cfh, combinedKey);
				iter.next();
			}
			// Pass 2: write the replacement entries.
			iter.seek(getStartSeekKey(key1));
			while (iter.isValid()) {
				byte[] combinedKey = iter.key();
				if (!isSubKey(key1, combinedKey)) {
					// The key is outside of key1: exit from the iteration
					break;
				}
				byte[] key2 = getKey2(combinedKey);
				byte[] value2 = iter.value();
				var result = consumer.applyCancellable(key2, value2);
				if (result.getValue().getKey().length != key2Size) {
					throw new IndexOutOfBoundsException(result.getValue().getKey().length, key2Size, key2Size);
				}
				// Recombine with key1: the replacement must stay inside key1's sub-map.
				writeBatch.put(cfh, getCombinedKey(key1, result.getValue().getKey()), result.getValue().getValue());
				if (result.isCancelled()) {
					// Cancels and discards the write batch
					writeBatch.clear();
					return ConsumerResult.cancelNext();
				}
				iter.next();
			}
			writeBatch.writeToDbAndClose();
			return ConsumerResult.result();
		} finally {
			db.releaseSnapshot(snapshot);
			snapshot.close();
		}
	} catch (RocksDBException e) {
		throw new IOException(e);
	}
}
// This method is exactly the same of LLLocalDictionary. Remember to keep the code equal
/**
 * Deletes every entry in this column family, then deletes the backing SST files for the
 * removed ranges, compacts, and flushes. Throws IllegalStateException if the column family
 * is not empty afterwards.
 */
@Override
public void clear() throws IOException {
	try {
		List<byte[]> ranges = new ArrayList<>();
		byte[] firstKey = null;
		byte[] lastKey = null;
		boolean empty = false;
		// Loop until a scan finds the column family empty: deleteRange's end bound is
		// exclusive, so each round may leave the last key (deleted separately below),
		// and concurrent writers may add keys between rounds.
		while (!empty) {
			// retrieve the range extremities
			try (RocksIterator iter = db.newIterator(cfh)) {
				iter.seekToFirst();
				if (iter.isValid()) {
					firstKey = iter.key();
					iter.seekToLast();
					lastKey = iter.key();
					// Remember the extremities so the file-level deletion below can use them
					ranges.add(firstKey);
					ranges.add(lastKey);
				} else {
					empty = true;
				}
			}
			if (!empty) {
				if (Arrays.equals(firstKey, lastKey)) {
					// Delete single key
					db.delete(cfh, lastKey);
				} else {
					// Delete all
					db.deleteRange(cfh, firstKey, lastKey);
					// Delete the end because it's not included in the deleteRange domain
					db.delete(cfh, lastKey);
				}
			}
		}
		// Delete files related
		db.deleteFilesInRanges(cfh, ranges, true);
		// Compact range
		db.compactRange(cfh);
		// Persist everything before verifying emptiness
		db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
		db.flushWal(true);
		var finalSize = exactSize(null);
		if (finalSize != 0) {
			throw new IllegalStateException("The dictionary is not empty after calling clear()");
		}
	} catch (RocksDBException e) {
		throw new IOException(e);
	}
}
/**
 * Deletes every entry under {@code key1}.
 *
 * @param key1 fixed-size first-level key (must be exactly {@code key1Size} bytes)
 * @param resultType PREVIOUS_VALUE returns the removed (key2 -> value) map; VALUE_CHANGED
 *        returns a non-empty marker map iff something was removed; VOID returns empty
 * @return the previous contents as requested by {@code resultType}
 * @throws IOException wrapping any {@link RocksDBException}
 */
@Override
public Optional<UnmodifiableIterableMap<byte[], byte[]>> clear(byte[] key1, LLDictionaryResultType resultType)
		throws IOException {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	try {
		Optional<UnmodifiableIterableMap<byte[], byte[]>> result;
		switch (resultType) {
			case PREVIOUS_VALUE:
				List<byte[]> keys = new ArrayList<>();
				List<byte[]> values = new ArrayList<>();
				try (RocksIterator iter = db.newIterator(cfh)) {
					iter.seek(getStartSeekKey(key1));
					while (iter.isValid()) {
						var combinedKey = iter.key();
						if (!isSubKey(key1, combinedKey)) {
							break;
						}
						keys.add(getKey2(combinedKey));
						values.add(iter.value());
						// Fix: advance the iterator. Without this the loop re-reads the same
						// entry forever whenever key1 has at least one entry.
						iter.next();
					}
				}
				result = Optional.of(UnmodifiableIterableMap.of(keys.toArray(byte[][]::new), values.toArray(byte[][]::new)));
				break;
			case VALUE_CHANGED:
				// Only report whether anything existed, without materializing the data
				if (isEmpty(null, key1)) {
					result = Optional.empty();
				} else {
					result = Optional.of(UnmodifiableIterableMap.of(NO_DATA_MAP, NO_DATA_MAP));
				}
				break;
			case VOID:
			default:
				result = Optional.empty();
				break;
		}
		db.deleteRange(cfh, getStartSeekKey(key1), getEndSeekKey(key1));
		return result;
	} catch (RocksDBException ex) {
		throw new IOException(ex);
	}
}
/**
 * Returns the number of key1 groups, either estimated (fast) or by an exact scan.
 */
@Override
public long size(@Nullable LLSnapshot snapshot, boolean fast) {
	if (fast) {
		return fastSize(snapshot);
	}
	return exactSize(snapshot);
}
/**
 * Estimated size via the RocksDB key-count property; falls back to an exact scan when a
 * snapshot is requested (the estimate property ignores snapshots). Returns 0 and prints
 * the stack trace if the property read fails.
 */
public long fastSize(@Nullable LLSnapshot snapshot) {
	if (snapshot != null) {
		return this.exactSize(snapshot);
	}
	try {
		return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
	} catch (RocksDBException ex) {
		ex.printStackTrace();
		return 0;
	}
}
/**
 * Exact count of distinct key1 groups: scans all combined keys and increments only when
 * the key1 prefix changes.
 */
public long exactSize(@Nullable LLSnapshot snapshot) {
	long groups = 0;
	byte[] lastKey1 = null;
	try (RocksIterator it = db.newIterator(cfh, resolveSnapshot(snapshot))) {
		for (it.seekToFirst(); it.isValid(); it.next()) {
			byte[] combined = it.key();
			if (!isSubKey(lastKey1, combined)) {
				// Entered a new key1 group
				groups++;
				lastKey1 = getKey1(combined);
			}
		}
		return groups;
	}
}
/**
 * Exact count of the entries stored under {@code key1}.
 *
 * @param key1 fixed-size first-level key (must be exactly {@code key1Size} bytes)
 * @throws IndexOutOfBoundsException if {@code key1} has the wrong size
 */
@Override
public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) {
	if (key1.length != key1Size) {
		throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
	}
	long entries = 0;
	try (RocksIterator it = db.newIterator(cfh, resolveSnapshot(snapshot))) {
		for (it.seek(getStartSeekKey(key1)); it.isValid(); it.next()) {
			if (!isSubKey(key1, it.key())) {
				// Left the key1 subspace: stop counting
				break;
			}
			entries++;
		}
	}
	return entries;
}
}

View File

@ -2,17 +2,18 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -27,9 +28,12 @@ import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.VariableWrapper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
@ -40,6 +44,7 @@ public class LLLocalDictionary implements LLDictionary {
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
private static final byte[] FIRST_KEY = new byte[]{};
private static final byte[] NO_DATA = new byte[0];
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
private static final List<byte[]> EMPTY_UNMODIFIABLE_LIST = List.of();
@ -82,31 +87,61 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
try {
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono
.fromCallable(() -> {
Holder<byte[]> data = new Holder<>();
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
if (data.getValue() != null) {
return Optional.of(data.getValue());
return data.getValue();
} else {
byte[] value = db.get(cfh, resolveSnapshot(snapshot), key);
return Optional.ofNullable(value);
return db.get(cfh, resolveSnapshot(snapshot), key);
}
} else {
return Optional.empty();
}
} catch (RocksDBException e) {
throw new IOException(e);
return null;
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
return contains_(snapshot, key);
public Mono<Boolean> isEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
if (range.isSingle()) {
return containsKey(snapshot, range.getSingle()).map(contains -> !contains);
} else {
return containsRange(snapshot, range).map(contains -> !contains);
}
}
private boolean contains_(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
try {
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
}
if (!iter.isValid()) {
return false;
}
if (range.hasMax()) {
byte[] key1 = iter.key();
return Arrays.compareUnsigned(key1, range.getMax()) <= 0;
} else {
return true;
}
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono
.fromCallable(() -> {
int size = RocksDB.NOT_FOUND;
Holder<byte[]> data = new Holder<>();
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
@ -117,67 +152,141 @@ public class LLLocalDictionary implements LLDictionary {
}
}
return size != RocksDB.NOT_FOUND;
} catch (RocksDBException e) {
throw new IOException(e);
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public Optional<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) throws IOException {
try {
byte[] response = null;
public Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) {
Mono<byte[]> response = null;
switch (resultType) {
case VALUE_CHANGED:
response = LLUtils.booleanToResponse(!contains_(null, key));
response = containsKey(null, key).single().map(LLUtils::booleanToResponse);
break;
case PREVIOUS_VALUE:
response = Mono
.fromCallable(() -> {
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, data)) {
if (data.getValue() != null) {
response = data.getValue();
return data.getValue();
} else {
response = db.get(cfh, key);
return db.get(cfh, key);
}
} else {
response = null;
return null;
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
break;
case VOID:
response = Mono.empty();
break;
}
return Mono
.fromCallable(() -> {
db.put(cfh, key, value);
return Optional.ofNullable(response);
} catch (RocksDBException e) {
throw new IOException(e);
}
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic())
.then(response);
}
@Override
public void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType, Consumer<byte[]> responsesConsumer)
throws IOException {
if (key.length == value.length) {
List<byte[]> responses;
try (WriteBatch writeBatch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
if (resultType == LLDictionaryResultType.VOID) {
responses = EMPTY_UNMODIFIABLE_LIST;
public Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType) {
Mono<byte[]> response = null;
switch (resultType) {
case VALUE_CHANGED:
response = containsKey(null, key).single().map(LLUtils::booleanToResponse);
break;
case PREVIOUS_VALUE:
response = Mono
.fromCallable(() -> {
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, data)) {
if (data.getValue() != null) {
return data.getValue();
} else {
responses = db.multiGetAsList(newCfhList(cfh, key.length), Arrays.asList(key));
}
for (int i = 0; i < key.length; i++) {
writeBatch.put(cfh, key[i], value[i]);
}
db.write(BATCH_WRITE_OPTIONS, writeBatch);
} catch (RocksDBException e) {
throw new IOException(e);
}
for (byte[] response : responses) {
responsesConsumer.accept(response);
return db.get(cfh, key);
}
} else {
throw new IOException("Wrong parameters count");
return null;
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
break;
case VOID:
response = Mono.empty();
break;
}
return Mono
.fromCallable(() -> {
db.delete(cfh, key);
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic())
.then(response);
}
@Override
public Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys) {
return keys.flatMap(key -> this.get(snapshot, key).map(value -> Map.entry(key, value)));
}
@Override
public Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues) {
return Mono
.fromCallable(() -> new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
))
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(writeBatch -> entries
.flatMap(newEntry -> Mono
.defer(() -> {
if (getOldValues) {
return get(null, newEntry.getKey());
} else {
return Mono.empty();
}
})
.concatWith(Mono
.<byte[]>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
)
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue))
)
.concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.writeToDbAndClose();
writeBatch.close();
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
)
.doFinally(signalType -> {
synchronized (writeBatch) {
writeBatch.close();
}
})
)
.onErrorMap(IOException::new);
}
private static List<ColumnFamilyHandle> newCfhList(ColumnFamilyHandle cfh, int size) {
@ -189,107 +298,156 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Optional<byte[]> remove(byte[] key, LLDictionaryResultType resultType) throws IOException {
try {
byte[] response = null;
switch (resultType) {
case VALUE_CHANGED:
response = LLUtils.booleanToResponse(contains_(null, key));
break;
case PREVIOUS_VALUE:
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, data)) {
if (data.getValue() != null) {
response = data.getValue();
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin());
} else {
response = db.get(cfh, key);
return getRangeMulti(snapshot, range);
}
}
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
var iter = db.newIterator(cfh, resolveSnapshot(snapshot));
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
response = null;
iter.seekToFirst();
}
break;
}
db.delete(cfh, key);
return Optional.ofNullable(response);
} catch (RocksDBException e) {
throw new IOException(e);
}
}
//todo: implement parallel forEach
return iter;
})
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(rocksIterator -> Flux
.<Entry<byte[], byte[]>>fromIterable(() -> {
VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
VariableWrapper<byte[]> nextValue = new VariableWrapper<>(null);
return new Iterator<>() {
@Override
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer) {
try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
iter.seekToFirst();
while (iter.isValid()) {
if (consumer.acceptCancellable(iter.key(), iter.value()).isCancelled()) {
return ConsumerResult.cancelNext();
public boolean hasNext() {
assert nextKey.var == null;
assert nextValue.var == null;
if (!rocksIterator.isValid()) {
nextKey.var = null;
nextValue.var = null;
return false;
}
iter.next();
var key = rocksIterator.key();
var value = rocksIterator.value();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
nextKey.var = null;
nextValue.var = null;
return false;
}
}
return ConsumerResult.result();
nextKey.var = key;
nextValue.var = value;
return true;
}
//todo: implement parallel replace
@Override
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
try {
try (var snapshot = replaceKeys ? db.getSnapshot() : null) {
try (RocksIterator iter = db.newIterator(cfh, getReadOptions(snapshot));
CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) {
iter.seekToFirst();
if (replaceKeys) {
while (iter.isValid()) {
writeBatch.delete(cfh, iter.key());
iter.next();
public Entry<byte[], byte[]> next() {
var key = nextKey.var;
var val = nextValue.var;
assert key != null;
assert val != null;
nextKey.var = null;
nextValue.var = null;
return Map.entry(key, val);
}
};
})
.doFinally(signalType -> rocksIterator.close())
.subscribeOn(Schedulers.boundedElastic())
);
}
iter.seekToFirst();
while (iter.isValid()) {
var result = consumer.applyCancellable(iter.key(), iter.value());
boolean keyDiffers = !Arrays.equals(iter.key(), result.getValue().getKey());
if (!replaceKeys && keyDiffers) {
throw new IOException("Tried to replace a key");
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
return this
.get(snapshot, key)
.map(value -> Map.entry(key, value))
.flux();
}
// put if changed or if keys can be swapped/replaced
if (replaceKeys || !Arrays.equals(iter.value(), result.getValue().getValue())) {
writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue());
@Override
public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
Flux<Entry<byte[], byte[]>> entries,
boolean getOldValues) {
if (range.isAll()) {
return clear().thenMany(Flux.empty());
} else {
return Mono
.fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS))
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(writeBatch -> Mono
.fromCallable(() -> {
synchronized (writeBatch) {
if (range.hasMin() && range.hasMax()) {
writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
writeBatch.delete(cfh, range.getMax());
} else if (range.hasMax()) {
writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
writeBatch.delete(cfh, range.getMax());
} else {
try (var it = db.newIterator(cfh, getReadOptions(null))) {
it.seekToLast();
if (it.isValid()) {
writeBatch.deleteRange(cfh, range.getMin(), it.key());
writeBatch.delete(cfh, it.key());
}
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
iter.next();
}
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.thenMany(entries)
.flatMap(newEntry -> Mono
.defer(() -> {
if (getOldValues) {
return get(null, newEntry.getKey());
} else {
return Mono.empty();
}
})
.concatWith(Mono
.<byte[]>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
)
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue))
)
.concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.writeToDbAndClose();
return ConsumerResult.result();
} finally {
db.releaseSnapshot(snapshot);
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
)
.doFinally(signalType -> {
synchronized (writeBatch) {
writeBatch.close();
}
} catch (RocksDBException e) {
throw new IOException(e);
})
)
.onErrorMap(IOException::new);
}
}
// This method is exactly the same of LLLocalDictionary. Remember to keep the code equal
@Override
public void clear() throws IOException {
try (RocksIterator iter = db.newIterator(cfh);
CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) {
public Mono<Void> clear() {
return Mono
.<Void>fromCallable(() -> {
try (RocksIterator iter = db.newIterator(cfh); CappedWriteBatch writeBatch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
iter.seekToFirst();
@ -311,17 +469,54 @@ public class LLLocalDictionary implements LLDictionary {
if (finalSize != 0) {
throw new IllegalStateException("The dictionary is not empty after calling clear()");
}
} catch (RocksDBException e) {
throw new IOException(e);
}
return null;
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
return fast ? fastSize(snapshot) : exactSize(snapshot);
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
return Mono
.defer(() -> {
if (range.isAll()) {
return Mono
.fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
} else {
return Mono
.fromCallable(() -> {
try (var iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
}
long i = 0;
while (iter.isValid()) {
if (range.hasMax()) {
byte[] key1 = iter.key();
if (Arrays.compareUnsigned(key1, range.getMax()) > 0) {
break;
}
}
public long fastSize(@Nullable LLSnapshot snapshot) {
iter.next();
i++;
}
return i;
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
});
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) {
var rocksdbSnapshot = resolveSnapshot(snapshot);
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
try {
@ -344,7 +539,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
public long exactSize(@Nullable LLSnapshot snapshot) {
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
iter.seekToFirst();
@ -357,29 +552,48 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public boolean isEmpty(@Nullable LLSnapshot snapshot) {
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
if (iter.isValid()) {
return false;
}
}
if (!iter.isValid()) {
return true;
}
return range.hasMax() && Arrays.compareUnsigned(iter.key(), range.getMax()) > 0;
}
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public Optional<Entry<byte[], byte[]>> removeOne() throws IOException {
public Mono<Entry<byte[], byte[]>> removeOne(LLRange range) {
return Mono
.fromCallable(() -> {
try (RocksIterator iter = db.newIterator(cfh)) {
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
if (iter.isValid()) {
}
if (!iter.isValid()) {
return null;
}
if (range.hasMax() && Arrays.compareUnsigned(iter.key(), range.getMax()) > 0) {
return null;
}
byte[] key = iter.key();
byte[] value = iter.value();
db.delete(cfh, key);
return Optional.of(Map.entry(key, value));
return Map.entry(key, value);
}
} catch (RocksDBException e) {
throw new IOException(e);
}
return Optional.empty();
})
.onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic());
}
}

View File

@ -1,5 +1,10 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -31,12 +36,6 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WALRecoveryMode;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@ -54,6 +53,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
@SuppressWarnings("CommentedOutCode")
public LLLocalKeyValueDatabase(String name, Path path, List<Column> columns, List<ColumnFamilyHandle> handles,
boolean crashIfWalError, boolean lowMemory) throws IOException {
Options options = openRocksDb(path, crashIfWalError, lowMemory);
@ -139,6 +139,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// end force flush
}
@SuppressWarnings("CommentedOutCode")
private static Options openRocksDb(Path path, boolean crashIfWalError, boolean lowMemory)
throws IOException {
// Get databases directory path
@ -243,8 +244,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
var handles = new LinkedList<ColumnFamilyHandle>();
/**
* SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
/*
SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
*/
//var dbOptionsFastLoadSlowEdit = new DBOptions(options.setSkipStatsUpdateOnDbOpen(true));
@ -270,8 +271,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
descriptorsToCreate
.removeIf((cf) -> Arrays.equals(cf.getName(), DEFAULT_COLUMN_FAMILY.getName()));
/**
* SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
/*
SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
*/
//var dbOptionsFastLoadSlowEdit = options.setSkipStatsUpdateOnDbOpen(true);
@ -310,17 +311,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
);
}
@Override
public LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) {
return new LLLocalDeepDictionary(db,
handles.get(Column.special(Column.toString(columnName))),
name,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
keySize,
key2Size
);
}
@Override
public long getProperty(String propertyName) throws IOException {
try {
@ -360,6 +350,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
/**
* Call this method ONLY AFTER flushing completely a db and closing it!
*/
@SuppressWarnings("unused")
private void deleteUnusedOldLogFiles() {
Path basePath = dbPath;
try {

View File

@ -1,162 +0,0 @@
package it.cavallium.dbengine.database.structures;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.CancellableTriFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableMap;
public class LLDeepMap implements LLKeyValueDatabaseStructure {
private final LLDeepDictionary dictionary;
public LLDeepMap(LLDeepDictionary dictionary) {
this.dictionary = dictionary;
}
public UnmodifiableIterableMap<byte[], byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
return dictionary.get(snapshot, key);
}
public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException {
return dictionary.get(snapshot, key1, key2);
}
public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) {
return dictionary.isEmpty(snapshot, key1);
}
public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException {
return dictionary.contains(snapshot, key1, key2);
}
/**
* Note: this will remove previous elements because it replaces the entire map of key
*/
public void put(byte[] key1, UnmodifiableIterableMap<byte[], byte[]> value) throws IOException {
dictionary.put(key1, value);
}
public Optional<byte[]> put(byte[] key1, byte[] key2, byte[] value, LLDeepMapResultType resultType) throws IOException {
return dictionary.put(key1, key2, value, resultType.getDictionaryResultType());
}
public void putMulti(byte[][] keys1, UnmodifiableIterableMap<byte[], byte[]>[] values) throws IOException {
dictionary.putMulti(keys1, values);
}
public void putMulti(byte[] key1, byte[][] keys2, byte[][] values, LLDeepMapResultType resultType, Consumer<byte[]> responses) throws IOException {
dictionary.putMulti(key1, keys2, values, resultType.getDictionaryResultType(), responses);
}
public void putMulti(byte[][] keys1, byte[][] keys2, byte[][] values, LLDeepMapResultType resultType, Consumer<byte[]> responses) throws IOException {
dictionary.putMulti(keys1, keys2, values, resultType.getDictionaryResultType(), responses);
}
public void clear() throws IOException {
dictionary.clear();
}
public Optional<UnmodifiableIterableMap<byte[], byte[]>> clear(byte[] key1, LLDeepMapResultType resultType) throws IOException {
return dictionary.clear(key1, resultType.getDictionaryResultType());
}
public Optional<byte[]> remove(byte[] key1, byte[] key2, LLDeepMapResultType resultType) throws IOException {
return dictionary.remove(key1, key2, resultType.getDictionaryResultType());
}
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer) {
return dictionary.forEach(snapshot, parallelism, consumer);
}
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer<byte[], byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, key1, consumer);
}
public void replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, consumer);
}
public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, key1, consumer);
}
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer<byte[], byte[], byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, consumer);
}
public void replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, consumer);
}
public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
return dictionary.size(snapshot, fast);
}
public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) {
return dictionary.exactSize(snapshot, key1);
}
@Override
public String getDatabaseName() {
return dictionary.getDatabaseName();
}
public enum LLDeepMapResultType {
VOID,
VALUE_CHANGED,
PREVIOUS_VALUE;
public LLDictionaryResultType getDictionaryResultType() {
switch (this) {
case VOID:
return LLDictionaryResultType.VOID;
case VALUE_CHANGED:
return LLDictionaryResultType.VALUE_CHANGED;
case PREVIOUS_VALUE:
return LLDictionaryResultType.PREVIOUS_VALUE;
}
return LLDictionaryResultType.VOID;
}
}
/**
 * Returns a short diagnostic representation, e.g. {@code LLDeepMap[dictionary=...]}.
 */
@Override
public String toString() {
return new StringJoiner(", ", LLDeepMap.class.getSimpleName() + "[", "]")
.add("dictionary=" + dictionary)
.toString();
}
/**
 * Two instances are equal when they are of the exact same class and wrap
 * equal dictionaries.
 */
@Override
public boolean equals(Object o) {
	if (o == this) {
		return true;
	}
	if (o == null || o.getClass() != getClass()) {
		return false;
	}
	LLDeepMap other = (LLDeepMap) o;
	return Objects.equals(this.dictionary, other.dictionary);
}
/**
 * Hash code derived solely from the wrapped dictionary, consistent with {@link #equals(Object)}.
 */
@Override
public int hashCode() {
return Objects.hash(dictionary);
}
}

View File

@ -1,210 +0,0 @@
package it.cavallium.dbengine.database.structures;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectSets.UnmodifiableSet;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableConsumer;
import org.warp.commonutils.functional.CancellableFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableIterableSet;
import org.warp.commonutils.type.UnmodifiableMap;
/**
 * A set-of-sets structure backed by an {@link LLDeepDictionary}: each outer key
 * ({@code key1}) maps to a set of elements, and membership is encoded by key
 * presence — every element is stored as a dictionary key whose value is the
 * shared empty byte array. Keys and values must have a fixed size.
 */
public class LLFixedDeepSet implements LLKeyValueDatabaseStructure {

	/** Placeholder value stored for every element: membership is encoded by key presence. */
	private static final byte[] EMPTY_VALUE = new byte[0];

	private final LLDeepDictionary dictionary;

	public LLFixedDeepSet(LLDeepDictionary dictionary) {
		this.dictionary = dictionary;
	}

	/** Returns an array of {@code length} entries, each pointing at the shared empty value. */
	private byte[][] generateEmptyArray(int length) {
		byte[][] data = new byte[length][];
		for (int i = 0; i < length; i++) {
			data[i] = EMPTY_VALUE;
		}
		return data;
	}

	/** Returns the element set stored under {@code key1} (the keys of the inner map). */
	public UnmodifiableIterableSet<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key1) throws IOException {
		return dictionary.get(snapshot, key1).toUnmodifiableIterableKeysSet(byte[][]::new);
	}

	/** Returns whether {@code value} is a member of the set stored under {@code key1}. */
	public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] value) throws IOException {
		return dictionary.contains(snapshot, key1, value);
	}

	/** Returns whether the set stored under {@code key1} has no elements. */
	public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) {
		return dictionary.isEmpty(snapshot, key1);
	}

	/**
	 * Adds {@code value} to the set stored under {@code key1}.
	 *
	 * @return when {@code resultType} is {@link LLDeepSetItemResultType#VALUE_CHANGED},
	 *         whether the set changed; always {@code false} otherwise
	 */
	public boolean add(byte[] key1, byte[] value, LLDeepSetItemResultType resultType) throws IOException {
		Optional<byte[]> response = dictionary.put(key1, value, EMPTY_VALUE, resultType.getDictionaryResultType());
		if (resultType == LLDeepSetItemResultType.VALUE_CHANGED) {
			return LLUtils.responseToBoolean(response.orElseThrow());
		}
		return false;
	}

	/** Adds all {@code values} to the set stored under {@code key1}, discarding responses. */
	public void addMulti(byte[] key1, byte[][] values) throws IOException {
		dictionary.putMulti(key1, values, generateEmptyArray(values.length), LLDictionaryResultType.VOID, (x) -> {});
	}

	/**
	 * Note: this will remove previous elements because it replaces the entire set
	 */
	public void put(byte[] key1, UnmodifiableIterableSet<byte[]> values) throws IOException {
		dictionary.put(key1, values.toUnmodifiableIterableMapSetValues(generateEmptyArray(values.size())));
	}

	/**
	 * Replaces the set stored under each key of {@code keys1} with the corresponding
	 * element of {@code values} (previous elements are removed).
	 */
	// Generic array creation is not expressible in Java; every element written is an
	// UnmodifiableIterableMap<byte[], byte[]>, so the unchecked use is safe.
	@SuppressWarnings("unchecked")
	public void putMulti(byte[][] keys1, UnmodifiableIterableSet<byte[]>[] values) throws IOException {
		UnmodifiableIterableMap<byte[], byte[]>[] fixedValues = new UnmodifiableIterableMap[values.length];
		for (int i = 0; i < values.length; i++) {
			fixedValues[i] = values[i].toUnmodifiableIterableMapSetValues(generateEmptyArray(values[i].size()));
		}
		dictionary.putMulti(keys1, fixedValues);
	}

	/** Removes every set from the structure. */
	public void clear() throws IOException {
		dictionary.clear();
	}

	/**
	 * Clears the set stored under {@code key1}.
	 *
	 * @return the previous elements when {@code resultType} requests them, otherwise empty
	 */
	public Optional<UnmodifiableIterableSet<byte[]>> clear(byte[] key1, LLDeepSetResultType resultType) throws IOException {
		Optional<UnmodifiableIterableMap<byte[], byte[]>> response = dictionary.clear(key1, resultType.getDictionaryResultType());
		if (response.isEmpty()) {
			return Optional.empty();
		} else {
			return Optional.of(response.get().toUnmodifiableIterableKeysSet(byte[][]::new));
		}
	}

	/**
	 * Removes {@code value} from the set stored under {@code key1}.
	 *
	 * @return when {@code resultType} is {@link LLDeepSetItemResultType#VALUE_CHANGED},
	 *         whether the set changed; always {@code false} otherwise
	 */
	public boolean remove(byte[] key1, byte[] value, LLDeepSetItemResultType resultType) throws IOException {
		Optional<byte[]> response = dictionary.remove(key1, value, resultType.getDictionaryResultType());
		if (resultType == LLDeepSetItemResultType.VALUE_CHANGED) {
			return LLUtils.responseToBoolean(response.orElseThrow());
		}
		return false;
	}

	/** Iterates over every (key1 → element set) pair. */
	public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableSet<byte[]>> consumer) {
		return dictionary.forEach(snapshot, parallelism, (key1, entries) -> consumer.acceptCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new)));
	}

	/** Iterates over the elements of the set stored under {@code key1}. */
	public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableConsumer<byte[]> consumer) {
		return dictionary.forEach(snapshot, parallelism, key1, (value, empty) -> consumer.acceptCancellable(value));
	}

	/** Replaces every (key1 → element set) pair by applying {@code consumer}. */
	public void replaceAll(int parallelism, CancellableBiFunction<byte[], UnmodifiableIterableSet<byte[]>, Entry<byte[], UnmodifiableSet<Bytes>>> consumer) throws IOException {
		dictionary.replaceAll(parallelism, true, (key1, entries) -> {
			var result = consumer.applyCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new));
			var resultItems = result.getValue().getValue().toArray(Bytes[]::new);
			// Re-encode the replacement set as an inner map with empty values.
			return result.copyStatusWith(Map.entry(result.getValue().getKey(), UnmodifiableMap.of(resultItems, generateEmptyArray(resultItems.length))));
		});
	}

	/** Replaces every element of the set stored under {@code key1} by applying {@code consumer}. */
	public void replaceAll(int parallelism, byte[] key1, CancellableFunction<byte[], byte[]> consumer) throws IOException {
		dictionary.replaceAll(parallelism, true, key1, (value, empty) -> {
			var changedValue = consumer.applyCancellable(value);
			return changedValue.copyStatusWith(Map.entry(changedValue.getValue(), EMPTY_VALUE));
		});
	}

	/**
	 * Returns the number of elements across all sets.
	 *
	 * @param fast presumably allows an approximate (cheaper) count — TODO confirm against the dictionary implementation
	 */
	public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
		return dictionary.size(snapshot, fast);
	}

	/** Returns the exact number of elements of the set stored under {@code key1}. */
	public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) {
		return dictionary.exactSize(snapshot, key1);
	}

	@Override
	public String getDatabaseName() {
		return dictionary.getDatabaseName();
	}

	/**
	 * Result modes for whole-set operations; each mirrors a constant of
	 * {@link LLDictionaryResultType} with the same name.
	 */
	public enum LLDeepSetResultType {
		VOID,
		VALUE_CHANGED,
		PREVIOUS_VALUE;

		/** Maps this result type onto the matching {@link LLDictionaryResultType} constant. */
		public LLDictionaryResultType getDictionaryResultType() {
			// The constant names correspond one-to-one, so resolve by name.
			return LLDictionaryResultType.valueOf(name());
		}
	}

	/**
	 * Result modes for single-element operations; each mirrors a constant of
	 * {@link LLDictionaryResultType} with the same name.
	 */
	public enum LLDeepSetItemResultType {
		VOID,
		VALUE_CHANGED;

		/** Maps this result type onto the matching {@link LLDictionaryResultType} constant. */
		public LLDictionaryResultType getDictionaryResultType() {
			// The constant names correspond one-to-one, so resolve by name.
			return LLDictionaryResultType.valueOf(name());
		}
	}

	@Override
	public String toString() {
		return new StringJoiner(", ", LLFixedDeepSet.class.getSimpleName() + "[", "]")
				.add("dictionary=" + dictionary)
				.toString();
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		LLFixedDeepSet llMap = (LLFixedDeepSet) o;
		return Objects.equals(dictionary, llMap.dictionary);
	}

	@Override
	public int hashCode() {
		return Objects.hash(dictionary);
	}
}

View File

@ -1,118 +0,0 @@
package it.cavallium.dbengine.database.structures;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
/**
 * A simple key → value map structure that forwards every operation to a
 * wrapped {@link LLDictionary}.
 */
public class LLMap implements LLKeyValueDatabaseStructure {

	private final LLDictionary dictionary;

	public LLMap(LLDictionary dictionary) {
		this.dictionary = dictionary;
	}

	/** Returns the value stored under {@code key}, if any. */
	public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
		return dictionary.get(snapshot, key);
	}

	/** Stores {@code value} under {@code key}; the returned value depends on {@code resultType}. */
	public Optional<byte[]> put(byte[] key, byte[] value, LLMapResultType resultType)
			throws IOException {
		return dictionary.put(key, value, resultType.getDictionaryResultType());
	}

	/**
	 * Stores many key/value pairs at once. An empty response array from the
	 * dictionary is reported to {@code results} as {@link Optional#empty()}.
	 */
	public void putMulti(byte[][] key, byte[][] value, LLMapResultType resultType,
			Consumer<Optional<byte[]>> results) throws IOException {
		dictionary.putMulti(key, value, resultType.getDictionaryResultType(), (response) -> {
			byte[] previousValue = response.length == 0 ? null : response;
			results.accept(Optional.ofNullable(previousValue));
		});
	}

	/** Returns whether {@code key} is present. */
	public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
		return dictionary.contains(snapshot, key);
	}

	/** Removes {@code key}; the returned value depends on {@code resultType}. */
	public Optional<byte[]> remove(byte[] key, LLMapResultType resultType) throws IOException {
		return dictionary.remove(key, resultType.getDictionaryResultType());
	}

	/** Removes every entry. */
	public void clear() throws IOException {
		dictionary.clear();
	}

	/** Returns the number of entries; {@code fast} is forwarded to the dictionary. */
	public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
		return dictionary.size(snapshot, fast);
	}

	/**
	 * The consumer can be called from different threads
	 */
	public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer) {
		return dictionary.forEach(snapshot, parallelism, consumer);
	}

	/**
	 * The consumer can be called from different threads
	 */
	public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
		return dictionary.replaceAll(parallelism, replaceKeys, consumer);
	}

	@Override
	public String getDatabaseName() {
		return dictionary.getDatabaseName();
	}

	/**
	 * Result modes for map operations; each mirrors a constant of
	 * {@link LLDictionaryResultType} with the same name.
	 */
	public enum LLMapResultType {
		VOID,
		VALUE_CHANGED,
		PREVIOUS_VALUE;

		/** Maps this result type onto the matching {@link LLDictionaryResultType} constant. */
		public LLDictionaryResultType getDictionaryResultType() {
			// The constant names correspond one-to-one, so resolve by name.
			return LLDictionaryResultType.valueOf(name());
		}
	}

	@Override
	public String toString() {
		// Equivalent to a single-element StringJoiner(", ", prefix, suffix).
		return LLMap.class.getSimpleName() + "[" + "dictionary=" + dictionary + "]";
	}

	@Override
	public boolean equals(Object o) {
		if (o == this) {
			return true;
		}
		if (o == null || o.getClass() != getClass()) {
			return false;
		}
		LLMap other = (LLMap) o;
		return Objects.equals(this.dictionary, other.dictionary);
	}

	@Override
	public int hashCode() {
		return Objects.hash(dictionary);
	}
}

View File

@ -1,105 +0,0 @@
package it.cavallium.dbengine.database.structures;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableConsumer;
import org.warp.commonutils.functional.CancellableFunction;
import org.warp.commonutils.functional.ConsumerResult;
/**
 * A set structure backed by an {@link LLDictionary}: membership is encoded by
 * key presence, and every key's value is the shared empty byte array.
 */
public class LLSet implements LLKeyValueDatabaseStructure {

	/** Placeholder stored as the value of every member key. */
	private static final byte[] EMPTY_VALUE = new byte[0];

	private final LLDictionary dictionary;

	public LLSet(LLDictionary dictionary) {
		this.dictionary = dictionary;
	}

	@Override
	public String getDatabaseName() {
		return dictionary.getDatabaseName();
	}

	/** Returns an array of {@code length} entries, each pointing at the shared empty value. */
	private byte[][] generateEmptyArray(int length) {
		byte[][] data = new byte[length][];
		int i = 0;
		while (i < length) {
			data[i] = EMPTY_VALUE;
			i++;
		}
		return data;
	}

	/** Returns whether {@code value} is a member of the set. */
	public boolean contains(@Nullable LLSnapshot snapshot, byte[] value) throws IOException {
		return dictionary.contains(snapshot, value);
	}

	/**
	 * Adds {@code value} to the set.
	 *
	 * @return when {@code resultType} is {@link LLSetResultType#VALUE_CHANGED},
	 *         whether the set changed; always {@code false} otherwise
	 */
	public boolean add(byte[] value, LLSetResultType resultType) throws IOException {
		Optional<byte[]> previous = dictionary.put(value, EMPTY_VALUE, resultType.getDictionaryResultType());
		if (resultType == LLSetResultType.VALUE_CHANGED) {
			return LLUtils.responseToBoolean(previous.orElseThrow());
		}
		return false;
	}

	/** Adds all {@code values}, discarding responses. */
	public void addMulti(byte[][] values) throws IOException {
		dictionary.putMulti(values, generateEmptyArray(values.length), LLDictionaryResultType.VOID, (x) -> {});
	}

	/**
	 * Removes {@code value} from the set.
	 *
	 * @return when {@code resultType} is {@link LLSetResultType#VALUE_CHANGED},
	 *         whether the set changed; always {@code false} otherwise
	 */
	public boolean remove(byte[] value, LLSetResultType resultType) throws IOException {
		Optional<byte[]> previous = dictionary.remove(value, resultType.getDictionaryResultType());
		if (resultType == LLSetResultType.VALUE_CHANGED) {
			return LLUtils.responseToBoolean(previous.orElseThrow());
		}
		return false;
	}

	/** Removes every element. */
	public void clearUnsafe() throws IOException {
		dictionary.clear();
	}

	/** Iterates over every element (the dictionary keys; values are ignored). */
	public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableConsumer<byte[]> consumer) {
		return dictionary.forEach(snapshot, parallelism, (element, ignoredValue) -> consumer.acceptCancellable(element));
	}

	/** Replaces every element by applying {@code consumer}. */
	public ConsumerResult replaceAll(int parallelism, CancellableFunction<byte[], byte[]> consumer) throws IOException {
		return dictionary.replaceAll(parallelism, true, (element, ignoredValue) -> {
			var replaced = consumer.applyCancellable(element);
			return replaced.copyStatusWith(Map.entry(replaced.getValue(), ignoredValue));
		});
	}

	/** Returns the number of elements; {@code fast} is forwarded to the dictionary. */
	public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
		return dictionary.size(snapshot, fast);
	}

	/** Returns whether the set has no elements. */
	public boolean isEmptyUnsafe(@Nullable LLSnapshot snapshot) throws IOException {
		return dictionary.isEmpty(snapshot);
	}

	/** Removes and returns one arbitrary element, if any. */
	public Optional<byte[]> removeOneUnsafe() throws IOException {
		return dictionary.removeOne().map(Entry::getKey);
	}

	/**
	 * Result modes for set operations; each mirrors a constant of
	 * {@link LLDictionaryResultType} with the same name.
	 */
	public enum LLSetResultType {
		VOID,
		VALUE_CHANGED;

		/** Maps this result type onto the matching {@link LLDictionaryResultType} constant. */
		public LLDictionaryResultType getDictionaryResultType() {
			// The constant names correspond one-to-one, so resolve by name.
			return LLDictionaryResultType.valueOf(name());
		}
	}
}