Fixed issue: NETTY-167 resumeRead() sometimes does not work in OIO transport.

This commit is contained in:
Trustin Lee 2009-06-05 07:30:55 +00:00
parent 7339e172e0
commit bb6f5a6104
4 changed files with 22 additions and 16 deletions

View File

@ -54,6 +54,7 @@ final class OioDatagramChannel extends AbstractChannel
implements DatagramChannel {
final MulticastSocket socket;
final Object interestOpsLock = new Object();
private final DatagramChannelConfig config;
volatile Thread workerThread;

View File

@ -56,12 +56,12 @@ class OioDatagramWorker implements Runnable {
final MulticastSocket socket = channel.socket;
while (channel.isOpen()) {
synchronized (this) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
this.wait();
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
@ -144,13 +144,15 @@ class OioDatagramWorker implements Runnable {
future.setSuccess();
if (changed) {
channel.setInterestOpsNow(interestOps);
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
fireChannelInterestChanged(channel);

View File

@ -50,6 +50,7 @@ abstract class OioSocketChannel extends AbstractChannel
implements SocketChannel {
final Socket socket;
final Object interestOpsLock = new Object();
private final SocketChannelConfig config;
volatile Thread workerThread;

View File

@ -59,12 +59,12 @@ class OioWorker implements Runnable {
final PushbackInputStream in = channel.getInputStream();
while (channel.isOpen()) {
synchronized (this) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
this.wait();
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
@ -159,13 +159,15 @@ class OioWorker implements Runnable {
future.setSuccess();
if (changed) {
channel.setInterestOpsNow(interestOps);
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
fireChannelInterestChanged(channel);