Cancellable iterations

This commit is contained in:
Andrea Cavalli 2021-01-17 18:31:25 +01:00
parent bfe6af4088
commit 1418821b48
11 changed files with 213 additions and 123 deletions

View File

@ -163,6 +163,11 @@
<artifactId>lucene-core</artifactId>
<version>8.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-join</artifactId>
<version>8.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>

View File

@ -3,14 +3,15 @@ package it.cavallium.dbengine.database;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.TriConsumer;
import org.warp.commonutils.functional.TriFunction;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.CancellableTriFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableMap;
@ -49,18 +50,18 @@ public interface LLDeepDictionary extends LLKeyValueDatabaseStructure {
Optional<byte[]> remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException;
void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer<byte[], byte[], byte[]> consumer);
ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer<byte[], byte[], byte[]> consumer);
void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer);
ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer);
void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, BiConsumer<byte[], byte[]> consumer);
ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer<byte[], byte[]> consumer);
void replaceAll(int parallelism, boolean replaceKeys, TriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException;
ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException;
void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer) throws IOException;
ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer) throws IOException;
void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException;
ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException;
long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException;

View File

@ -3,11 +3,12 @@ package it.cavallium.dbengine.database;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
@NotAtomic
public interface LLDictionary extends LLKeyValueDatabaseStructure {
@ -27,12 +28,12 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
/**
* This method can call the consumer from different threads in parallel
*/
void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], byte[]> consumer);
ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer);
/**
* This method can call the consumer from different threads in parallel
*/
void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException;
ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException;
void clear() throws IOException;

View File

