SPDY: ensure channel close happens after outbound message is written

This commit is contained in:
Jeff Pinner 2013-08-14 11:19:34 -07:00
parent eb80b4e104
commit fd5d75de14

View File

@ -59,7 +59,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private volatile boolean sentGoAwayFrame;
private volatile boolean receivedGoAwayFrame;
private volatile ChannelFuture closeSessionFuture;
private volatile ChannelFutureListener closeSessionFutureListener;
private final boolean server;
private final boolean flowControl;
@ -181,7 +181,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the remote side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, true);
halfCloseStream(streamId, true, e.getFuture());
}
} else if (msg instanceof SpdySynStreamFrame) {
@ -256,7 +256,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the remote side of the stream if this is the last frame
if (spdySynReplyFrame.isLast()) {
halfCloseStream(streamId, true);
halfCloseStream(streamId, true, e.getFuture());
}
} else if (msg instanceof SpdyRstStreamFrame) {
@ -271,7 +271,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
*/
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
removeStream(spdyRstStreamFrame.getStreamId());
removeStream(spdyRstStreamFrame.getStreamId(), e.getFuture());
} else if (msg instanceof SpdySettingsFrame) {
@ -345,7 +345,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the remote side of the stream if this is the last frame
if (spdyHeadersFrame.isLast()) {
halfCloseStream(streamId, true);
halfCloseStream(streamId, true, e.getFuture());
}
} else if (msg instanceof SpdyWindowUpdateFrame) {
@ -508,7 +508,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, false);
halfCloseStream(streamId, false, e.getFuture());
}
} else if (msg instanceof SpdySynStreamFrame) {
@ -542,13 +542,13 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the local side of the stream if this is the last frame
if (spdySynReplyFrame.isLast()) {
halfCloseStream(streamId, false);
halfCloseStream(streamId, false, e.getFuture());
}
} else if (msg instanceof SpdyRstStreamFrame) {
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
removeStream(spdyRstStreamFrame.getStreamId());
removeStream(spdyRstStreamFrame.getStreamId(), e.getFuture());
} else if (msg instanceof SpdySettingsFrame) {
@ -606,7 +606,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the local side of the stream if this is the last frame
if (spdyHeadersFrame.isLast()) {
halfCloseStream(streamId, false);
halfCloseStream(streamId, false, e.getFuture());
}
} else if (msg instanceof SpdyWindowUpdateFrame) {
@ -650,10 +650,11 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamId, SpdyStreamStatus status) {
boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamId);
removeStream(streamId);
ChannelFuture future = Channels.future(ctx.getChannel());
removeStream(streamId, future);
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
Channels.write(ctx, future, spdyRstStreamFrame, remoteAddress);
if (fireMessageReceived) {
Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
}
@ -731,21 +732,21 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
return true;
}
private void halfCloseStream(int streamId, boolean remote) {
private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
if (remote) {
spdySession.closeRemoteSide(streamId);
} else {
spdySession.closeLocalSide(streamId);
}
if (closeSessionFuture != null && spdySession.noActiveStreams()) {
closeSessionFuture.setSuccess();
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener);
}
}
private void removeStream(int streamId) {
private void removeStream(int streamId, ChannelFuture future) {
spdySession.removeStream(streamId);
if (closeSessionFuture != null && spdySession.noActiveStreams()) {
closeSessionFuture.setSuccess();
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener);
}
}
@ -782,7 +783,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, false);
halfCloseStream(streamId, false, e.getFuture());
}
Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
@ -827,8 +828,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
if (spdySession.noActiveStreams()) {
future.addListener(new ClosingChannelFutureListener(ctx, e));
} else {
closeSessionFuture = Channels.future(e.getChannel());
closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
closeSessionFutureListener = new ClosingChannelFutureListener(ctx, e);
}
}