Fixed issue: NETTY-217 If catching InterruptedException, set back interrupt status of the thread

* Handle interruption state properly as advised
This commit is contained in:
Trustin Lee 2009-09-04 04:21:56 +00:00
parent 219647385a
commit 16124dc14c
10 changed files with 162 additions and 69 deletions

View File

@ -229,16 +229,21 @@ public class ClientBootstrap extends Bootstrap {
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
interrupted = true;
}
} while (future == null);
pipeline.remove("connector");
if (interrupted) {
Thread.currentThread().interrupt();
}
return future;
}

View File

@ -199,16 +199,21 @@ public class ConnectionlessBootstrap extends Bootstrap {
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
interrupted = true;
}
} while (future == null);
pipeline.remove("binder");
if (interrupted) {
Thread.currentThread().interrupt();
}
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
@ -317,16 +322,21 @@ public class ConnectionlessBootstrap extends Bootstrap {
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
interrupted = true;
}
} while (future == null);
pipeline.remove("connector");
if (interrupted) {
Thread.currentThread().interrupt();
}
return future;
}

View File

@ -276,14 +276,19 @@ public class ServerBootstrap extends Bootstrap {
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
interrupted = true;
}
} while (future == null);
if (interrupted) {
Thread.currentThread().interrupt();
}
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {

View File

@ -63,14 +63,23 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
}
public ChannelFuture await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
public boolean await(long timeoutMillis) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}

View File

@ -161,6 +161,10 @@ public class DefaultChannelFuture implements ChannelFuture {
}
public ChannelFuture await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
synchronized (this) {
while (!done) {
checkDeadLock();
@ -185,6 +189,7 @@ public class DefaultChannelFuture implements ChannelFuture {
}
public ChannelFuture awaitUninterruptibly() {
boolean interrupted = false;
synchronized (this) {
while (!done) {
checkDeadLock();
@ -192,13 +197,17 @@ public class DefaultChannelFuture implements ChannelFuture {
try {
this.wait();
} catch (InterruptedException e) {
// Ignore.
interrupted = true;
} finally {
waiters--;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
@ -219,39 +228,52 @@ public class DefaultChannelFuture implements ChannelFuture {
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (interruptable && Thread.interrupted()) {
throw new InterruptedException();
}
long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
synchronized (this) {
if (done) {
return done;
} else if (waitTime <= 0) {
return done;
}
checkDeadLock();
waiters++;
try {
for (;;) {
try {
this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
}
}
if (done) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return done;
}
}
try {
synchronized (this) {
if (done) {
return done;
} else if (waitTime <= 0) {
return done;
}
} finally {
waiters--;
checkDeadLock();
waiters++;
try {
for (;;) {
try {
this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
if (done) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return done;
}
}
}
} finally {
waiters--;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -202,6 +202,10 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
}
public ChannelGroupFuture await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
synchronized (this) {
while (!done) {
checkDeadLock();
@ -226,6 +230,7 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
}
public ChannelGroupFuture awaitUninterruptibly() {
boolean interrupted = false;
synchronized (this) {
while (!done) {
checkDeadLock();
@ -233,13 +238,17 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
try {
this.wait();
} catch (InterruptedException e) {
// Ignore.
interrupted = true;
} finally {
waiters--;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
@ -260,39 +269,52 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (interruptable && Thread.interrupted()) {
throw new InterruptedException();
}
long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
synchronized (this) {
if (done) {
return done;
} else if (waitTime <= 0) {
return done;
}
checkDeadLock();
waiters++;
try {
for (;;) {
try {
this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
}
}
if (done) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return done;
}
}
try {
synchronized (this) {
if (done) {
return done;
} else if (waitTime <= 0) {
return done;
}
} finally {
waiters--;
checkDeadLock();
waiters++;
try {
for (;;) {
try {
this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
if (done) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return done;
}
}
}
} finally {
waiters--;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -63,11 +63,16 @@ public class FactorialClientHandler extends SimpleChannelUpstreamHandler {
}
public BigInteger getFactorial() {
boolean interrupted = false;
for (;;) {
try {
return answer.take();
BigInteger factorial = answer.take();
if (interrupted) {
Thread.currentThread().interrupt();
}
return factorial;
} catch (InterruptedException e) {
// Ignore.
interrupted = true;
}
}
}

View File

@ -67,15 +67,20 @@ public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler {
channel.write(builder.build());
LocalTimes localTimes;
boolean interrupted = false;
for (;;) {
try {
localTimes = answer.take();
break;
} catch (InterruptedException e) {
// Ignore.
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
List<String> result = new ArrayList<String>();
for (LocalTime lt: localTimes.getLocalTimeList()) {
result.add(

View File

@ -275,15 +275,20 @@ public class HashedWheelTimer implements Timer {
return Collections.emptySet();
}
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException e) {
// Ignore
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
activeInstances.decrementAndGet();
Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

View File

@ -58,6 +58,7 @@ public class ExecutorUtil {
executorsCopy[i] = executors[i];
}
boolean interrupted = false;
for (Executor e: executorsCopy) {
if (!(e instanceof ExecutorService)) {
continue;
@ -87,10 +88,14 @@ public class ExecutorUtil {
break;
}
} catch (InterruptedException ex) {
// Ignore.
interrupted = true;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
private ExecutorUtil() {