Fix #814 - Prevent IllegalBufferAccessException on write() and flush()

- Also fixed a incorrect port of SpdySessionHandler
  - Previously, it closed the connection too early when sending a GOAWAY frame
  - After this fix, SpdySessionHandlerTest now passes again without the previous fix
This commit is contained in:
Trustin Lee 2012-12-18 04:52:46 +09:00
parent 5a467b69bf
commit 310a87a51d
3 changed files with 45 additions and 41 deletions

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -440,8 +439,7 @@ public class SpdySessionHandler
@Override
public void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
sendGoAwayFrame(ctx);
super.close(ctx, future);
sendGoAwayFrame(ctx, future);
}
@Override
@ -878,20 +876,22 @@ public class SpdySessionHandler
}
}
private void sendGoAwayFrame(ChannelHandlerContext ctx) {
private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelFuture future) {
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
ctx.close(future);
return;
}
sendGoAwayFrame(ctx, SpdySessionStatus.OK);
ChannelFuture f = ctx.flush();
if (spdySession.noActiveStreams()) {
f.addListener(new ClosingChannelFutureListener(ctx));
f.addListener(new ClosingChannelFutureListener(ctx, future));
} else {
closeSessionFuture = ctx.newFuture();
closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx));
closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, future));
}
// FIXME: Close the connection forcibly after timeout.
}
private synchronized void sendGoAwayFrame(
@ -905,16 +905,17 @@ public class SpdySessionHandler
private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelFuture future;
ClosingChannelFutureListener(ChannelHandlerContext ctx) {
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelFuture future) {
this.ctx = ctx;
this.future = future;
}
@Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
if (!(sentGoAwayFuture.cause() instanceof ClosedChannelException)) {
ctx.close();
}
System.err.println("ASDF");
ctx.close(future);
}
}
}

View File

@ -113,7 +113,7 @@ public class SpdySessionHandlerTest {
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamID);
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(localStreamID, 0, (byte) 0);
new DefaultSpdySynStreamFrame(localStreamID, 0, (byte) 0);
spdySynStreamFrame.setHeader("Compression", "test");
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamID);
@ -233,25 +233,23 @@ public class SpdySessionHandlerTest {
sessionHandler.writeInbound(remotePingFrame);
assertNull(sessionHandler.readOutbound());
// Note that we cannot use writeInbound() here because sending closeMessage will close the channel
// immediately, and then we cannot test if SYN_STREAM and DATA frames are ignored after a GOAWAY frame is sent.
//// 1. Sending this message will make EchoHandler send a GOAWAY frame and close the session.
sessionHandler.inboundBuffer().add(closeMessage);
//// 2. Sending SYN_STREAM after sending closeMessage should fail with REFUSED_STREAM.
spdySynStreamFrame.setStreamId(localStreamID + 2);
sessionHandler.inboundBuffer().add(spdySynStreamFrame);
//// 3. Sending DATA after sending closeMessage should do nothing.
spdyDataFrame.setStreamId(localStreamID + 2);
sessionHandler.inboundBuffer().add(spdyDataFrame);
// At this point, we added three SPDY messages to the inbound buffer. Start testing.
sessionHandler.pipeline().fireInboundBufferUpdated();
//// 1. Check if session handler sends a GOAWAY frame when closing
// Check if session handler sends a GOAWAY frame when closing
sessionHandler.writeInbound(closeMessage);
assertGoAway(sessionHandler.readOutbound(), localStreamID);
//// 2. Check if session handler returns REFUSED_STREAM if it receives SYN_STREAM frames
//// after sending a GOAWAY frame
assertRstStream(sessionHandler.readOutbound(), localStreamID + 2, SpdyStreamStatus.REFUSED_STREAM);
//// 3. Check if session handler ignores Data frames after sending a GOAWAY frame
assertNull(sessionHandler.readOutbound());
localStreamID += 2;
// Check if session handler returns REFUSED_STREAM if it receives
// SYN_STREAM frames after sending a GOAWAY frame
spdySynStreamFrame.setStreamId(localStreamID);
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler ignores Data frames after sending
// a GOAWAY frame
spdyDataFrame.setStreamId(localStreamID);
sessionHandler.writeInbound(spdyDataFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.finish();
@ -289,7 +287,7 @@ public class SpdySessionHandlerTest {
// Initiate 4 new streams
int streamID = server ? 2 : 1;
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0);
new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0);
spdySynStreamFrame.setLast(true);
ctx.write(spdySynStreamFrame);
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
@ -308,8 +306,8 @@ public class SpdySessionHandlerTest {
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SpdyDataFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyHeadersFrame) {
msg instanceof SpdyPingFrame ||
msg instanceof SpdyHeadersFrame) {
ctx.write(msg);
return;

View File

@ -23,6 +23,7 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
@ -1238,6 +1239,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) {
if (!channel.isRegistered() && !channel.isActive()) {
future.setFailure(new ClosedChannelException());
return;
}
try {
ctx.flushBridge();
((ChannelOperationHandler) ctx.handler()).flush(ctx, future);
@ -1320,17 +1326,16 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
private void write0(DefaultChannelHandlerContext ctx, Object message, ChannelFuture future, boolean msgBuf) {
if (!channel.isRegistered() && !channel.isActive()) {
future.setFailure(new ClosedChannelException());
return;
}
if (msgBuf) {
MessageBuf<Object> outMsgBuf = ctx.outboundMessageBuffer();
if (!outMsgBuf.isFreed()) {
outMsgBuf.add(message);
}
ctx.outboundMessageBuffer().add(message);
} else {
ByteBuf outByteBuf = ctx.outboundByteBuffer();
if (!outByteBuf.isFreed()) {
ByteBuf buf = (ByteBuf) message;
outByteBuf.writeBytes(buf, buf.readerIndex(), buf.readableBytes());
}
ByteBuf buf = (ByteBuf) message;
ctx.outboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
}
flush0(ctx, future);
}