@ -1,5 +1,9 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.io.IOException;
@ -9,8 +13,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.ImmutableTriple;
@ -27,15 +29,14 @@ import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatchInterface;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.error.IndexOutOfBoundsException;
import org.warp.commonutils.functional.TriConsumer;
import org.warp.commonutils.functional.TriFunction;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.CancellableTriFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableMap;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
@NotAtomic
public class LLLocalDeepDictionary implements LLDeepDictionary {
@ -411,12 +412,12 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
}
@Override
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer<byte[], byte[], byte[]> consumer) {
forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer<byte[], byte[], byte[]> consumer) {
return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
}
//todo: implement parallel execution
private void forEach_(TriConsumer<byte[], byte[], byte[]> consumer, @Nullable Snapshot snapshot, int parallelism) {
private ConsumerResult forEach_(CancellableTriConsumer<byte[], byte[], byte[]> consumer, @Nullable Snapshot snapshot, int parallelism) {
try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
: db.newIterator(cfh))) {
iterator.seekToFirst();
@ -425,20 +426,24 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
var key1 = getKey1(combinedKey);
var key2 = getKey2(combinedKey);
consumer.accept(key1, key2, iterator.value());
var result = consumer.acceptCancellable(key1, key2, iterator.value());
if (result.isCancelled()) {
return ConsumerResult.cancelNext();
}
iterator.next();
}
return ConsumerResult.result();
}
}
@Override
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer) {
forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer) {
return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
}
//todo: implement parallel execution
private void forEach_(BiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer, @Nullable Snapshot snapshot, int parallelism) {
private ConsumerResult forEach_(CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer, @Nullable Snapshot snapshot, int parallelism) {
try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
: db.newIterator(cfh))) {
iterator.seekToFirst();
@ -453,7 +458,10 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) {
if (currentKey1 != null && !key2Values.isEmpty()) {
consumer.accept(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
if (result.isCancelled()) {
return ConsumerResult.cancelNext();
}
}
currentKey1 = key1;
key2Keys = new ArrayList<>();
@ -466,18 +474,22 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
iterator.next();
}
if (currentKey1 != null && !key2Values.isEmpty()) {
consumer.accept(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
if (result.isCancelled()) {
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
}
}
@Override
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, BiConsumer<byte[], byte[]> consumer) {
forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, CancellableBiConsumer<byte[], byte[]> consumer) {
return forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
}
//todo: implement parallel execution
private void forEach_(byte[] key1, BiConsumer<byte[], byte[]> consumer, @Nullable Snapshot snapshot, int parallelism) {
private ConsumerResult forEach_(byte[] key1, CancellableBiConsumer<byte[], byte[]> consumer, @Nullable Snapshot snapshot, int parallelism) {
try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
: db.newIterator(cfh))) {
iterator.seek(getStartSeekKey(key1));
@ -491,17 +503,21 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
byte[] key2 = getKey2(combinedKey);
byte[] value2 = iterator.value();
consumer.accept(key2, value2);
var result = consumer.acceptCancellable(key2, value2);
if (result.isCancelled()) {
return ConsumerResult.cancelNext();
}
iterator.next();
}
return ConsumerResult.result();
}
}
//todo: implement parallel execution
//todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
@Override
public void replaceAll(int parallelism, boolean replaceKeys, TriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
var snapshot = db.getSnapshot();
try {
try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot));
@ -523,20 +539,28 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
var key1 = getKey1(combinedKey);
var key2 = getKey2(combinedKey);
var result = consumer.apply(key1, key2, iter.value());
if (result.getLeft().length != key1Size) {
throw new IndexOutOfBoundsException(result.getLeft().length, key1Size, key1Size);
var result = consumer.applyCancellable(key1, key2, iter.value());
if (result.getValue().getLeft().length != key1Size) {
throw new IndexOutOfBoundsException(result.getValue().getLeft().length, key1Size, key1Size);
}
if (result.getMiddle().length != key2Size) {
throw new IndexOutOfBoundsException(result.getMiddle().length, key2Size, key2Size);
if (result.getValue().getMiddle().length != key2Size) {
throw new IndexOutOfBoundsException(result.getValue().getMiddle().length, key2Size, key2Size);
}
writeBatch.put(cfh, getCombinedKey(result.getLeft(), result.getMiddle()), result.getRight());
writeBatch.put(cfh, getCombinedKey(result.getValue().getLeft(), result.getValue().getMiddle()), result.getValue().getRight());
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
iter.next();
}
writeBatch.writeToDbAndClose();
return ConsumerResult.result();
}
} catch (RocksDBException ex) {
throw new IOException(ex);
@ -549,7 +573,7 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
//todo: implement parallel execution
//todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
@Override
public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
throws IOException {
try {
var snapshot = db.getSnapshot();
@ -578,12 +602,18 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) {
if (currentKey1 != null && !key2Values.isEmpty()) {
replaceAll_(writeBatch,
var result = replaceAll_(writeBatch,
currentKey1,
key2Keys.toArray(byte[][]::new),
key2Values.toArray(byte[][]::new),
consumer
);
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
}
currentKey1 = key1;
key2Keys = new ObjectArrayList<>();
@ -596,15 +626,23 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
iter.next();
}
if (currentKey1 != null && !key2Values.isEmpty()) {
replaceAll_(writeBatch,
var result = replaceAll_(writeBatch,
currentKey1,
key2Keys.toArray(byte[][]::new),
key2Values.toArray(byte[][]::new),
consumer
);
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
}
writeBatch.writeToDbAndClose();
return ConsumerResult.result();
} finally {
db.releaseSnapshot(snapshot);
snapshot.close();
@ -614,23 +652,23 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
}
}
private void replaceAll_(WriteBatchInterface writeBatch,
private ConsumerResult replaceAll_(WriteBatchInterface writeBatch,
byte[] key1,
byte[][] key2Keys,
byte[][] key2Values,
BiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
throws RocksDBException {
if (key1.length != key1Size) {
throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
}
var previousValues = UnmodifiableMap.of(key2Keys, key2Values);
var result = consumer.apply(key1, previousValues);
var result = consumer.applyCancellable(key1, previousValues);
var resultKey1 = result.getKey();
var resultKey1 = result.getValue().getKey();
if (resultKey1.length != key1Size) {
throw new IndexOutOfBoundsException(resultKey1.length, key1Size, key1Size);
}
var resultValues = result.getValue();
var resultValues = result.getValue().getValue();
var mapIterator = resultValues.fastIterator();
while (mapIterator.hasNext()) {
@ -642,13 +680,20 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
var value2 = mapEntry.getValue();
writeBatch.put(cfh, getCombinedKey(key1, key2.data), value2);
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
}
//todo: implement parallel execution
//todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
@Override
public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
if (key1.length != key1Size) {
throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
}
@ -685,17 +730,25 @@ public class LLLocalDeepDictionary implements LLDeepDictionary {
byte[] key2 = getKey2(combinedKey);
byte[] value2 = iter.value();
var result = consumer.apply(key2, value2);
if (result.getKey().length != key2Size) {
throw new IndexOutOfBoundsException(result.getKey().length, key2Size, key2Size);
var result = consumer.applyCancellable(key2, value2);
if (result.getValue().getKey().length != key2Size) {
throw new IndexOutOfBoundsException(result.getValue().getKey().length, key2Size, key2Size);
}
writeBatch.put(cfh, result.getKey(), result.getValue());
writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue());
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
iter.next();
}
writeBatch.writeToDbAndClose();
return ConsumerResult.result();
} finally {
db.releaseSnapshot(snapshot);
snapshot.close();

View File

@ -1,5 +1,9 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -8,8 +12,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
@ -25,10 +27,9 @@ import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
@ -217,19 +218,22 @@ public class LLLocalDictionary implements LLDictionary {
//todo: implement parallel forEach
@Override
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], byte[]> consumer) {
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer) {
try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
iter.seekToFirst();
while (iter.isValid()) {
consumer.accept(iter.key(), iter.value());
if (consumer.acceptCancellable(iter.key(), iter.value()).isCancelled()) {
return ConsumerResult.cancelNext();
}
iter.next();
}
}
return ConsumerResult.result();
}
//todo: implement parallel replace
@Override
public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
try {
try (var snapshot = replaceKeys ? db.getSnapshot() : null) {
try (RocksIterator iter = db.newIterator(cfh, getReadOptions(snapshot));
@ -249,21 +253,29 @@ public class LLLocalDictionary implements LLDictionary {
while (iter.isValid()) {
var result = consumer.apply(iter.key(), iter.value());
boolean keyDiffers = !Arrays.equals(iter.key(), result.getKey());
var result = consumer.applyCancellable(iter.key(), iter.value());
boolean keyDiffers = !Arrays.equals(iter.key(), result.getValue().getKey());
if (!replaceKeys && keyDiffers) {
throw new IOException("Tried to replace a key");
}
// put if changed or if keys can be swapped/replaced
if (replaceKeys || !Arrays.equals(iter.value(), result.getValue())) {
writeBatch.put(cfh, result.getKey(), result.getValue());
if (replaceKeys || !Arrays.equals(iter.value(), result.getValue().getValue())) {
writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue());
}
if (result.isCancelled()) {
// Cancels and discards the write batch
writeBatch.clear();
return ConsumerResult.cancelNext();
}
iter.next();
}
writeBatch.writeToDbAndClose();
return ConsumerResult.result();
} finally {
db.releaseSnapshot(snapshot);
}

View File

@ -24,12 +24,13 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
@NotAtomic
public class LLRemoteDictionary implements LLDictionary {
@ -155,26 +156,33 @@ public class LLRemoteDictionary implements LLDictionary {
}
@Override
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], byte[]> consumer) {
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer) {
try {
var request = DictionaryMethodForEachRequest.newBuilder().setDictionaryHandle(handle);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = blockingStub.dictionaryMethodForEach(request.build());
response.forEachRemaining((entry) -> {
while (response.hasNext()) {
var entry = response.next();
var key = entry.getKey().toByteArray();
var value = entry.getValue().toByteArray();
consumer.accept(key, value);
});
var cancelled = consumer.acceptCancellable(key, value);
if (cancelled.isCancelled()) {
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
} catch (StatusRuntimeException ex) {
throw new IOError(ex);
}
}
@Override
public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
try {
//todo: reimplement remote replaceAll using writeBatch
//todo: implement cancellation during iteration
var response = blockingStub
.dictionaryMethodReplaceAll(DictionaryMethodReplaceAllRequest.newBuilder()
.setDictionaryHandle(handle)
@ -183,18 +191,19 @@ public class LLRemoteDictionary implements LLDictionary {
response.forEachRemaining((entry) -> {
var key = entry.getKey().toByteArray();
var value = entry.getValue().toByteArray();
var singleResponse = consumer.apply(key, value);
var singleResponse = consumer.applyCancellable(key, value);
boolean keyDiffers = false;
if (!Arrays.equals(key, singleResponse.getKey())) {
if (!Arrays.equals(key, singleResponse.getValue().getKey())) {
remove_(key, LLDictionaryResultType.VOID);
keyDiffers = true;
}
// put if changed
if (keyDiffers || !Arrays.equals(value, singleResponse.getValue())) {
put_(singleResponse.getKey(), singleResponse.getValue(), LLDictionaryResultType.VOID);
if (keyDiffers || !Arrays.equals(value, singleResponse.getValue().getValue())) {
put_(singleResponse.getValue().getKey(), singleResponse.getValue().getValue(), LLDictionaryResultType.VOID);
}
});
return ConsumerResult.result();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}

View File

@ -80,6 +80,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.warp.commonutils.functional.ConsumerResult;
public class DbServerFunctions extends CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceImplBase {
@ -562,6 +563,7 @@ public class DbServerFunctions extends CavalliumDBEngineServiceGrpc.CavalliumDBE
response.setKey(ByteString.copyFrom(key));
response.setValue(ByteString.copyFrom(val));
responseObserver.onNext(response.build());
return ConsumerResult.result();
});
responseObserver.onCompleted();
}

View File

@ -1,24 +1,25 @@
package it.cavallium.dbengine.database.structures;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.TriConsumer;
import org.warp.commonutils.functional.TriFunction;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.CancellableTriFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableMap;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSnapshot;
public class LLDeepMap implements LLKeyValueDatabaseStructure {
@ -79,27 +80,27 @@ public class LLDeepMap implements LLKeyValueDatabaseStructure {
return dictionary.remove(key1, key2, resultType.getDictionaryResultType());
}
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer) {
dictionary.forEach(snapshot, parallelism, consumer);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer) {
return dictionary.forEach(snapshot, parallelism, consumer);
}
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, BiConsumer<byte[], byte[]> consumer) {
dictionary.forEach(snapshot, parallelism, key1, consumer);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer<byte[], byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, key1, consumer);
}
public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer) throws IOException {
public void replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], UnmodifiableIterableMap<byte[], byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, consumer);
}
public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, key1, consumer);
}
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer<byte[], byte[], byte[]> consumer) {
dictionary.forEach(snapshot, parallelism, consumer);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer<byte[], byte[], byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, consumer);
}
public void replaceAll(int parallelism, boolean replaceKeys, TriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
public void replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, consumer);
}

