From 189c2785c012c3981af4b12a4d29ae1fc8d85a78 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 15 Feb 2013 15:50:12 -0800 Subject: [PATCH] ByteBridge.flush() does not flush anything if the target buffer is not writable but expandable - Fixes #1055 - fire inboundBufferUpdated() again if the bridge was not flushed completely. --- .../transport/socket/SocketEchoTest.java | 71 ++++++++++++++-- .../channel/DefaultChannelHandlerContext.java | 82 +++++++++++++------ 2 files changed, 120 insertions(+), 33 deletions(-) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 6ae7307f6e..7136abe578 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -22,6 +22,12 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventExecutorGroup; +import io.netty.channel.EventExecutorGroup; +import io.netty.channel.socket.SocketChannel; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -35,34 +41,81 @@ public class SocketEchoTest extends AbstractSocketTest { private static final Random random = new Random(); static final byte[] data = new byte[1048576]; + private static EventExecutorGroup group; + static { random.nextBytes(data); } - @Test + @BeforeClass + public static void createGroup() { + group = new DefaultEventExecutorGroup(2); + } + + @AfterClass + public static void destroyGroup() { + group.shutdown(); + } + + @Test(timeout = 30000) public void testSimpleEcho() throws Throwable { run(); } public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, Integer.MAX_VALUE); + testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false); } - @Test + @Test(timeout = 30000) + public void testSimpleEchoWithBridge() throws Throwable { + run(); + } + + public void testSimpleEchoWithBridge(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true); + } + + @Test(timeout = 30000) public void testSimpleEchoWithBoundedBuffer() throws Throwable { run(); } public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, 32); + testSimpleEcho0(sb, cb, 32, false); } - private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize) throws Throwable { - EchoHandler sh = new EchoHandler(maxInboundBufferSize); - EchoHandler ch = new EchoHandler(maxInboundBufferSize); + @Test(timeout = 30000) + public void testSimpleEchoWithBridgedBoundedBuffer() throws Throwable { + run(); + } - sb.childHandler(sh); - cb.handler(ch); + public void testSimpleEchoWithBridgedBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSimpleEcho0(sb, cb, 32, true); + } + + private static void testSimpleEcho0( + ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge) throws Throwable { + + final EchoHandler sh = new EchoHandler(maxInboundBufferSize); + final EchoHandler ch = new EchoHandler(maxInboundBufferSize); + + if (bridge) { + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel c) throws Exception { + c.pipeline().addLast(group, sh); + } + }); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel c) throws Exception { + c.pipeline().addLast(group, ch); + } + }); + } else { + sb.childHandler(sh); + cb.handler(ch); + } Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index d7a7940f38..a371edf700 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -319,32 +319,36 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } - private void flushInboundBridge() { + private boolean flushInboundBridge() { if (inMsgBridge != null) { MessageBridge bridge = inMsgBridge; if (bridge != null) { - bridge.flush(inMsgBuf); + return bridge.flush(inMsgBuf); } } else if (inByteBridge != null) { ByteBridge bridge = inByteBridge; if (bridge != null) { - bridge.flush(inByteBuf); + return bridge.flush(inByteBuf); } } + + return true; } - private void flushOutboundBridge() { + private boolean flushOutboundBridge() { if (outMsgBridge != null) { MessageBridge bridge = outMsgBridge; if (bridge != null) { - bridge.flush(outMsgBuf); + return bridge.flush(outMsgBuf); } } else if (outByteBridge != null) { ByteBridge bridge = outByteBridge; if (bridge != null) { - bridge.flush(outByteBuf); + return bridge.flush(outByteBuf); } } + + return true; } void freeHandlerBuffersAfterRemoval() { @@ -943,23 +947,34 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void invokeInboundBufferUpdated() { ChannelStateHandler handler = (ChannelStateHandler) handler(); - if (handler instanceof ChannelInboundHandler) { - flushInboundBridge(); - } - try { - handler.inboundBufferUpdated(this); - } catch (Throwable t) { - pipeline.notifyHandlerException(t); - } finally { - if (handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) { + if (handler instanceof ChannelInboundHandler) { + for (;;) { try { - ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); + boolean flushedAll = flushInboundBridge(); + handler.inboundBufferUpdated(this); + if (flushedAll) { + break; + } } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + if (handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) { + try { + ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } + freeHandlerBuffersAfterRemoval(); } } - freeHandlerBuffersAfterRemoval(); + } else { + try { + handler.inboundBufferUpdated(this); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } } } @@ -1593,14 +1608,32 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements exchangeBuf.add(data); } - private void flush(MessageBuf out) { + private boolean flush(MessageBuf out) { for (;;) { - Object[] data = exchangeBuf.poll(); + Object[] data = exchangeBuf.peek(); if (data == null) { - break; + return true; } - Collections.addAll(out, data); + int i; + for (i = 0; i < data.length; i ++) { + Object m = data[i]; + if (m == null) { + break; + } + + if (out.offer(m)) { + data[i] = null; + } else { + System.arraycopy(data, i, data, 0, data.length - i); + for (int j = i + 1; j < data.length; j ++) { + data[j] = null; + } + return false; + } + } + + exchangeBuf.remove(); } } @@ -1650,17 +1683,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements exchangeBuf.add(data); } - private void flush(ByteBuf out) { - while (out.maxCapacity() != out.writerIndex()) { + private boolean flush(ByteBuf out) { + for (;;) { ByteBuf data = exchangeBuf.peek(); if (data == null) { - break; + return true; } if (out.writerIndex() > out.maxCapacity() - data.readableBytes()) { // The target buffer is not going to be able to accept all data in the bridge. out.capacity(out.maxCapacity()); out.writeBytes(data, out.writableBytes()); + return false; } else { exchangeBuf.remove(); try {