From 1f2285f57e98ec286b4fc1505fdf46f0a6847376 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 24 Oct 2010 18:24:20 +0000 Subject: [PATCH] Fixed issue: NETTY-360 Changing Channel.interestOps sometimes does not work under heavy write traffic in NIO transport * Made sure all setRawInterestOps() calls are protected by interestOpsLock Fixed a race condition in the HexDumpProxy example --- .../channel/socket/nio/NioDatagramWorker.java | 24 +++------ .../netty/channel/socket/nio/NioWorker.java | 25 +++------ .../proxy/HexDumpProxyInboundHandler.java | 51 ++++++++++++------- 3 files changed, 47 insertions(+), 53 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 68762403a8..b4358f2e7d 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -604,23 +604,17 @@ class NioDatagramWorker implements Runnable { close(key); return; } - int interestOps; - boolean changed = false; // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); + int interestOps = channel.getRawInterestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { interestOps |= SelectionKey.OP_WRITE; key.interestOps(interestOps); - changed = true; + channel.setRawInterestOpsNow(interestOps); } } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } } private void clearOpWrite(NioDatagramChannel channel) { @@ -633,23 +627,17 @@ class NioDatagramWorker implements Runnable { close(key); return; } - int interestOps; - boolean changed = false; // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); + int interestOps = channel.getRawInterestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { interestOps &= ~SelectionKey.OP_WRITE; key.interestOps(interestOps); - changed = true; + channel.setRawInterestOpsNow(interestOps); } } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } } static void disconnect(NioDatagramChannel channel, ChannelFuture future) { @@ -811,11 +799,13 @@ class NioDatagramWorker implements Runnable { default: throw new Error(); } + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } } future.setSuccess(); if (changed) { - channel.setRawInterestOpsNow(interestOps); fireChannelInterestChanged(channel); } } catch (final CancelledKeyException e) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 066771cd12..6e23d520e6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -536,23 +536,17 @@ class NioWorker implements Runnable { close(key); return; } - int interestOps; - boolean changed = false; // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); + int interestOps = channel.getRawInterestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { interestOps |= SelectionKey.OP_WRITE; key.interestOps(interestOps); - changed = true; + channel.setRawInterestOpsNow(interestOps); } } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } } private void clearOpWrite(NioSocketChannel channel) { @@ -565,23 +559,17 @@ class NioWorker implements Runnable { close(key); return; } - int interestOps; - boolean changed = false; // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); + int interestOps = channel.getRawInterestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { interestOps &= ~SelectionKey.OP_WRITE; key.interestOps(interestOps); - changed = true; + channel.setRawInterestOpsNow(interestOps); } } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } } void close(NioSocketChannel channel, ChannelFuture future) { @@ -719,11 +707,14 @@ class NioWorker implements Runnable { default: throw new Error(); } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } } future.setSuccess(); if (changed) { - channel.setRawInterestOpsNow(interestOps); fireChannelInterestChanged(channel); } } catch (CancelledKeyException e) { diff --git a/src/main/java/org/jboss/netty/example/proxy/HexDumpProxyInboundHandler.java b/src/main/java/org/jboss/netty/example/proxy/HexDumpProxyInboundHandler.java index e55da736ee..7890914ebd 100644 --- a/src/main/java/org/jboss/netty/example/proxy/HexDumpProxyInboundHandler.java +++ b/src/main/java/org/jboss/netty/example/proxy/HexDumpProxyInboundHandler.java @@ -41,6 +41,11 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler { private final String remoteHost; private final int remotePort; + // This lock guards against the race condition that overrides the + // OP_READ flag incorrectly. + // See the related discussion: http://markmail.org/message/x7jc6mqx6ripynqf + final Object trafficLock = new Object(); + private volatile Channel outboundChannel; public HexDumpProxyInboundHandler( @@ -78,15 +83,17 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception { ChannelBuffer msg = (ChannelBuffer) e.getMessage(); - System.out.println(">>> " + ChannelBuffers.hexDump(msg)); - outboundChannel.write(msg); - // If outboundChannel is saturated, do not read until notified in - // OutboundHandler.channelInterestChanged(). - if (!outboundChannel.isWritable()) { - e.getChannel().setReadable(false); + //System.out.println(">>> " + ChannelBuffers.hexDump(msg)); + synchronized (trafficLock) { + outboundChannel.write(msg); + // If outboundChannel is saturated, do not read until notified in + // OutboundHandler.channelInterestChanged(). + if (!outboundChannel.isWritable()) { + e.getChannel().setReadable(false); + } } } @@ -95,8 +102,10 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler { ChannelStateEvent e) throws Exception { // If inboundChannel is not saturated anymore, continue accepting // the incoming traffic from the outboundChannel. - if (e.getChannel().isWritable()) { - outboundChannel.setReadable(true); + synchronized (trafficLock) { + if (e.getChannel().isWritable()) { + outboundChannel.setReadable(true); + } } } @@ -115,7 +124,7 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler { closeOnFlush(e.getChannel()); } - private static class OutboundHandler extends SimpleChannelUpstreamHandler { + private class OutboundHandler extends SimpleChannelUpstreamHandler { private final Channel inboundChannel; @@ -124,15 +133,17 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception { ChannelBuffer msg = (ChannelBuffer) e.getMessage(); - System.out.println("<<< " + ChannelBuffers.hexDump(msg)); - inboundChannel.write(msg); - // If inboundChannel is saturated, do not read until notified in - // HexDumpProxyInboundHandler.channelInterestChanged(). - if (!inboundChannel.isWritable()) { - e.getChannel().setReadable(false); + //System.out.println("<<< " + ChannelBuffers.hexDump(msg)); + synchronized (trafficLock) { + inboundChannel.write(msg); + // If inboundChannel is saturated, do not read until notified in + // HexDumpProxyInboundHandler.channelInterestChanged(). + if (!inboundChannel.isWritable()) { + e.getChannel().setReadable(false); + } } } @@ -141,8 +152,10 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler { ChannelStateEvent e) throws Exception { // If outboundChannel is not saturated anymore, continue accepting // the incoming traffic from the inboundChannel. - if (e.getChannel().isWritable()) { - inboundChannel.setReadable(true); + synchronized (trafficLock) { + if (e.getChannel().isWritable()) { + inboundChannel.setReadable(true); + } } }