From 310a87a51d1abfee9e1f2441adeae31748d61ed3 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 18 Dec 2012 04:52:46 +0900 Subject: [PATCH] Fix #814 - Prevent IllegalBufferAccessException on write() and flush() - Also fixed a incorrect port of SpdySessionHandler - Previously, it closed the connection too early when sending a GOAWAY frame - After this fix, SpdySessionHandlerTest now passes again without the previous fix --- .../codec/spdy/SpdySessionHandler.java | 21 +++++----- .../codec/spdy/SpdySessionHandlerTest.java | 42 +++++++++---------- .../netty/channel/DefaultChannelPipeline.java | 23 ++++++---- 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index a67d8d5381..2ecfd8656c 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler; -import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -440,8 +439,7 @@ public class SpdySessionHandler @Override public void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - sendGoAwayFrame(ctx); - super.close(ctx, future); + sendGoAwayFrame(ctx, future); } @Override @@ -878,20 +876,22 @@ public class SpdySessionHandler } } - private void sendGoAwayFrame(ChannelHandlerContext ctx) { + private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelFuture future) { // Avoid NotYetConnectedException if (!ctx.channel().isActive()) { + ctx.close(future); return; } sendGoAwayFrame(ctx, SpdySessionStatus.OK); ChannelFuture f = ctx.flush(); if (spdySession.noActiveStreams()) { - f.addListener(new ClosingChannelFutureListener(ctx)); + f.addListener(new ClosingChannelFutureListener(ctx, future)); } else { closeSessionFuture = ctx.newFuture(); - closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx)); + closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, future)); } + // FIXME: Close the connection forcibly after timeout. } private synchronized void sendGoAwayFrame( @@ -905,16 +905,17 @@ public class SpdySessionHandler private static final class ClosingChannelFutureListener implements ChannelFutureListener { private final ChannelHandlerContext ctx; + private final ChannelFuture future; - ClosingChannelFutureListener(ChannelHandlerContext ctx) { + ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelFuture future) { this.ctx = ctx; + this.future = future; } @Override public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception { - if (!(sentGoAwayFuture.cause() instanceof ClosedChannelException)) { - ctx.close(); - } + System.err.println("ASDF"); + ctx.close(future); } } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java index 1dc38028e8..8d7af8dc2f 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -113,7 +113,7 @@ public class SpdySessionHandlerTest { SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamID); SpdySynStreamFrame spdySynStreamFrame = - new DefaultSpdySynStreamFrame(localStreamID, 0, (byte) 0); + new DefaultSpdySynStreamFrame(localStreamID, 0, (byte) 0); spdySynStreamFrame.setHeader("Compression", "test"); SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamID); @@ -233,25 +233,23 @@ public class SpdySessionHandlerTest { sessionHandler.writeInbound(remotePingFrame); assertNull(sessionHandler.readOutbound()); - // Note that we cannot use writeInbound() here because sending closeMessage will close the channel - // immediately, and then we cannot test if SYN_STREAM and DATA frames are ignored after a GOAWAY frame is sent. - //// 1. Sending this message will make EchoHandler send a GOAWAY frame and close the session. - sessionHandler.inboundBuffer().add(closeMessage); - //// 2. Sending SYN_STREAM after sending closeMessage should fail with REFUSED_STREAM. - spdySynStreamFrame.setStreamId(localStreamID + 2); - sessionHandler.inboundBuffer().add(spdySynStreamFrame); - //// 3. Sending DATA after sending closeMessage should do nothing. - spdyDataFrame.setStreamId(localStreamID + 2); - sessionHandler.inboundBuffer().add(spdyDataFrame); - - // At this point, we added three SPDY messages to the inbound buffer. Start testing. - sessionHandler.pipeline().fireInboundBufferUpdated(); - //// 1. Check if session handler sends a GOAWAY frame when closing + // Check if session handler sends a GOAWAY frame when closing + sessionHandler.writeInbound(closeMessage); assertGoAway(sessionHandler.readOutbound(), localStreamID); - //// 2. Check if session handler returns REFUSED_STREAM if it receives SYN_STREAM frames - //// after sending a GOAWAY frame - assertRstStream(sessionHandler.readOutbound(), localStreamID + 2, SpdyStreamStatus.REFUSED_STREAM); - //// 3. Check if session handler ignores Data frames after sending a GOAWAY frame + assertNull(sessionHandler.readOutbound()); + localStreamID += 2; + + // Check if session handler returns REFUSED_STREAM if it receives + // SYN_STREAM frames after sending a GOAWAY frame + spdySynStreamFrame.setStreamId(localStreamID); + sessionHandler.writeInbound(spdySynStreamFrame); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); + assertNull(sessionHandler.readOutbound()); + + // Check if session handler ignores Data frames after sending + // a GOAWAY frame + spdyDataFrame.setStreamId(localStreamID); + sessionHandler.writeInbound(spdyDataFrame); assertNull(sessionHandler.readOutbound()); sessionHandler.finish(); @@ -289,7 +287,7 @@ public class SpdySessionHandlerTest { // Initiate 4 new streams int streamID = server ? 2 : 1; SpdySynStreamFrame spdySynStreamFrame = - new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0); + new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0); spdySynStreamFrame.setLast(true); ctx.write(spdySynStreamFrame); spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); @@ -308,8 +306,8 @@ public class SpdySessionHandlerTest { @Override public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof SpdyDataFrame || - msg instanceof SpdyPingFrame || - msg instanceof SpdyHeadersFrame) { + msg instanceof SpdyPingFrame || + msg instanceof SpdyHeadersFrame) { ctx.write(msg); return; diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index f757a74b06..b95417592e 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -23,6 +23,7 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.HashMap; import java.util.IdentityHashMap; @@ -1238,6 +1239,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { } private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) { + if (!channel.isRegistered() && !channel.isActive()) { + future.setFailure(new ClosedChannelException()); + return; + } + try { ctx.flushBridge(); ((ChannelOperationHandler) ctx.handler()).flush(ctx, future); @@ -1320,17 +1326,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { } private void write0(DefaultChannelHandlerContext ctx, Object message, ChannelFuture future, boolean msgBuf) { + if (!channel.isRegistered() && !channel.isActive()) { + future.setFailure(new ClosedChannelException()); + return; + } + if (msgBuf) { - MessageBuf outMsgBuf = ctx.outboundMessageBuffer(); - if (!outMsgBuf.isFreed()) { - outMsgBuf.add(message); - } + ctx.outboundMessageBuffer().add(message); } else { - ByteBuf outByteBuf = ctx.outboundByteBuffer(); - if (!outByteBuf.isFreed()) { - ByteBuf buf = (ByteBuf) message; - outByteBuf.writeBytes(buf, buf.readerIndex(), buf.readableBytes()); - } + ByteBuf buf = (ByteBuf) message; + ctx.outboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); } flush0(ctx, future); }