diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java index 0dda137b52..6893642d81 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java @@ -44,10 +44,12 @@ class OioWorker implements Runnable { channel.workerThread = Thread.currentThread(); final PushbackInputStream in = channel.getInputStream(); - for (;;) { + while (channel.isOpen()) { synchronized (this) { while (!channel.isReadable()) { try { + // notify() is not called at all. + // close() and setInterestOps() calls Thread.interrupt() this.wait(); } catch (InterruptedException e) { if (!channel.isOpen()) { @@ -83,10 +85,17 @@ class OioWorker implements Runnable { if (readBytes == buf.length) { buffer = ChannelBuffers.wrappedBuffer(buf); } else { + // A rare case, but it sometimes happen. buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes); } fireMessageReceived(channel, buffer); } + + // Setting the workerThread to null will prevent any channel + // operations from interrupting this thread from now on. + channel.workerThread = null; + + // Clean up. close(channel, channel.getSucceededFuture()); } @@ -122,7 +131,7 @@ class OioWorker implements Runnable { future.setSuccess(); if (changed) { - // Notify the worker so it stops reading. + // Notify the worker so it stops or continues reading. Thread currentThread = Thread.currentThread(); Thread workerThread = channel.workerThread; if (workerThread != null && currentThread != workerThread) { @@ -146,6 +155,13 @@ class OioWorker implements Runnable { future.setSuccess(); if (channel.setClosed()) { if (connected) { + // Notify the worker so it stops reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + if (channel.getInterestOps() != Channel.OP_WRITE) { channel.setInterestOpsNow(Channel.OP_WRITE); fireChannelInterestChanged(channel, Channel.OP_WRITE);