Fixed issue: NETTY-360 Changing Channel.interestOps sometimes does not work under heavy write traffic in NIO transport

* Made sure all setRawInterestOps() calls are protected by interestOpsLock
Fixed a race condition in the HexDumpProxy example
This commit is contained in:
Trustin Lee 2010-10-24 18:24:20 +00:00
parent d9dba0d754
commit 1f2285f57e
3 changed files with 47 additions and 53 deletions

View File

@ -604,24 +604,18 @@ class NioDatagramWorker implements Runnable {
close(key); close(key);
return; return;
} }
int interestOps;
boolean changed = false;
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
interestOps = channel.getRawInterestOps(); int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true;
}
}
if (changed) {
channel.setRawInterestOpsNow(interestOps); channel.setRawInterestOpsNow(interestOps);
} }
} }
}
private void clearOpWrite(NioDatagramChannel channel) { private void clearOpWrite(NioDatagramChannel channel) {
Selector selector = this.selector; Selector selector = this.selector;
@ -633,24 +627,18 @@ class NioDatagramWorker implements Runnable {
close(key); close(key);
return; return;
} }
int interestOps;
boolean changed = false;
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
interestOps = channel.getRawInterestOps(); int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true;
}
}
if (changed) {
channel.setRawInterestOpsNow(interestOps); channel.setRawInterestOpsNow(interestOps);
} }
} }
}
static void disconnect(NioDatagramChannel channel, ChannelFuture future) { static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
@ -811,11 +799,13 @@ class NioDatagramWorker implements Runnable {
default: default:
throw new Error(); throw new Error();
} }
if (changed) {
channel.setRawInterestOpsNow(interestOps);
}
} }
future.setSuccess(); future.setSuccess();
if (changed) { if (changed) {
channel.setRawInterestOpsNow(interestOps);
fireChannelInterestChanged(channel); fireChannelInterestChanged(channel);
} }
} catch (final CancelledKeyException e) { } catch (final CancelledKeyException e) {

View File

@ -536,24 +536,18 @@ class NioWorker implements Runnable {
close(key); close(key);
return; return;
} }
int interestOps;
boolean changed = false;
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
interestOps = channel.getRawInterestOps(); int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true;
}
}
if (changed) {
channel.setRawInterestOpsNow(interestOps); channel.setRawInterestOpsNow(interestOps);
} }
} }
}
private void clearOpWrite(NioSocketChannel channel) { private void clearOpWrite(NioSocketChannel channel) {
Selector selector = this.selector; Selector selector = this.selector;
@ -565,24 +559,18 @@ class NioWorker implements Runnable {
close(key); close(key);
return; return;
} }
int interestOps;
boolean changed = false;
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
interestOps = channel.getRawInterestOps(); int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true;
}
}
if (changed) {
channel.setRawInterestOpsNow(interestOps); channel.setRawInterestOpsNow(interestOps);
} }
} }
}
void close(NioSocketChannel channel, ChannelFuture future) { void close(NioSocketChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
@ -719,11 +707,14 @@ class NioWorker implements Runnable {
default: default:
throw new Error(); throw new Error();
} }
if (changed) {
channel.setRawInterestOpsNow(interestOps);
}
} }
future.setSuccess(); future.setSuccess();
if (changed) { if (changed) {
channel.setRawInterestOpsNow(interestOps);
fireChannelInterestChanged(channel); fireChannelInterestChanged(channel);
} }
} catch (CancelledKeyException e) { } catch (CancelledKeyException e) {

View File

@ -41,6 +41,11 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
private final String remoteHost; private final String remoteHost;
private final int remotePort; private final int remotePort;
// This lock guards against the race condition that overrides the
// OP_READ flag incorrectly.
// See the related discussion: http://markmail.org/message/x7jc6mqx6ripynqf
final Object trafficLock = new Object();
private volatile Channel outboundChannel; private volatile Channel outboundChannel;
public HexDumpProxyInboundHandler( public HexDumpProxyInboundHandler(
@ -78,10 +83,11 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception { throws Exception {
ChannelBuffer msg = (ChannelBuffer) e.getMessage(); ChannelBuffer msg = (ChannelBuffer) e.getMessage();
System.out.println(">>> " + ChannelBuffers.hexDump(msg)); //System.out.println(">>> " + ChannelBuffers.hexDump(msg));
synchronized (trafficLock) {
outboundChannel.write(msg); outboundChannel.write(msg);
// If outboundChannel is saturated, do not read until notified in // If outboundChannel is saturated, do not read until notified in
// OutboundHandler.channelInterestChanged(). // OutboundHandler.channelInterestChanged().
@ -89,16 +95,19 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
e.getChannel().setReadable(false); e.getChannel().setReadable(false);
} }
} }
}
@Override @Override
public void channelInterestChanged(ChannelHandlerContext ctx, public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception { ChannelStateEvent e) throws Exception {
// If inboundChannel is not saturated anymore, continue accepting // If inboundChannel is not saturated anymore, continue accepting
// the incoming traffic from the outboundChannel. // the incoming traffic from the outboundChannel.
synchronized (trafficLock) {
if (e.getChannel().isWritable()) { if (e.getChannel().isWritable()) {
outboundChannel.setReadable(true); outboundChannel.setReadable(true);
} }
} }
}
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
@ -115,7 +124,7 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
closeOnFlush(e.getChannel()); closeOnFlush(e.getChannel());
} }
private static class OutboundHandler extends SimpleChannelUpstreamHandler { private class OutboundHandler extends SimpleChannelUpstreamHandler {
private final Channel inboundChannel; private final Channel inboundChannel;
@ -124,10 +133,11 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception { throws Exception {
ChannelBuffer msg = (ChannelBuffer) e.getMessage(); ChannelBuffer msg = (ChannelBuffer) e.getMessage();
System.out.println("<<< " + ChannelBuffers.hexDump(msg)); //System.out.println("<<< " + ChannelBuffers.hexDump(msg));
synchronized (trafficLock) {
inboundChannel.write(msg); inboundChannel.write(msg);
// If inboundChannel is saturated, do not read until notified in // If inboundChannel is saturated, do not read until notified in
// HexDumpProxyInboundHandler.channelInterestChanged(). // HexDumpProxyInboundHandler.channelInterestChanged().
@ -135,16 +145,19 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
e.getChannel().setReadable(false); e.getChannel().setReadable(false);
} }
} }
}
@Override @Override
public void channelInterestChanged(ChannelHandlerContext ctx, public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception { ChannelStateEvent e) throws Exception {
// If outboundChannel is not saturated anymore, continue accepting // If outboundChannel is not saturated anymore, continue accepting
// the incoming traffic from the inboundChannel. // the incoming traffic from the inboundChannel.
synchronized (trafficLock) {
if (e.getChannel().isWritable()) { if (e.getChannel().isWritable()) {
inboundChannel.setReadable(true); inboundChannel.setReadable(true);
} }
} }
}
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)