View File

@ -12,11 +12,12 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.CancellableConsumer;
import org.warp.commonutils.functional.CancellableFunction;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.type.Bytes;
import org.warp.commonutils.type.UnmodifiableIterableMap;
import org.warp.commonutils.type.UnmodifiableIterableSet;
@ -112,26 +113,26 @@ public class LLFixedDeepSet implements LLKeyValueDatabaseStructure {
return false;
}
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], UnmodifiableIterableSet<byte[]>> consumer) {
dictionary.forEach(snapshot, parallelism, (key1, entries) -> consumer.accept(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new)));
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], UnmodifiableIterableSet<byte[]>> consumer) {
return dictionary.forEach(snapshot, parallelism, (key1, entries) -> consumer.acceptCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new)));
}
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, Consumer<byte[]> consumer) {
dictionary.forEach(snapshot, parallelism, key1, (value, empty) -> consumer.accept(value));
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableConsumer<byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, key1, (value, empty) -> consumer.acceptCancellable(value));
}
public void replaceAll(int parallelism, BiFunction<byte[], UnmodifiableIterableSet<byte[]>, Entry<byte[], UnmodifiableSet<Bytes>>> consumer) throws IOException {
public void replaceAll(int parallelism, CancellableBiFunction<byte[], UnmodifiableIterableSet<byte[]>, Entry<byte[], UnmodifiableSet<Bytes>>> consumer) throws IOException {
dictionary.replaceAll(parallelism, true, (key1, entries) -> {
var result = consumer.apply(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new));
var resultItems = result.getValue().toArray(Bytes[]::new);
return Map.entry(result.getKey(), UnmodifiableMap.of(resultItems, generateEmptyArray(resultItems.length)));
var result = consumer.applyCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new));
var resultItems = result.getValue().getValue().toArray(Bytes[]::new);
return result.copyStatusWith(Map.entry(result.getValue().getKey(), UnmodifiableMap.of(resultItems, generateEmptyArray(resultItems.length))));
});
}
public void replaceAll(int parallelism, byte[] key1, Function<byte[], byte[]> consumer) throws IOException {
public void replaceAll(int parallelism, byte[] key1, CancellableFunction<byte[], byte[]> consumer) throws IOException {
dictionary.replaceAll(parallelism, true, key1, (value, empty) -> {
var changedValue = consumer.apply(value);
return Map.entry(changedValue, EMPTY_VALUE);
var changedValue = consumer.applyCancellable(value);
return changedValue.copyStatusWith(Map.entry(changedValue.getValue(), EMPTY_VALUE));
});
}

View File

@ -9,10 +9,11 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
public class LLMap implements LLKeyValueDatabaseStructure {
@ -56,15 +57,15 @@ public class LLMap implements LLKeyValueDatabaseStructure {
/**
* The consumer can be called from different threads
*/
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], byte[]> consumer) {
dictionary.forEach(snapshot, parallelism, consumer);
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, consumer);
}
/**
* The consumer can be called from different threads
*/
public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
dictionary.replaceAll(parallelism, replaceKeys, consumer);
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
return dictionary.replaceAll(parallelism, replaceKeys, consumer);
}
@Override

