ByteBridge.flush() does not flush anything if the target buffer is not writable but expandable

- Fixes #1055
- fire inboundBufferUpdated() again if the bridge was not flushed completely.
This commit is contained in:
Trustin Lee 2013-02-15 15:50:12 -08:00
parent dc43c2d8a9
commit 189c2785c0
2 changed files with 120 additions and 33 deletions

View File

@ -22,6 +22,12 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventExecutorGroup;
import io.netty.channel.EventExecutorGroup;
import io.netty.channel.socket.SocketChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
@ -35,34 +41,81 @@ public class SocketEchoTest extends AbstractSocketTest {
private static final Random random = new Random(); private static final Random random = new Random();
static final byte[] data = new byte[1048576]; static final byte[] data = new byte[1048576];
private static EventExecutorGroup group;
static { static {
random.nextBytes(data); random.nextBytes(data);
} }
@Test @BeforeClass
public static void createGroup() {
group = new DefaultEventExecutorGroup(2);
}
@AfterClass
public static void destroyGroup() {
group.shutdown();
}
@Test(timeout = 30000)
public void testSimpleEcho() throws Throwable { public void testSimpleEcho() throws Throwable {
run(); run();
} }
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, Integer.MAX_VALUE); testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false);
} }
@Test @Test(timeout = 30000)
public void testSimpleEchoWithBridge() throws Throwable {
run();
}
public void testSimpleEchoWithBridge(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true);
}
@Test(timeout = 30000)
public void testSimpleEchoWithBoundedBuffer() throws Throwable { public void testSimpleEchoWithBoundedBuffer() throws Throwable {
run(); run();
} }
public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, 32); testSimpleEcho0(sb, cb, 32, false);
} }
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize) throws Throwable { @Test(timeout = 30000)
EchoHandler sh = new EchoHandler(maxInboundBufferSize); public void testSimpleEchoWithBridgedBoundedBuffer() throws Throwable {
EchoHandler ch = new EchoHandler(maxInboundBufferSize); run();
}
sb.childHandler(sh); public void testSimpleEchoWithBridgedBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
cb.handler(ch); testSimpleEcho0(sb, cb, 32, true);
}
private static void testSimpleEcho0(
ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge) throws Throwable {
final EchoHandler sh = new EchoHandler(maxInboundBufferSize);
final EchoHandler ch = new EchoHandler(maxInboundBufferSize);
if (bridge) {
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel c) throws Exception {
c.pipeline().addLast(group, sh);
}
});
cb.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel c) throws Exception {
c.pipeline().addLast(group, ch);
}
});
} else {
sb.childHandler(sh);
cb.handler(ch);
}
Channel sc = sb.bind().sync().channel(); Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel(); Channel cc = cb.connect().sync().channel();

View File

@ -319,32 +319,36 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
private void flushInboundBridge() { private boolean flushInboundBridge() {
if (inMsgBridge != null) { if (inMsgBridge != null) {
MessageBridge bridge = inMsgBridge; MessageBridge bridge = inMsgBridge;
if (bridge != null) { if (bridge != null) {
bridge.flush(inMsgBuf); return bridge.flush(inMsgBuf);
} }
} else if (inByteBridge != null) { } else if (inByteBridge != null) {
ByteBridge bridge = inByteBridge; ByteBridge bridge = inByteBridge;
if (bridge != null) { if (bridge != null) {
bridge.flush(inByteBuf); return bridge.flush(inByteBuf);
} }
} }
return true;
} }
private void flushOutboundBridge() { private boolean flushOutboundBridge() {
if (outMsgBridge != null) { if (outMsgBridge != null) {
MessageBridge bridge = outMsgBridge; MessageBridge bridge = outMsgBridge;
if (bridge != null) { if (bridge != null) {
bridge.flush(outMsgBuf); return bridge.flush(outMsgBuf);
} }
} else if (outByteBridge != null) { } else if (outByteBridge != null) {
ByteBridge bridge = outByteBridge; ByteBridge bridge = outByteBridge;
if (bridge != null) { if (bridge != null) {
bridge.flush(outByteBuf); return bridge.flush(outByteBuf);
} }
} }
return true;
} }
void freeHandlerBuffersAfterRemoval() { void freeHandlerBuffersAfterRemoval() {
@ -943,23 +947,34 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void invokeInboundBufferUpdated() { private void invokeInboundBufferUpdated() {
ChannelStateHandler handler = (ChannelStateHandler) handler(); ChannelStateHandler handler = (ChannelStateHandler) handler();
if (handler instanceof ChannelInboundHandler) {
flushInboundBridge();
}
try { if (handler instanceof ChannelInboundHandler) {
handler.inboundBufferUpdated(this); for (;;) {
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
} finally {
if (handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) {
try { try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); boolean flushedAll = flushInboundBridge();
handler.inboundBufferUpdated(this);
if (flushedAll) {
break;
}
} catch (Throwable t) { } catch (Throwable t) {
pipeline.notifyHandlerException(t); pipeline.notifyHandlerException(t);
} finally {
if (handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) {
try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
freeHandlerBuffersAfterRemoval();
} }
} }
freeHandlerBuffersAfterRemoval(); } else {
try {
handler.inboundBufferUpdated(this);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
} }
} }
@ -1593,14 +1608,32 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
exchangeBuf.add(data); exchangeBuf.add(data);
} }
private void flush(MessageBuf<Object> out) { private boolean flush(MessageBuf<Object> out) {
for (;;) { for (;;) {
Object[] data = exchangeBuf.poll(); Object[] data = exchangeBuf.peek();
if (data == null) { if (data == null) {
break; return true;
} }
Collections.addAll(out, data); int i;
for (i = 0; i < data.length; i ++) {
Object m = data[i];
if (m == null) {
break;
}
if (out.offer(m)) {
data[i] = null;
} else {
System.arraycopy(data, i, data, 0, data.length - i);
for (int j = i + 1; j < data.length; j ++) {
data[j] = null;
}
return false;
}
}
exchangeBuf.remove();
} }
} }
@ -1650,17 +1683,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
exchangeBuf.add(data); exchangeBuf.add(data);
} }
private void flush(ByteBuf out) { private boolean flush(ByteBuf out) {
while (out.maxCapacity() != out.writerIndex()) { for (;;) {
ByteBuf data = exchangeBuf.peek(); ByteBuf data = exchangeBuf.peek();
if (data == null) { if (data == null) {
break; return true;
} }
if (out.writerIndex() > out.maxCapacity() - data.readableBytes()) { if (out.writerIndex() > out.maxCapacity() - data.readableBytes()) {
// The target buffer is not going to be able to accept all data in the bridge. // The target buffer is not going to be able to accept all data in the bridge.
out.capacity(out.maxCapacity()); out.capacity(out.maxCapacity());
out.writeBytes(data, out.writableBytes()); out.writeBytes(data, out.writableBytes());
return false;
} else { } else {
exchangeBuf.remove(); exchangeBuf.remove();
try { try {