SPDY: port SpdySessionHandler to netty 4

This commit is contained in:
Jeff Pinner 2013-07-15 13:39:39 -07:00 committed by Norman Maurer
parent e879848056
commit b8200d975c
2 changed files with 139 additions and 138 deletions

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelPromise;
import io.netty.util.internal.PlatformDependent;
import java.util.Comparator;
@ -37,66 +38,59 @@ final class SpdySession {
return activeStreams.isEmpty();
}
boolean isActiveStream(int streamID) {
return activeStreams.containsKey(Integer.valueOf(streamID));
boolean isActiveStream(int streamId) {
return activeStreams.containsKey(streamId);
}
// Stream-IDs should be iterated in priority order
Set<Integer> getActiveStreams() {
TreeSet<Integer> StreamIDs = new TreeSet<Integer>(new PriorityComparator());
StreamIDs.addAll(activeStreams.keySet());
return StreamIDs;
TreeSet<Integer> streamIds = new TreeSet<Integer>(new PriorityComparator());
streamIds.addAll(activeStreams.keySet());
return streamIds;
}
void acceptStream(
int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed,
int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
int sendWindowSize, int receiveWindowSize) {
if (!remoteSideClosed || !localSideClosed) {
activeStreams.put(
Integer.valueOf(streamID),
new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
activeStreams.put(streamId, new StreamState(
priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
}
}
boolean removeStream(int streamID) {
Integer StreamID = Integer.valueOf(streamID);
StreamState state = activeStreams.get(StreamID);
activeStreams.remove(StreamID);
void removeStream(int streamId, Throwable cause) {
StreamState state = activeStreams.remove(streamId);
if (state != null) {
return state.clearPendingWrites();
} else {
return false;
state.clearPendingWrites(cause);
}
}
boolean isRemoteSideClosed(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
boolean isRemoteSideClosed(int streamId) {
StreamState state = activeStreams.get(streamId);
return state == null || state.isRemoteSideClosed();
}
void closeRemoteSide(int streamID) {
Integer StreamID = Integer.valueOf(streamID);
StreamState state = activeStreams.get(StreamID);
void closeRemoteSide(int streamId) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.closeRemoteSide();
if (state.isLocalSideClosed()) {
activeStreams.remove(StreamID);
activeStreams.remove(streamId);
}
}
}
boolean isLocalSideClosed(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
boolean isLocalSideClosed(int streamId) {
StreamState state = activeStreams.get(streamId);
return state == null || state.isLocalSideClosed();
}
void closeLocalSide(int streamID) {
Integer StreamID = Integer.valueOf(streamID);
StreamState state = activeStreams.get(StreamID);
void closeLocalSide(int streamId) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.closeLocalSide();
if (state.isRemoteSideClosed()) {
activeStreams.remove(StreamID);
activeStreams.remove(streamId);
}
}
}
@ -105,41 +99,47 @@ final class SpdySession {
* hasReceivedReply and receivedReply are only called from channelRead()
* no need to synchronize access to the StreamState
*/
boolean hasReceivedReply(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
boolean hasReceivedReply(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null && state.hasReceivedReply();
}
void receivedReply(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
void receivedReply(int streamId) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.receivedReply();
}
}
int getSendWindowSize(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
int getSendWindowSize(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.getSendWindowSize() : -1;
}
int updateSendWindowSize(int streamID, int deltaWindowSize) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
int updateSendWindowSize(int streamId, int deltaWindowSize) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
}
int updateReceiveWindowSize(int streamID, int deltaWindowSize) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
StreamState state = activeStreams.get(streamId);
if (deltaWindowSize > 0) {
state.setReceiveWindowSizeLowerBound(0);
}
return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
}
int getReceiveWindowSizeLowerBound(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
int getReceiveWindowSizeLowerBound(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
}
void updateAllSendWindowSizes(int deltaWindowSize) {
for (StreamState state: activeStreams.values()) {
state.updateSendWindowSize(deltaWindowSize);
}
}
void updateAllReceiveWindowSizes(int deltaWindowSize) {
for (StreamState state: activeStreams.values()) {
state.updateReceiveWindowSize(deltaWindowSize);
@ -149,18 +149,18 @@ final class SpdySession {
}
}
boolean putPendingWrite(int streamID, Object msg) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
return state != null && state.putPendingWrite(msg);
boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {
StreamState state = activeStreams.get(streamId);
return state != null && state.putPendingWrite(pendingWrite);
}
Object getPendingWrite(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
PendingWrite getPendingWrite(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.getPendingWrite() : null;
}
Object removePendingWrite(int streamID) {
StreamState state = activeStreams.get(Integer.valueOf(streamID));
PendingWrite removePendingWrite(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.removePendingWrite() : null;
}
@ -173,7 +173,7 @@ final class SpdySession {
private final AtomicInteger sendWindowSize;
private final AtomicInteger receiveWindowSize;
private int receiveWindowSizeLowerBound;
private final Queue<Object> pendingWriteQueue = new ConcurrentLinkedQueue<Object>();
private final Queue<PendingWrite> pendingWriteQueue = new ConcurrentLinkedQueue<PendingWrite>();
StreamState(
byte priority, boolean remoteSideClosed, boolean localSideClosed,
@ -233,24 +233,26 @@ final class SpdySession {
this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
}
boolean putPendingWrite(Object msg) {
boolean putPendingWrite(PendingWrite msg) {
return pendingWriteQueue.offer(msg);
}
Object getPendingWrite() {
PendingWrite getPendingWrite() {
return pendingWriteQueue.peek();
}
Object removePendingWrite() {
PendingWrite removePendingWrite() {
return pendingWriteQueue.poll();
}
boolean clearPendingWrites() {
if (pendingWriteQueue.isEmpty()) {
return false;
void clearPendingWrites(Throwable cause) {
for (;;) {
PendingWrite pendingWrite = pendingWriteQueue.poll();
if (pendingWrite == null) {
break;
}
pendingWrite.fail(cause);
}
pendingWriteQueue.clear();
return true;
}
}
@ -262,4 +264,19 @@ final class SpdySession {
return state1.getPriority() - state2.getPriority();
}
}
public static final class PendingWrite {
final SpdyDataFrame spdyDataFrame;
final ChannelPromise promise;
PendingWrite(SpdyDataFrame spdyDataFrame, ChannelPromise promise) {
this.spdyDataFrame = spdyDataFrame;
this.promise = promise;
}
void fail(Throwable cause) {
spdyDataFrame.release();
promise.setFailure(cause);
}
}
}

View File

@ -111,6 +111,7 @@ public class SpdySessionHandler
// Check if we received a data frame for a Stream-ID which is not open
if (!spdySession.isActiveStream(streamId)) {
spdyDataFrame.release();
if (streamId <= lastGoodStreamId) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
} else if (!sentGoAwayFrame) {
@ -122,12 +123,14 @@ public class SpdySessionHandler
// Check if we received a data frame for a stream which is half-closed
if (spdySession.isRemoteSideClosed(streamId)) {
spdyDataFrame.release();
issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
return;
}
// Check if we received a data frame before receiving a SYN_REPLY
if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) {
spdyDataFrame.release();
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -149,6 +152,7 @@ public class SpdySessionHandler
// This difference is stored for the session when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame.
if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
spdyDataFrame.release();
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
return;
}
@ -265,7 +269,7 @@ public class SpdySessionHandler
*/
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
removeStream(ctx, spdyRstStreamFrame.getStreamId());
removeStream(spdyRstStreamFrame.getStreamId());
} else if (msg instanceof SpdySettingsFrame) {
@ -377,13 +381,21 @@ public class SpdySessionHandler
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
for (Integer streamId: spdySession.getActiveStreams()) {
removeStream(streamId);
}
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof SpdyProtocolException) {
issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
}
super.exceptionCaught(ctx, cause);
ctx.fireExceptionCaught(cause);
}
@Override
@ -417,6 +429,7 @@ public class SpdySessionHandler
// Frames must not be sent on half-closed streams
if (spdySession.isLocalSideClosed(streamId)) {
spdyDataFrame.release();
promise.setFailure(PROTOCOL_EXCEPTION);
return;
}
@ -441,7 +454,7 @@ public class SpdySessionHandler
if (sendWindowSize <= 0) {
// Stream is stalled -- enqueue Data frame and return
spdySession.putPendingWrite(streamId, spdyDataFrame);
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
return;
} else if (sendWindowSize < dataLength) {
// Stream is not stalled but we cannot send the entire frame
@ -452,46 +465,35 @@ public class SpdySessionHandler
spdyDataFrame.content().readSlice(sendWindowSize).retain());
// Enqueue the remaining data (will be the first frame queued)
spdySession.putPendingWrite(streamId, spdyDataFrame);
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state.
//
// This is never sent because on write failure the connection will be closed
// immediately. Commenting out just in case I misunderstood it - T
//
//final SocketAddress remoteAddress = e.getRemoteAddress();
//final ChannelHandlerContext context = ctx;
//e.getFuture().addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// if (!future.isSuccess()) {
// issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
// }
// }
//});
ctx.write(partialDataFrame, promise);
// Close the stream on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
}
}
});
return;
} else {
// Window size is large enough to send entire data frame
spdySession.updateSendWindowSize(streamId, -1 * dataLength);
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state.
//
// This is never sent because on write failure the connection will be closed
// immediately. Commenting out just in case I misunderstood it - T
//
//final ChannelHandlerContext context = ctx;
//e.getFuture().addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// if (!future.isSuccess()) {
// issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
// }
// }
//});
// Close the stream on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
}
}
});
}
}
}
@ -538,7 +540,7 @@ public class SpdySessionHandler
} else if (msg instanceof SpdyRstStreamFrame) {
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
removeStream(ctx, spdyRstStreamFrame.getStreamId());
removeStream(spdyRstStreamFrame.getStreamId());
} else if (msg instanceof SpdySettingsFrame) {
@ -621,7 +623,7 @@ public class SpdySessionHandler
private void issueSessionError(
ChannelHandlerContext ctx, SpdySessionStatus status) {
sendGoAwayFrame(ctx, status).addListener(ChannelFutureListener.CLOSE);
sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
}
/*
@ -637,7 +639,7 @@ public class SpdySessionHandler
*/
private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
removeStream(ctx, streamId);
removeStream(streamId);
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
ctx.writeAndFlush(spdyRstStreamFrame);
@ -684,9 +686,7 @@ public class SpdySessionHandler
private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
initialSendWindowSize = newInitialWindowSize;
for (Integer streamId: spdySession.getActiveStreams()) {
spdySession.updateSendWindowSize(streamId.intValue(), deltaWindowSize);
}
spdySession.updateAllSendWindowSizes(deltaWindowSize);
}
// need to synchronize to prevent new streams from being created while updating active streams
@ -729,27 +729,26 @@ public class SpdySessionHandler
}
}
private void removeStream(ChannelHandlerContext ctx, int streamId) {
if (spdySession.removeStream(streamId)) {
ctx.fireExceptionCaught(STREAM_CLOSED);
}
private void removeStream(int streamId) {
spdySession.removeStream(streamId, STREAM_CLOSED);
if (closeSessionFuture != null && spdySession.noActiveStreams()) {
closeSessionFuture.trySuccess();
}
}
private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) {
private void updateSendWindowSize(final ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) {
synchronized (flowControlLock) {
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
while (newWindowSize > 0) {
// Check if we have unblocked a stalled stream
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) spdySession.getPendingWrite(streamId);
if (spdyDataFrame == null) {
SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
if (pendingWrite == null) {
break;
}
SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
int dataFrameSize = spdyDataFrame.content().readableBytes();
if (newWindowSize >= dataFrameSize) {
@ -757,28 +756,21 @@ public class SpdySessionHandler
spdySession.removePendingWrite(streamId);
newWindowSize = spdySession.updateSendWindowSize(streamId, -1 * dataFrameSize);
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state.
//
// This is never sent because on write failure the connection will be closed
// immediately. Commenting out just in case I misunderstood it - T
//
//final ChannelHandlerContext context = ctx;
//e.getFuture().addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// if (!future.isSuccess()) {
// issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
// }
// }
//});
// Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, false);
}
ctx.fireChannelRead(spdyDataFrame);
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR);
}
}
});
} else {
// We can send a partial frame
spdySession.updateSendWindowSize(streamId, -1 * newWindowSize);
@ -788,23 +780,15 @@ public class SpdySessionHandler
spdyDataFrame.content().readSlice(newWindowSize).retain());
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state.
//
// This is never sent because on write failure the connection will be closed
// immediately. Commenting out just in case I misunderstood it - T
//
//final SocketAddress remoteAddress = e.getRemoteAddress();
//final ChannelHandlerContext context = ctx;
//e.getFuture().addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// if (!future.isSuccess()) {
// issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
// }
// }
//});
ctx.fireChannelRead(partialDataFrame);
// Close the stream on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR);
}
}
});
newWindowSize = 0;
}