View File

@ -9,9 +9,10 @@ import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.CancellableConsumer;
import org.warp.commonutils.functional.CancellableFunction;
import org.warp.commonutils.functional.ConsumerResult;
public class LLSet implements LLKeyValueDatabaseStructure {
@ -63,12 +64,15 @@ public class LLSet implements LLKeyValueDatabaseStructure {
dictionary.clear();
}
public void forEach(@Nullable LLSnapshot snapshot, int parallelism, Consumer<byte[]> consumer) {
dictionary.forEach(snapshot, parallelism, (key, emptyValue) -> consumer.accept(key));
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableConsumer<byte[]> consumer) {
return dictionary.forEach(snapshot, parallelism, (key, emptyValue) -> consumer.acceptCancellable(key));
}
public void replaceAll(int parallelism, Function<byte[], byte[]> consumer) throws IOException {
dictionary.replaceAll(parallelism, true, (key, emptyValue) -> Map.entry(consumer.apply(key), emptyValue));
public ConsumerResult replaceAll(int parallelism, CancellableFunction<byte[], byte[]> consumer) throws IOException {
return dictionary.replaceAll(parallelism, true, (key, emptyValue) -> {
var result = consumer.applyCancellable(key);
return result.copyStatusWith(Map.entry(result.getValue(), emptyValue));
});
}
public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {