From 4a41699551ad7cf7097fdbc86547679745fd4387 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 20 Oct 2023 17:56:04 +0200 Subject: [PATCH] Handle manager failure --- pom.xml | 2 +- .../it/cavallium/filequeue/QueueToConsumer.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 697f42b..7c4f88f 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ it.cavallium filequeue file queue project - 3.1.10 + 3.1.11 jar Light weight, high performance, simple, reliable and persistent queue 4.0.0 diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java index 1d40472..20f68ce 100644 --- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java +++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java @@ -16,6 +16,7 @@ class QueueToConsumer implements IQueueToConsumer { private final QueueConsumer consumer; private Manager manager; private volatile boolean closed; + private volatile Throwable managerFailure = null; public QueueToConsumer(SimpleQueue queue, QueueConsumer consumer) { this.queue = queue; @@ -29,6 +30,10 @@ class QueueToConsumer implements IQueueToConsumer { } public synchronized void startQueue() { + if (managerFailure != null) { + manager = null; + managerFailure = null; + } if (manager == null) { this.manager = new Manager(); manager.setName(getQueueManagerName()); @@ -38,6 +43,7 @@ class QueueToConsumer implements IQueueToConsumer { @Override public void add(T value) { + checkManagerStatus(); boolean shouldAdd = true; if (preAddQueued.getAndIncrement() == 0) { boolean crashed = true; @@ -63,6 +69,12 @@ class QueueToConsumer implements IQueueToConsumer { } } + private void checkManagerStatus() { + if (managerFailure != null) { + throw new RuntimeException("Can't add an element, the queue manager has failed", managerFailure); + } + } + @Override public void close() { closed = true; @@ -76,6 +88,9 @@ class QueueToConsumer implements IQueueToConsumer { while (!closed) { runStepInternal(true); } + } catch (Throwable ex) { + managerFailure = ex; + throw ex; } finally { closed = true; }