From f991a8c7d4475690da97a8d796c24e883acbfa98 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 3 Jun 2012 04:10:32 -0700 Subject: [PATCH] Fix a bug where DefaultChannelPipeline.write() doesn't find the buffer - Also fixed failures in SpdySessionHandlerTest --- .../codec/spdy/SpdySessionHandler.java | 6 +- .../codec/spdy/SpdySessionHandlerTest.java | 7 +++ .../netty/channel/DefaultChannelPipeline.java | 55 +++++++++++++------ 3 files changed, 48 insertions(+), 20 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index cfc45e4f94..2774f6a7c7 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -203,6 +203,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID); partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize)); ctx.nextOutboundMessageBuffer().add(partialDataFrame); + ctx.flush(); } } @@ -704,7 +705,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { removeStream(ctx, streamID); SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status); - ctx.nextOutboundMessageBuffer().add(spdyRstStreamFrame); + ctx.write(spdyRstStreamFrame); if (fireMessageReceived) { ctx.nextInboundMessageBuffer().add(spdyRstStreamFrame); ctx.fireInboundBufferUpdated(); @@ -884,8 +885,9 @@ public class SpdySessionHandler extends ChannelHandlerAdapter { } sendGoAwayFrame(ctx, SpdySessionStatus.OK); + ChannelFuture f = ctx.flush(); if (spdySession.noActiveStreams()) { - ctx.flush().addListener(new ClosingChannelFutureListener(ctx)); + f.addListener(new ClosingChannelFutureListener(ctx)); } else { closeSessionFuture = ctx.newFuture(); closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx)); diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java index e588801dfe..4b0d4efa37 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -19,6 +19,8 @@ import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.handler.codec.embedder.DecoderEmbedder; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; import java.util.List; import java.util.Map; @@ -28,6 +30,9 @@ import org.junit.Test; public class SpdySessionHandlerTest { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(SpdySessionHandlerTest.class); + private static final int closeSignal = SpdyCodecUtil.SPDY_SETTINGS_MAX_ID; private static final SpdySettingsFrame closeMessage = new DefaultSpdySettingsFrame(); @@ -252,6 +257,7 @@ public class SpdySessionHandlerTest { @Test public void testSpdyClientSessionHandler() { for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + logger.info("Running: testSpdyClientSessionHandler v" + version); testSpdySessionHandler(version, false); } } @@ -259,6 +265,7 @@ public class SpdySessionHandlerTest { @Test public void testSpdyServerSessionHandler() { for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + logger.info("Running: testSpdyServerSessionHandler v" + version); testSpdySessionHandler(version, true); } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index dfc1e93597..433309a297 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -654,7 +654,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - throw NoSuchBufferException.INSTANCE; + throw new NoSuchBufferException(); } ChannelBufferHolder in = ctx.in; if (in != null && !in.isBypass() && in.hasByteBuffer()) { @@ -667,7 +667,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { static Queue nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - throw NoSuchBufferException.INSTANCE; + throw new NoSuchBufferException(); } ChannelBufferHolder in = ctx.inbound(); if (in != null && !in.isBypass() && in.hasMessageBuffer()) { @@ -683,7 +683,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (directOutbound.hasByteBuffer()) { return directOutbound.byteBuffer(); } else { - throw NoSuchBufferException.INSTANCE; + throw new NoSuchBufferException(); } } @@ -701,7 +701,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (directOutbound.hasMessageBuffer()) { return directOutbound.messageBuffer(); } else { - throw NoSuchBufferException.INSTANCE; + throw new NoSuchBufferException(); } } @@ -1131,7 +1131,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return write(firstOutboundContext(), message, future); } - ChannelFuture write(final DefaultChannelHandlerContext ctx, final Object message, final ChannelFuture future) { + ChannelFuture write(DefaultChannelHandlerContext ctx, final Object message, final ChannelFuture future) { if (message == null) { throw new NullPointerException("message"); } @@ -1139,24 +1139,42 @@ public class DefaultChannelPipeline implements ChannelPipeline { EventExecutor executor; ChannelBufferHolder out; - if (ctx != null) { - executor = ctx.executor(); + boolean msgBuf = false; + for (;;) { + if (ctx == null) { + executor = channel.eventLoop(); + out = directOutbound; + if (out.hasByteBuffer()) { + if(!(message instanceof ChannelBuffer)) { + throw new IllegalArgumentException( + "cannot write a message whose type is not " + + ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName()); + } + } else { + msgBuf = true; + } + break; + } + out = ctx.outbound(); - } else { - executor = channel().eventLoop(); - out = directOutbound; + if (out.hasMessageBuffer()) { + msgBuf = true; + executor = ctx.executor(); + break; + } else if (message instanceof ChannelBuffer) { + executor = ctx.executor(); + break; + } + + ctx = ctx.prev; } if (executor.inEventLoop()) { - if (out.hasMessageBuffer()) { + if (msgBuf) { out.messageBuffer().add(message); - } else if (message instanceof ChannelBuffer) { - ChannelBuffer m = (ChannelBuffer) message; - out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); } else { - throw new IllegalArgumentException( - "cannot write a message whose type is not " + - ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName()); + ChannelBuffer buf = (ChannelBuffer) message; + out.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); } if (ctx != null) { flush0(ctx, future); @@ -1165,10 +1183,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { } return future; } else { + final DefaultChannelHandlerContext ctx0 = ctx; executor.execute(new Runnable() { @Override public void run() { - write(ctx, message, future); + write(ctx0, message, future); } }); }