From 2a8bec00d485d51af16d775f049997e9469c8936 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 4 Mar 2021 22:01:50 +0100 Subject: [PATCH] Describe errors --- .../collections/DatabaseStageMap.java | 8 ++--- .../database/disk/LLLocalDictionary.java | 31 ++++++++++--------- .../disk/LLLocalKeyValueDatabase.java | 6 ++-- .../database/disk/LLLocalLuceneIndex.java | 16 ++++++---- .../database/disk/LLLocalSingleton.java | 5 +-- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index cfd759a..5bdcff1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -143,14 +143,14 @@ public interface DatabaseStageMap> extends Dat /** * Value getter doesn't lock data. Please make sure to lock before getting data. */ - default ValueGetterBlocking getDbValueGetter() { - return k -> getValue(null, k).block(); + default ValueGetterBlocking getDbValueGetter(@Nullable CompositeSnapshot snapshot) { + return k -> getValue(snapshot, k).block(); } /** * Value getter doesn't lock data. Please make sure to lock before getting data. */ - default ValueGetter getAsyncDbValueGetter() { - return k -> getValue(null, k); + default ValueGetter getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) { + return k -> getValue(snapshot, k); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 6f92473..7bb5480 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -151,7 +151,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause)) .subscribeOn(dbScheduler); } @@ -185,7 +185,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) .subscribeOn(dbScheduler); } @@ -219,7 +219,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause)) .subscribeOn(dbScheduler); } @@ -248,7 +248,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(key), cause)) .subscribeOn(dbScheduler) .then(Mono.empty()) ).singleOrEmpty(); @@ -326,7 +326,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read or write " + Arrays.toString(key), cause)) .subscribeOn(dbScheduler); } @@ -354,7 +354,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to delete " + Arrays.toString(key), cause)) .subscribeOn(dbScheduler) .then(Mono.empty()) ).singleOrEmpty(); @@ -396,7 +396,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause)) .subscribeOn(dbScheduler); case VOID: return Mono.empty(); @@ -452,9 +452,10 @@ public class LLLocalDictionary implements LLDictionary { }) .subscribeOn(dbScheduler) .flatMapMany(Flux::fromIterable) + .onErrorMap(cause -> new IOException("Failed to read keys " + + Arrays.deepToString(keysWindow.toArray(byte[][]::new)), cause)) ) - ) - .onErrorMap(IOException::new); + ); } @Override @@ -688,8 +689,9 @@ public class LLLocalDictionary implements LLDictionary { synchronized (writeBatch) { writeBatch.close(); } - })) - .onErrorMap(IOException::new); + }) + ) + .onErrorMap(cause -> new IOException("Failed to write range", cause)); } }); } @@ -722,7 +724,7 @@ public class LLLocalDictionary implements LLDictionary { } return null; }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to clear", cause)) .subscribeOn(dbScheduler); } @@ -760,7 +762,8 @@ public class LLLocalDictionary implements LLDictionary { return i; } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to get size of range " + + range.toString(), cause)) .subscribeOn(dbScheduler); } }); @@ -873,7 +876,7 @@ public class LLLocalDictionary implements LLDictionary { return Map.entry(key, value); } }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .subscribeOn(dbScheduler); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 7c33b44..d0ef28c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -294,7 +294,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { dbScheduler, defaultValue )) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) .subscribeOn(dbScheduler); } @@ -314,7 +314,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono getProperty(String propertyName) { return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)) .subscribeOn(dbScheduler); } @@ -356,7 +356,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } return null; }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to close", cause)) .subscribeOn(dbScheduler); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 4ab9419..d832f7f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -93,10 +93,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get; private static final Supplier querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get; private static final Supplier blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get; + private static final Supplier blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get; /** * Lucene query scheduler. */ private final Scheduler luceneQueryScheduler; + private final Scheduler blockingLuceneSearchScheduler; private final String luceneIndexName; private final SnapshotDeletionPolicy snapshotter; @@ -155,6 +157,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { this.luceneBlockingScheduler = blockingSchedulerSupplier.get(); this.luceneQueryScheduler = querySchedulerSupplier.get(); } + this.blockingLuceneSearchScheduler = blockingLuceneSearchSchedulerSupplier.get(); // Create scheduled tasks lifecycle manager this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); @@ -565,7 +568,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }); try { - luceneQueryScheduler.schedule(() -> { + blockingLuceneSearchScheduler.schedule(() -> { try { if (!cancelled.get()) { if (doDistributedPre) { @@ -585,13 +588,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (cancelled.get()) { return HandleResult.HALT; } - while (requests.get() <= 0) { + while (requests.decrementAndGet() < 0) { + requests.incrementAndGet(); requestsAvailable.acquire(); if (cancelled.get()) { return HandleResult.HALT; } } - requests.decrementAndGet(); sink.next(fixKeyScore(keyScore, scoreDivisor)); return HandleResult.CONTINUE; } catch (Exception ex) { @@ -606,13 +609,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (cancelled.get()) { return; } - while (requests.get() <= 0) { + while (requests.decrementAndGet() < 0) { + requests.incrementAndGet(); requestsAvailable.acquire(); if (cancelled.get()) { return; } } - requests.decrementAndGet(); sink.next(new LLTotalHitsCount(totalHitsCount)); } catch (Exception ex) { sink.error(ex); @@ -631,13 +634,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } catch (Exception ex) { sink.error(ex); } - }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic())))); + }, OverflowStrategy.BUFFER).subscribeOn(luceneQueryScheduler)))); } @Override public Mono close() { return Mono .fromCallable(() -> { + this.blockingLuceneSearchScheduler.dispose(); scheduledTasksLifecycle.cancelAndWait(); //noinspection BlockingMethodInNonBlockingContext indexWriter.close(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 3c40f37..c0c860a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; +import java.util.Arrays; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; @@ -52,7 +53,7 @@ public class LLLocalSingleton implements LLSingleton { public Mono get(@Nullable LLSnapshot snapshot) { return Mono .fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) .subscribeOn(dbScheduler); } @@ -63,7 +64,7 @@ public class LLLocalSingleton implements LLSingleton { db.put(cfh, name, value); return null; }) - .onErrorMap(IOException::new) + .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)) .subscribeOn(dbScheduler); }