Fixed issue: NETTY-26 (Infinite blocking in old blocking I/O transport.)
* Added a proper interrupt() call in OioWorker.close() to wake the worker thread up
This commit is contained in:
parent
b4c6d82be7
commit
a0d9a59206
@ -44,10 +44,12 @@ class OioWorker implements Runnable {
|
||||
channel.workerThread = Thread.currentThread();
|
||||
final PushbackInputStream in = channel.getInputStream();
|
||||
|
||||
for (;;) {
|
||||
while (channel.isOpen()) {
|
||||
synchronized (this) {
|
||||
while (!channel.isReadable()) {
|
||||
try {
|
||||
// notify() is not called at all.
|
||||
// close() and setInterestOps() calls Thread.interrupt()
|
||||
this.wait();
|
||||
} catch (InterruptedException e) {
|
||||
if (!channel.isOpen()) {
|
||||
@ -83,10 +85,17 @@ class OioWorker implements Runnable {
|
||||
if (readBytes == buf.length) {
|
||||
buffer = ChannelBuffers.wrappedBuffer(buf);
|
||||
} else {
|
||||
// A rare case, but it sometimes happen.
|
||||
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
|
||||
}
|
||||
fireMessageReceived(channel, buffer);
|
||||
}
|
||||
|
||||
// Setting the workerThread to null will prevent any channel
|
||||
// operations from interrupting this thread from now on.
|
||||
channel.workerThread = null;
|
||||
|
||||
// Clean up.
|
||||
close(channel, channel.getSucceededFuture());
|
||||
}
|
||||
|
||||
@ -122,7 +131,7 @@ class OioWorker implements Runnable {
|
||||
|
||||
future.setSuccess();
|
||||
if (changed) {
|
||||
// Notify the worker so it stops reading.
|
||||
// Notify the worker so it stops or continues reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
@ -146,6 +155,13 @@ class OioWorker implements Runnable {
|
||||
future.setSuccess();
|
||||
if (channel.setClosed()) {
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
|
||||
if (channel.getInterestOps() != Channel.OP_WRITE) {
|
||||
channel.setInterestOpsNow(Channel.OP_WRITE);
|
||||
fireChannelInterestChanged(channel, Channel.OP_WRITE);
|
||||
|
Loading…
Reference in New Issue
Block a user