Handle manager failure
This commit is contained in:
parent
fd5764d30d
commit
4a41699551
2
pom.xml
2
pom.xml
|
@ -4,7 +4,7 @@
|
||||||
<groupId>it.cavallium</groupId>
|
<groupId>it.cavallium</groupId>
|
||||||
<artifactId>filequeue</artifactId>
|
<artifactId>filequeue</artifactId>
|
||||||
<name>file queue project</name>
|
<name>file queue project</name>
|
||||||
<version>3.1.10</version>
|
<version>3.1.11</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<description>Light weight, high performance, simple, reliable and persistent queue</description>
|
<description>Light weight, high performance, simple, reliable and persistent queue</description>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
|
@ -16,6 +16,7 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
||||||
private final QueueConsumer<T> consumer;
|
private final QueueConsumer<T> consumer;
|
||||||
private Manager manager;
|
private Manager manager;
|
||||||
private volatile boolean closed;
|
private volatile boolean closed;
|
||||||
|
private volatile Throwable managerFailure = null;
|
||||||
|
|
||||||
public QueueToConsumer(SimpleQueue<T> queue, QueueConsumer<T> consumer) {
|
public QueueToConsumer(SimpleQueue<T> queue, QueueConsumer<T> consumer) {
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
|
@ -29,6 +30,10 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void startQueue() {
|
public synchronized void startQueue() {
|
||||||
|
if (managerFailure != null) {
|
||||||
|
manager = null;
|
||||||
|
managerFailure = null;
|
||||||
|
}
|
||||||
if (manager == null) {
|
if (manager == null) {
|
||||||
this.manager = new Manager();
|
this.manager = new Manager();
|
||||||
manager.setName(getQueueManagerName());
|
manager.setName(getQueueManagerName());
|
||||||
|
@ -38,6 +43,7 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void add(T value) {
|
public void add(T value) {
|
||||||
|
checkManagerStatus();
|
||||||
boolean shouldAdd = true;
|
boolean shouldAdd = true;
|
||||||
if (preAddQueued.getAndIncrement() == 0) {
|
if (preAddQueued.getAndIncrement() == 0) {
|
||||||
boolean crashed = true;
|
boolean crashed = true;
|
||||||
|
@ -63,6 +69,12 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkManagerStatus() {
|
||||||
|
if (managerFailure != null) {
|
||||||
|
throw new RuntimeException("Can't add an element, the queue manager has failed", managerFailure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
closed = true;
|
closed = true;
|
||||||
|
@ -76,6 +88,9 @@ class QueueToConsumer<T> implements IQueueToConsumer<T> {
|
||||||
while (!closed) {
|
while (!closed) {
|
||||||
runStepInternal(true);
|
runStepInternal(true);
|
||||||
}
|
}
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
managerFailure = ex;
|
||||||
|
throw ex;
|
||||||
} finally {
|
} finally {
|
||||||
closed = true;
|
closed = true;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user