Describe errors

This commit is contained in:
Andrea Cavalli 2021-03-04 22:01:50 +01:00
parent e3fcf7f74f
commit 2a8bec00d4
5 changed files with 37 additions and 29 deletions

View File

@ -143,14 +143,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
/** /**
* Value getter doesn't lock data. Please make sure to lock before getting data. * Value getter doesn't lock data. Please make sure to lock before getting data.
*/ */
default ValueGetterBlocking<T, U> getDbValueGetter() { default ValueGetterBlocking<T, U> getDbValueGetter(@Nullable CompositeSnapshot snapshot) {
return k -> getValue(null, k).block(); return k -> getValue(snapshot, k).block();
} }
/** /**
* Value getter doesn't lock data. Please make sure to lock before getting data. * Value getter doesn't lock data. Please make sure to lock before getting data.
*/ */
default ValueGetter<T, U> getAsyncDbValueGetter() { default ValueGetter<T, U> getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) {
return k -> getValue(null, k); return k -> getValue(snapshot, k);
} }
} }

View File

@ -151,7 +151,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -185,7 +185,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -219,7 +219,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -248,7 +248,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.then(Mono.empty()) .then(Mono.empty())
).singleOrEmpty(); ).singleOrEmpty();
@ -326,7 +326,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read or write " + Arrays.toString(key), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -354,7 +354,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to delete " + Arrays.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.then(Mono.empty()) .then(Mono.empty())
).singleOrEmpty(); ).singleOrEmpty();
@ -396,7 +396,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
case VOID: case VOID:
return Mono.empty(); return Mono.empty();
@ -452,9 +452,10 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.flatMapMany(Flux::fromIterable) .flatMapMany(Flux::fromIterable)
.onErrorMap(cause -> new IOException("Failed to read keys "
+ Arrays.deepToString(keysWindow.toArray(byte[][]::new)), cause))
) )
) );
.onErrorMap(IOException::new);
} }
@Override @Override
@ -688,8 +689,9 @@ public class LLLocalDictionary implements LLDictionary {
synchronized (writeBatch) { synchronized (writeBatch) {
writeBatch.close(); writeBatch.close();
} }
})) })
.onErrorMap(IOException::new); )
.onErrorMap(cause -> new IOException("Failed to write range", cause));
} }
}); });
} }
@ -722,7 +724,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to clear", cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -760,7 +762,8 @@ public class LLLocalDictionary implements LLDictionary {
return i; return i;
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to get size of range "
+ range.toString(), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
}); });
@ -873,7 +876,7 @@ public class LLLocalDictionary implements LLDictionary {
return Map.entry(key, value); return Map.entry(key, value);
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
} }

View File

@ -294,7 +294,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
dbScheduler, dbScheduler,
defaultValue defaultValue
)) ))
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -314,7 +314,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@Override @Override
public Mono<Long> getProperty(String propertyName) { public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -356,7 +356,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to close", cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }

View File

@ -93,10 +93,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get; Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get;
private static final Supplier<Scheduler> querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get; private static final Supplier<Scheduler> querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
private static final Supplier<Scheduler> blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get; private static final Supplier<Scheduler> blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
private static final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
/** /**
* Lucene query scheduler. * Lucene query scheduler.
*/ */
private final Scheduler luceneQueryScheduler; private final Scheduler luceneQueryScheduler;
private final Scheduler blockingLuceneSearchScheduler;
private final String luceneIndexName; private final String luceneIndexName;
private final SnapshotDeletionPolicy snapshotter; private final SnapshotDeletionPolicy snapshotter;
@ -155,6 +157,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.luceneBlockingScheduler = blockingSchedulerSupplier.get(); this.luceneBlockingScheduler = blockingSchedulerSupplier.get();
this.luceneQueryScheduler = querySchedulerSupplier.get(); this.luceneQueryScheduler = querySchedulerSupplier.get();
} }
this.blockingLuceneSearchScheduler = blockingLuceneSearchSchedulerSupplier.get();
// Create scheduled tasks lifecycle manager // Create scheduled tasks lifecycle manager
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
@ -565,7 +568,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}); });
try { try {
luceneQueryScheduler.schedule(() -> { blockingLuceneSearchScheduler.schedule(() -> {
try { try {
if (!cancelled.get()) { if (!cancelled.get()) {
if (doDistributedPre) { if (doDistributedPre) {
@ -585,13 +588,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (cancelled.get()) { if (cancelled.get()) {
return HandleResult.HALT; return HandleResult.HALT;
} }
while (requests.get() <= 0) { while (requests.decrementAndGet() < 0) {
requests.incrementAndGet();
requestsAvailable.acquire(); requestsAvailable.acquire();
if (cancelled.get()) { if (cancelled.get()) {
return HandleResult.HALT; return HandleResult.HALT;
} }
} }
requests.decrementAndGet();
sink.next(fixKeyScore(keyScore, scoreDivisor)); sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE; return HandleResult.CONTINUE;
} catch (Exception ex) { } catch (Exception ex) {
@ -606,13 +609,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (cancelled.get()) { if (cancelled.get()) {
return; return;
} }
while (requests.get() <= 0) { while (requests.decrementAndGet() < 0) {
requests.incrementAndGet();
requestsAvailable.acquire(); requestsAvailable.acquire();
if (cancelled.get()) { if (cancelled.get()) {
return; return;
} }
} }
requests.decrementAndGet();
sink.next(new LLTotalHitsCount(totalHitsCount)); sink.next(new LLTotalHitsCount(totalHitsCount));
} catch (Exception ex) { } catch (Exception ex) {
sink.error(ex); sink.error(ex);
@ -631,13 +634,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} catch (Exception ex) { } catch (Exception ex) {
sink.error(ex); sink.error(ex);
} }
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic())))); }, OverflowStrategy.BUFFER).subscribeOn(luceneQueryScheduler))));
} }
@Override @Override
public Mono<Void> close() { public Mono<Void> close() {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
this.blockingLuceneSearchScheduler.dispose();
scheduledTasksLifecycle.cancelAndWait(); scheduledTasksLifecycle.cancelAndWait();
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
indexWriter.close(); indexWriter.close();

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
@ -52,7 +53,7 @@ public class LLLocalSingleton implements LLSingleton {
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) { public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
return Mono return Mono
.fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) .fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name))
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@ -63,7 +64,7 @@ public class LLLocalSingleton implements LLSingleton {
db.put(cfh, name, value); db.put(cfh, name, value);
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }