diff --git a/pom.xml b/pom.xml
index 697f42b..7c4f88f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
it.cavallium
filequeue
file queue project
- 3.1.10
+ 3.1.11
jar
Light weight, high performance, simple, reliable and persistent queue
4.0.0
diff --git a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
index 1d40472..20f68ce 100644
--- a/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
+++ b/src/main/java/it/cavallium/filequeue/QueueToConsumer.java
@@ -16,6 +16,7 @@ class QueueToConsumer implements IQueueToConsumer {
private final QueueConsumer consumer;
private Manager manager;
private volatile boolean closed;
+ private volatile Throwable managerFailure = null;
public QueueToConsumer(SimpleQueue queue, QueueConsumer consumer) {
this.queue = queue;
@@ -29,6 +30,10 @@ class QueueToConsumer implements IQueueToConsumer {
}
public synchronized void startQueue() {
+ if (managerFailure != null) {
+ manager = null;
+ managerFailure = null;
+ }
if (manager == null) {
this.manager = new Manager();
manager.setName(getQueueManagerName());
@@ -38,6 +43,7 @@ class QueueToConsumer implements IQueueToConsumer {
@Override
public void add(T value) {
+ checkManagerStatus();
boolean shouldAdd = true;
if (preAddQueued.getAndIncrement() == 0) {
boolean crashed = true;
@@ -63,6 +69,12 @@ class QueueToConsumer implements IQueueToConsumer {
}
}
+ private void checkManagerStatus() {
+ if (managerFailure != null) {
+ throw new RuntimeException("Can't add an element, the queue manager has failed", managerFailure);
+ }
+ }
+
@Override
public void close() {
closed = true;
@@ -76,6 +88,9 @@ class QueueToConsumer implements IQueueToConsumer {
while (!closed) {
runStepInternal(true);
}
+ } catch (Throwable ex) {
+ managerFailure = ex;
+ throw ex;
} finally {
closed = true;
}