Fix a bug where DefaultChannelPipeline.write() doesn't find the buffer

- Also fixed failures in SpdySessionHandlerTest
This commit is contained in:
Trustin Lee 2012-06-03 04:10:32 -07:00
parent 3e0cbf0caa
commit f991a8c7d4
3 changed files with 48 additions and 20 deletions

View File

@ -203,6 +203,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID); SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize)); partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
ctx.nextOutboundMessageBuffer().add(partialDataFrame); ctx.nextOutboundMessageBuffer().add(partialDataFrame);
ctx.flush();
} }
} }
@ -704,7 +705,7 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
removeStream(ctx, streamID); removeStream(ctx, streamID);
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status); SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
ctx.nextOutboundMessageBuffer().add(spdyRstStreamFrame); ctx.write(spdyRstStreamFrame);
if (fireMessageReceived) { if (fireMessageReceived) {
ctx.nextInboundMessageBuffer().add(spdyRstStreamFrame); ctx.nextInboundMessageBuffer().add(spdyRstStreamFrame);
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
@ -884,8 +885,9 @@ public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
} }
sendGoAwayFrame(ctx, SpdySessionStatus.OK); sendGoAwayFrame(ctx, SpdySessionStatus.OK);
ChannelFuture f = ctx.flush();
if (spdySession.noActiveStreams()) { if (spdySession.noActiveStreams()) {
ctx.flush().addListener(new ClosingChannelFutureListener(ctx)); f.addListener(new ClosingChannelFutureListener(ctx));
} else { } else {
closeSessionFuture = ctx.newFuture(); closeSessionFuture = ctx.newFuture();
closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx)); closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx));

View File

@ -19,6 +19,8 @@ import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -28,6 +30,9 @@ import org.junit.Test;
public class SpdySessionHandlerTest { public class SpdySessionHandlerTest {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SpdySessionHandlerTest.class);
private static final int closeSignal = SpdyCodecUtil.SPDY_SETTINGS_MAX_ID; private static final int closeSignal = SpdyCodecUtil.SPDY_SETTINGS_MAX_ID;
private static final SpdySettingsFrame closeMessage = new DefaultSpdySettingsFrame(); private static final SpdySettingsFrame closeMessage = new DefaultSpdySettingsFrame();
@ -252,6 +257,7 @@ public class SpdySessionHandlerTest {
@Test @Test
public void testSpdyClientSessionHandler() { public void testSpdyClientSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
logger.info("Running: testSpdyClientSessionHandler v" + version);
testSpdySessionHandler(version, false); testSpdySessionHandler(version, false);
} }
} }
@ -259,6 +265,7 @@ public class SpdySessionHandlerTest {
@Test @Test
public void testSpdyServerSessionHandler() { public void testSpdyServerSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
logger.info("Running: testSpdyServerSessionHandler v" + version);
testSpdySessionHandler(version, true); testSpdySessionHandler(version, true);
} }
} }

View File

@ -654,7 +654,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) { static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
for (;;) { for (;;) {
if (ctx == null) { if (ctx == null) {
throw NoSuchBufferException.INSTANCE; throw new NoSuchBufferException();
} }
ChannelBufferHolder<Object> in = ctx.in; ChannelBufferHolder<Object> in = ctx.in;
if (in != null && !in.isBypass() && in.hasByteBuffer()) { if (in != null && !in.isBypass() && in.hasByteBuffer()) {
@ -667,7 +667,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
static Queue<Object> nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { static Queue<Object> nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) {
for (;;) { for (;;) {
if (ctx == null) { if (ctx == null) {
throw NoSuchBufferException.INSTANCE; throw new NoSuchBufferException();
} }
ChannelBufferHolder<Object> in = ctx.inbound(); ChannelBufferHolder<Object> in = ctx.inbound();
if (in != null && !in.isBypass() && in.hasMessageBuffer()) { if (in != null && !in.isBypass() && in.hasMessageBuffer()) {
@ -683,7 +683,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (directOutbound.hasByteBuffer()) { if (directOutbound.hasByteBuffer()) {
return directOutbound.byteBuffer(); return directOutbound.byteBuffer();
} else { } else {
throw NoSuchBufferException.INSTANCE; throw new NoSuchBufferException();
} }
} }
@ -701,7 +701,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (directOutbound.hasMessageBuffer()) { if (directOutbound.hasMessageBuffer()) {
return directOutbound.messageBuffer(); return directOutbound.messageBuffer();
} else { } else {
throw NoSuchBufferException.INSTANCE; throw new NoSuchBufferException();
} }
} }
@ -1131,7 +1131,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return write(firstOutboundContext(), message, future); return write(firstOutboundContext(), message, future);
} }
ChannelFuture write(final DefaultChannelHandlerContext ctx, final Object message, final ChannelFuture future) { ChannelFuture write(DefaultChannelHandlerContext ctx, final Object message, final ChannelFuture future) {
if (message == null) { if (message == null) {
throw new NullPointerException("message"); throw new NullPointerException("message");
} }
@ -1139,24 +1139,42 @@ public class DefaultChannelPipeline implements ChannelPipeline {
EventExecutor executor; EventExecutor executor;
ChannelBufferHolder<Object> out; ChannelBufferHolder<Object> out;
if (ctx != null) { boolean msgBuf = false;
executor = ctx.executor(); for (;;) {
if (ctx == null) {
executor = channel.eventLoop();
out = directOutbound;
if (out.hasByteBuffer()) {
if(!(message instanceof ChannelBuffer)) {
throw new IllegalArgumentException(
"cannot write a message whose type is not " +
ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName());
}
} else {
msgBuf = true;
}
break;
}
out = ctx.outbound(); out = ctx.outbound();
} else { if (out.hasMessageBuffer()) {
executor = channel().eventLoop(); msgBuf = true;
out = directOutbound; executor = ctx.executor();
break;
} else if (message instanceof ChannelBuffer) {
executor = ctx.executor();
break;
}
ctx = ctx.prev;
} }
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
if (out.hasMessageBuffer()) { if (msgBuf) {
out.messageBuffer().add(message); out.messageBuffer().add(message);
} else if (message instanceof ChannelBuffer) {
ChannelBuffer m = (ChannelBuffer) message;
out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
} else { } else {
throw new IllegalArgumentException( ChannelBuffer buf = (ChannelBuffer) message;
"cannot write a message whose type is not " + out.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName());
} }
if (ctx != null) { if (ctx != null) {
flush0(ctx, future); flush0(ctx, future);
@ -1165,10 +1183,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
return future; return future;
} else { } else {
final DefaultChannelHandlerContext ctx0 = ctx;
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
write(ctx, message, future); write(ctx0, message, future);
} }
}); });
} }