From 1418821b4828ee72faf135bf4f42dcf20a871086 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 17 Jan 2021 18:31:25 +0100 Subject: [PATCH] Cancellable iterations --- pom.xml | 5 + .../dbengine/database/LLDeepDictionary.java | 21 +-- .../dbengine/database/LLDictionary.java | 9 +- .../database/disk/LLLocalDeepDictionary.java | 135 ++++++++++++------ .../database/disk/LLLocalDictionary.java | 38 +++-- .../remote/client/LLRemoteDictionary.java | 31 ++-- .../remote/server/DbServerFunctions.java | 2 + .../database/structures/LLDeepMap.java | 35 ++--- .../database/structures/LLFixedDeepSet.java | 31 ++-- .../dbengine/database/structures/LLMap.java | 13 +- .../dbengine/database/structures/LLSet.java | 16 ++- 11 files changed, 213 insertions(+), 123 deletions(-) diff --git a/pom.xml b/pom.xml index 21510f4..4b7231a 100644 --- a/pom.xml +++ b/pom.xml @@ -163,6 +163,11 @@ lucene-core 8.6.2 + + org.apache.lucene + lucene-join + 8.6.2 + org.apache.lucene lucene-analyzers-common diff --git a/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java index 3e25275..6136d4f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java @@ -3,14 +3,15 @@ package it.cavallium.dbengine.database; import java.io.IOException; import java.util.Map.Entry; import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; -import org.warp.commonutils.functional.TriConsumer; -import org.warp.commonutils.functional.TriFunction; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.CancellableTriConsumer; +import org.warp.commonutils.functional.CancellableTriFunction; +import org.warp.commonutils.functional.ConsumerResult; import org.warp.commonutils.type.Bytes; import org.warp.commonutils.type.UnmodifiableIterableMap; import org.warp.commonutils.type.UnmodifiableMap; @@ -49,18 +50,18 @@ public interface LLDeepDictionary extends LLKeyValueDatabaseStructure { Optional remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException; - void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer consumer); + ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer consumer); - void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer> consumer); + ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer); - void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, BiConsumer consumer); + ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer consumer); - void replaceAll(int parallelism, boolean replaceKeys, TriFunction> consumer) throws IOException; + ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction> consumer) throws IOException; - void replaceAll(int parallelism, boolean replaceKeys, BiFunction, Entry>> consumer) throws IOException; + ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction, Entry>> consumer) throws IOException; - void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction> consumer) throws IOException; + ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction> consumer) throws IOException; long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException; diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 4fc782e..39195fb 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -3,11 +3,12 @@ package it.cavallium.dbengine.database; import java.io.IOException; import java.util.Map.Entry; import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.ConsumerResult; @NotAtomic public interface LLDictionary extends LLKeyValueDatabaseStructure { @@ -27,12 +28,12 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { /** * This method can call the consumer from different threads in parallel */ - void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer consumer); + ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer); /** * This method can call the consumer from different threads in parallel */ - void replaceAll(int parallelism, boolean replaceKeys, BiFunction> consumer) throws IOException; + ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException; void clear() throws IOException; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java index 9280822..c81d827 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java @@ -1,5 +1,9 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.LLDeepDictionary; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.io.IOException; @@ -9,8 +13,6 @@ import java.util.List; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import org.apache.commons.lang3.tuple.ImmutableTriple; @@ -27,15 +29,14 @@ import org.rocksdb.Snapshot; import org.rocksdb.WriteBatchInterface; import org.warp.commonutils.concurrency.atomicity.NotAtomic; import org.warp.commonutils.error.IndexOutOfBoundsException; -import org.warp.commonutils.functional.TriConsumer; -import org.warp.commonutils.functional.TriFunction; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.CancellableTriConsumer; +import org.warp.commonutils.functional.CancellableTriFunction; +import org.warp.commonutils.functional.ConsumerResult; import org.warp.commonutils.type.Bytes; import org.warp.commonutils.type.UnmodifiableIterableMap; import org.warp.commonutils.type.UnmodifiableMap; -import it.cavallium.dbengine.database.LLDeepDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLUtils; @NotAtomic public class LLLocalDeepDictionary implements LLDeepDictionary { @@ -411,12 +412,12 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { } @Override - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer consumer) { - forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer consumer) { + return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); } //todo: implement parallel execution - private void forEach_(TriConsumer consumer, @Nullable Snapshot snapshot, int parallelism) { + private ConsumerResult forEach_(CancellableTriConsumer consumer, @Nullable Snapshot snapshot, int parallelism) { try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)) : db.newIterator(cfh))) { iterator.seekToFirst(); @@ -425,20 +426,24 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { var key1 = getKey1(combinedKey); var key2 = getKey2(combinedKey); - consumer.accept(key1, key2, iterator.value()); + var result = consumer.acceptCancellable(key1, key2, iterator.value()); + if (result.isCancelled()) { + return ConsumerResult.cancelNext(); + } iterator.next(); } + return ConsumerResult.result(); } } @Override - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer> consumer) { - forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer) { + return forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); } //todo: implement parallel execution - private void forEach_(BiConsumer> consumer, @Nullable Snapshot snapshot, int parallelism) { + private ConsumerResult forEach_(CancellableBiConsumer> consumer, @Nullable Snapshot snapshot, int parallelism) { try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)) : db.newIterator(cfh))) { iterator.seekToFirst(); @@ -453,7 +458,10 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) { if (currentKey1 != null && !key2Values.isEmpty()) { - consumer.accept(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new))); + var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new))); + if (result.isCancelled()) { + return ConsumerResult.cancelNext(); + } } currentKey1 = key1; key2Keys = new ArrayList<>(); @@ -466,18 +474,22 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { iterator.next(); } if (currentKey1 != null && !key2Values.isEmpty()) { - consumer.accept(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new))); + var result = consumer.acceptCancellable(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new))); + if (result.isCancelled()) { + return ConsumerResult.cancelNext(); + } } + return ConsumerResult.result(); } } @Override - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, BiConsumer consumer) { - forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, CancellableBiConsumer consumer) { + return forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism); } //todo: implement parallel execution - private void forEach_(byte[] key1, BiConsumer consumer, @Nullable Snapshot snapshot, int parallelism) { + private ConsumerResult forEach_(byte[] key1, CancellableBiConsumer consumer, @Nullable Snapshot snapshot, int parallelism) { try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)) : db.newIterator(cfh))) { iterator.seek(getStartSeekKey(key1)); @@ -491,17 +503,21 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { byte[] key2 = getKey2(combinedKey); byte[] value2 = iterator.value(); - consumer.accept(key2, value2); + var result = consumer.acceptCancellable(key2, value2); + if (result.isCancelled()) { + return ConsumerResult.cancelNext(); + } iterator.next(); } + return ConsumerResult.result(); } } //todo: implement parallel execution //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible @Override - public void replaceAll(int parallelism, boolean replaceKeys, TriFunction> consumer) throws IOException { + public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction> consumer) throws IOException { var snapshot = db.getSnapshot(); try { try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot)); @@ -523,20 +539,28 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { var key1 = getKey1(combinedKey); var key2 = getKey2(combinedKey); - var result = consumer.apply(key1, key2, iter.value()); - if (result.getLeft().length != key1Size) { - throw new IndexOutOfBoundsException(result.getLeft().length, key1Size, key1Size); + var result = consumer.applyCancellable(key1, key2, iter.value()); + if (result.getValue().getLeft().length != key1Size) { + throw new IndexOutOfBoundsException(result.getValue().getLeft().length, key1Size, key1Size); } - if (result.getMiddle().length != key2Size) { - throw new IndexOutOfBoundsException(result.getMiddle().length, key2Size, key2Size); + if (result.getValue().getMiddle().length != key2Size) { + throw new IndexOutOfBoundsException(result.getValue().getMiddle().length, key2Size, key2Size); } - writeBatch.put(cfh, getCombinedKey(result.getLeft(), result.getMiddle()), result.getRight()); + writeBatch.put(cfh, getCombinedKey(result.getValue().getLeft(), result.getValue().getMiddle()), result.getValue().getRight()); + + if (result.isCancelled()) { + // Cancels and discards the write batch + writeBatch.clear(); + return ConsumerResult.cancelNext(); + } iter.next(); } writeBatch.writeToDbAndClose(); + + return ConsumerResult.result(); } } catch (RocksDBException ex) { throw new IOException(ex); @@ -549,7 +573,7 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { //todo: implement parallel execution //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible @Override - public void replaceAll(int parallelism, boolean replaceKeys, BiFunction, Entry>> consumer) + public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction, Entry>> consumer) throws IOException { try { var snapshot = db.getSnapshot(); @@ -578,12 +602,18 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) { if (currentKey1 != null && !key2Values.isEmpty()) { - replaceAll_(writeBatch, + var result = replaceAll_(writeBatch, currentKey1, key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new), consumer ); + + if (result.isCancelled()) { + // Cancels and discards the write batch + writeBatch.clear(); + return ConsumerResult.cancelNext(); + } } currentKey1 = key1; key2Keys = new ObjectArrayList<>(); @@ -596,15 +626,23 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { iter.next(); } if (currentKey1 != null && !key2Values.isEmpty()) { - replaceAll_(writeBatch, + var result = replaceAll_(writeBatch, currentKey1, key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new), consumer ); + + if (result.isCancelled()) { + // Cancels and discards the write batch + writeBatch.clear(); + return ConsumerResult.cancelNext(); + } } writeBatch.writeToDbAndClose(); + + return ConsumerResult.result(); } finally { db.releaseSnapshot(snapshot); snapshot.close(); @@ -614,23 +652,23 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { } } - private void replaceAll_(WriteBatchInterface writeBatch, + private ConsumerResult replaceAll_(WriteBatchInterface writeBatch, byte[] key1, byte[][] key2Keys, byte[][] key2Values, - BiFunction, Entry>> consumer) + CancellableBiFunction, Entry>> consumer) throws RocksDBException { if (key1.length != key1Size) { throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); } var previousValues = UnmodifiableMap.of(key2Keys, key2Values); - var result = consumer.apply(key1, previousValues); + var result = consumer.applyCancellable(key1, previousValues); - var resultKey1 = result.getKey(); + var resultKey1 = result.getValue().getKey(); if (resultKey1.length != key1Size) { throw new IndexOutOfBoundsException(resultKey1.length, key1Size, key1Size); } - var resultValues = result.getValue(); + var resultValues = result.getValue().getValue(); var mapIterator = resultValues.fastIterator(); while (mapIterator.hasNext()) { @@ -642,13 +680,20 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { var value2 = mapEntry.getValue(); writeBatch.put(cfh, getCombinedKey(key1, key2.data), value2); + + if (result.isCancelled()) { + // Cancels and discards the write batch + writeBatch.clear(); + return ConsumerResult.cancelNext(); + } } + return ConsumerResult.result(); } //todo: implement parallel execution //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible @Override - public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction> consumer) throws IOException { + public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction> consumer) throws IOException { if (key1.length != key1Size) { throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size); } @@ -685,17 +730,25 @@ public class LLLocalDeepDictionary implements LLDeepDictionary { byte[] key2 = getKey2(combinedKey); byte[] value2 = iter.value(); - var result = consumer.apply(key2, value2); - if (result.getKey().length != key2Size) { - throw new IndexOutOfBoundsException(result.getKey().length, key2Size, key2Size); + var result = consumer.applyCancellable(key2, value2); + if (result.getValue().getKey().length != key2Size) { + throw new IndexOutOfBoundsException(result.getValue().getKey().length, key2Size, key2Size); } - writeBatch.put(cfh, result.getKey(), result.getValue()); + writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue()); + + if (result.isCancelled()) { + // Cancels and discards the write batch + writeBatch.clear(); + return ConsumerResult.cancelNext(); + } iter.next(); } writeBatch.writeToDbAndClose(); + + return ConsumerResult.result(); } finally { db.releaseSnapshot(snapshot); snapshot.close(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 5d4fe90..004cbed 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,5 +1,9 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -8,8 +12,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import org.jetbrains.annotations.NotNull; @@ -25,10 +27,9 @@ import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import org.warp.commonutils.concurrency.atomicity.NotAtomic; -import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLUtils; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.ConsumerResult; @NotAtomic public class LLLocalDictionary implements LLDictionary { @@ -217,19 +218,22 @@ public class LLLocalDictionary implements LLDictionary { //todo: implement parallel forEach @Override - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer consumer) { + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer) { try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) { iter.seekToFirst(); while (iter.isValid()) { - consumer.accept(iter.key(), iter.value()); + if (consumer.acceptCancellable(iter.key(), iter.value()).isCancelled()) { + return ConsumerResult.cancelNext(); + } iter.next(); } } + return ConsumerResult.result(); } //todo: implement parallel replace @Override - public void replaceAll(int parallelism, boolean replaceKeys, BiFunction> consumer) throws IOException { + public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException { try { try (var snapshot = replaceKeys ? db.getSnapshot() : null) { try (RocksIterator iter = db.newIterator(cfh, getReadOptions(snapshot)); @@ -249,21 +253,29 @@ public class LLLocalDictionary implements LLDictionary { while (iter.isValid()) { - var result = consumer.apply(iter.key(), iter.value()); - boolean keyDiffers = !Arrays.equals(iter.key(), result.getKey()); + var result = consumer.applyCancellable(iter.key(), iter.value()); + boolean keyDiffers = !Arrays.equals(iter.key(), result.getValue().getKey()); if (!replaceKeys && keyDiffers) { throw new IOException("Tried to replace a key"); } // put if changed or if keys can be swapped/replaced - if (replaceKeys || !Arrays.equals(iter.value(), result.getValue())) { - writeBatch.put(cfh, result.getKey(), result.getValue()); + if (replaceKeys || !Arrays.equals(iter.value(), result.getValue().getValue())) { + writeBatch.put(cfh, result.getValue().getKey(), result.getValue().getValue()); + } + + if (result.isCancelled()) { + // Cancels and discards the write batch + writeBatch.clear(); + return ConsumerResult.cancelNext(); } iter.next(); } writeBatch.writeToDbAndClose(); + + return ConsumerResult.result(); } finally { db.releaseSnapshot(snapshot); } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/client/LLRemoteDictionary.java b/src/main/java/it/cavallium/dbengine/database/remote/client/LLRemoteDictionary.java index 687fd3e..f01e915 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/client/LLRemoteDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/client/LLRemoteDictionary.java @@ -24,12 +24,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Collectors; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.ConsumerResult; @NotAtomic public class LLRemoteDictionary implements LLDictionary { @@ -155,26 +156,33 @@ public class LLRemoteDictionary implements LLDictionary { } @Override - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer consumer) { + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer) { try { var request = DictionaryMethodForEachRequest.newBuilder().setDictionaryHandle(handle); if (snapshot != null) { request.setSequenceNumber(snapshot.getSequenceNumber()); } var response = blockingStub.dictionaryMethodForEach(request.build()); - response.forEachRemaining((entry) -> { + while (response.hasNext()) { + var entry = response.next(); var key = entry.getKey().toByteArray(); var value = entry.getValue().toByteArray(); - consumer.accept(key, value); - }); + var cancelled = consumer.acceptCancellable(key, value); + if (cancelled.isCancelled()) { + return ConsumerResult.cancelNext(); + } + } + return ConsumerResult.result(); } catch (StatusRuntimeException ex) { throw new IOError(ex); } } @Override - public void replaceAll(int parallelism, boolean replaceKeys, BiFunction> consumer) throws IOException { + public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException { try { + //todo: reimplement remote replaceAll using writeBatch + //todo: implement cancellation during iteration var response = blockingStub .dictionaryMethodReplaceAll(DictionaryMethodReplaceAllRequest.newBuilder() .setDictionaryHandle(handle) @@ -183,18 +191,19 @@ public class LLRemoteDictionary implements LLDictionary { response.forEachRemaining((entry) -> { var key = entry.getKey().toByteArray(); var value = entry.getValue().toByteArray(); - var singleResponse = consumer.apply(key, value); + var singleResponse = consumer.applyCancellable(key, value); boolean keyDiffers = false; - if (!Arrays.equals(key, singleResponse.getKey())) { + if (!Arrays.equals(key, singleResponse.getValue().getKey())) { remove_(key, LLDictionaryResultType.VOID); keyDiffers = true; } // put if changed - if (keyDiffers || !Arrays.equals(value, singleResponse.getValue())) { - put_(singleResponse.getKey(), singleResponse.getValue(), LLDictionaryResultType.VOID); + if (keyDiffers || !Arrays.equals(value, singleResponse.getValue().getValue())) { + put_(singleResponse.getValue().getKey(), singleResponse.getValue().getValue(), LLDictionaryResultType.VOID); } }); + return ConsumerResult.result(); } catch (StatusRuntimeException ex) { throw new IOException(ex); } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/server/DbServerFunctions.java b/src/main/java/it/cavallium/dbengine/database/remote/server/DbServerFunctions.java index cdd9f83..b2bd2f5 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/server/DbServerFunctions.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/server/DbServerFunctions.java @@ -80,6 +80,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.warp.commonutils.functional.ConsumerResult; public class DbServerFunctions extends CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceImplBase { @@ -562,6 +563,7 @@ public class DbServerFunctions extends CavalliumDBEngineServiceGrpc.CavalliumDBE response.setKey(ByteString.copyFrom(key)); response.setValue(ByteString.copyFrom(val)); responseObserver.onNext(response.build()); + return ConsumerResult.result(); }); responseObserver.onCompleted(); } diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java b/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java index fd6088c..7587aad 100644 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java +++ b/src/main/java/it/cavallium/dbengine/database/structures/LLDeepMap.java @@ -1,24 +1,25 @@ package it.cavallium.dbengine.database.structures; +import it.cavallium.dbengine.database.LLDeepDictionary; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; +import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.functional.TriConsumer; -import org.warp.commonutils.functional.TriFunction; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.CancellableTriConsumer; +import org.warp.commonutils.functional.CancellableTriFunction; +import org.warp.commonutils.functional.ConsumerResult; import org.warp.commonutils.type.Bytes; import org.warp.commonutils.type.UnmodifiableIterableMap; import org.warp.commonutils.type.UnmodifiableMap; -import it.cavallium.dbengine.database.LLDeepDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; -import it.cavallium.dbengine.database.LLSnapshot; public class LLDeepMap implements LLKeyValueDatabaseStructure { @@ -79,27 +80,27 @@ public class LLDeepMap implements LLKeyValueDatabaseStructure { return dictionary.remove(key1, key2, resultType.getDictionaryResultType()); } - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer> consumer) { - dictionary.forEach(snapshot, parallelism, consumer); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer) { + return dictionary.forEach(snapshot, parallelism, consumer); } - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, BiConsumer consumer) { - dictionary.forEach(snapshot, parallelism, key1, consumer); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableBiConsumer consumer) { + return dictionary.forEach(snapshot, parallelism, key1, consumer); } - public void replaceAll(int parallelism, boolean replaceKeys, BiFunction, Entry>> consumer) throws IOException { + public void replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction, Entry>> consumer) throws IOException { dictionary.replaceAll(parallelism, replaceKeys, consumer); } - public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction> consumer) throws IOException { + public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, CancellableBiFunction> consumer) throws IOException { dictionary.replaceAll(parallelism, replaceKeys, key1, consumer); } - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer consumer) { - dictionary.forEach(snapshot, parallelism, consumer); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableTriConsumer consumer) { + return dictionary.forEach(snapshot, parallelism, consumer); } - public void replaceAll(int parallelism, boolean replaceKeys, TriFunction> consumer) throws IOException { + public void replaceAll(int parallelism, boolean replaceKeys, CancellableTriFunction> consumer) throws IOException { dictionary.replaceAll(parallelism, replaceKeys, consumer); } diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java b/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java index 0a269b7..e31b2a5 100644 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java +++ b/src/main/java/it/cavallium/dbengine/database/structures/LLFixedDeepSet.java @@ -12,11 +12,12 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.CancellableConsumer; +import org.warp.commonutils.functional.CancellableFunction; +import org.warp.commonutils.functional.ConsumerResult; import org.warp.commonutils.type.Bytes; import org.warp.commonutils.type.UnmodifiableIterableMap; import org.warp.commonutils.type.UnmodifiableIterableSet; @@ -112,26 +113,26 @@ public class LLFixedDeepSet implements LLKeyValueDatabaseStructure { return false; } - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer> consumer) { - dictionary.forEach(snapshot, parallelism, (key1, entries) -> consumer.accept(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new))); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer> consumer) { + return dictionary.forEach(snapshot, parallelism, (key1, entries) -> consumer.acceptCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new))); } - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, Consumer consumer) { - dictionary.forEach(snapshot, parallelism, key1, (value, empty) -> consumer.accept(value)); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, CancellableConsumer consumer) { + return dictionary.forEach(snapshot, parallelism, key1, (value, empty) -> consumer.acceptCancellable(value)); } - public void replaceAll(int parallelism, BiFunction, Entry>> consumer) throws IOException { + public void replaceAll(int parallelism, CancellableBiFunction, Entry>> consumer) throws IOException { dictionary.replaceAll(parallelism, true, (key1, entries) -> { - var result = consumer.apply(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new)); - var resultItems = result.getValue().toArray(Bytes[]::new); - return Map.entry(result.getKey(), UnmodifiableMap.of(resultItems, generateEmptyArray(resultItems.length))); + var result = consumer.applyCancellable(key1, entries.toUnmodifiableIterableKeysSet(byte[][]::new)); + var resultItems = result.getValue().getValue().toArray(Bytes[]::new); + return result.copyStatusWith(Map.entry(result.getValue().getKey(), UnmodifiableMap.of(resultItems, generateEmptyArray(resultItems.length)))); }); } - public void replaceAll(int parallelism, byte[] key1, Function consumer) throws IOException { + public void replaceAll(int parallelism, byte[] key1, CancellableFunction consumer) throws IOException { dictionary.replaceAll(parallelism, true, key1, (value, empty) -> { - var changedValue = consumer.apply(value); - return Map.entry(changedValue, EMPTY_VALUE); + var changedValue = consumer.applyCancellable(value); + return changedValue.copyStatusWith(Map.entry(changedValue.getValue(), EMPTY_VALUE)); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java b/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java index c2bbb10..dae889e 100644 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java +++ b/src/main/java/it/cavallium/dbengine/database/structures/LLMap.java @@ -9,10 +9,11 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableBiFunction; +import org.warp.commonutils.functional.ConsumerResult; public class LLMap implements LLKeyValueDatabaseStructure { @@ -56,15 +57,15 @@ public class LLMap implements LLKeyValueDatabaseStructure { /** * The consumer can be called from different threads */ - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer consumer) { - dictionary.forEach(snapshot, parallelism, consumer); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer consumer) { + return dictionary.forEach(snapshot, parallelism, consumer); } /** * The consumer can be called from different threads */ - public void replaceAll(int parallelism, boolean replaceKeys, BiFunction> consumer) throws IOException { - dictionary.replaceAll(parallelism, replaceKeys, consumer); + public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction> consumer) throws IOException { + return dictionary.replaceAll(parallelism, replaceKeys, consumer); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java b/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java index 0df1385..50ac2db 100644 --- a/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java +++ b/src/main/java/it/cavallium/dbengine/database/structures/LLSet.java @@ -9,9 +9,10 @@ import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.function.Consumer; -import java.util.function.Function; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.functional.CancellableConsumer; +import org.warp.commonutils.functional.CancellableFunction; +import org.warp.commonutils.functional.ConsumerResult; public class LLSet implements LLKeyValueDatabaseStructure { @@ -63,12 +64,15 @@ public class LLSet implements LLKeyValueDatabaseStructure { dictionary.clear(); } - public void forEach(@Nullable LLSnapshot snapshot, int parallelism, Consumer consumer) { - dictionary.forEach(snapshot, parallelism, (key, emptyValue) -> consumer.accept(key)); + public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableConsumer consumer) { + return dictionary.forEach(snapshot, parallelism, (key, emptyValue) -> consumer.acceptCancellable(key)); } - public void replaceAll(int parallelism, Function consumer) throws IOException { - dictionary.replaceAll(parallelism, true, (key, emptyValue) -> Map.entry(consumer.apply(key), emptyValue)); + public ConsumerResult replaceAll(int parallelism, CancellableFunction consumer) throws IOException { + return dictionary.replaceAll(parallelism, true, (key, emptyValue) -> { + var result = consumer.applyCancellable(key); + return result.copyStatusWith(Map.entry(result.getValue(), emptyValue)); + }); } public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {