From 857713ad4c3c77368966194896ef8cee446bdab3 Mon Sep 17 00:00:00 2001 From: Jeff Pinner Date: Fri, 8 Aug 2014 16:16:45 -0700 Subject: [PATCH] SPDY: fix SpdySessionHandler::updateSendWindowSize In Netty 3, downstream writes of SPDY data frames and upstream reads of SPDY window udpate frames occur on different threads. When receiving a window update frame, we synchronize on a java object (SpdySessionHandler::flowControlLock) while sending any pending writes that are now able to complete. When writing a data frame, we check the send window size to see if we are allowed to write it to the socket, or if we have to enqueue it as a pending write. To prevent races with the window update frame, this is also synchronized on the same SpdySessionHandler::flowControlLock. In Netty 4, upstream and downstream operations on any given channel now occur on the same thread. Since java locks are re-entrant, this now allows downstream writes to occur while processing window update frames. In particular, when we receive a window update frame that unblocks a pending write, this write completes which triggers an event notification on the response, which in turn triggers a write of a data frame. Since this is on the same thread it re-enters the lock and modifies the send window. When the write completes, we continue processing pending writes without knowledge that the window size has been decremented. --- .../netty/handler/codec/spdy/SpdySession.java | 5 +- .../codec/spdy/SpdySessionHandler.java | 196 ++++++++---------- 2 files changed, 96 insertions(+), 105 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java index 82dd25a15a..9e0df986c2 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java @@ -170,10 +170,13 @@ final class SpdySession { } StreamState state = activeStreams.get(streamId); + if (state == null) { + return -1; + } if (deltaWindowSize > 0) { state.setReceiveWindowSizeLowerBound(0); } - return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1; + return state.updateReceiveWindowSize(deltaWindowSize); } int getReceiveWindowSizeLowerBound(int streamId) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 6ded3d1796..6e54025632 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -52,8 +52,6 @@ public class SpdySessionHandler private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; - private final Object flowControlLock = new Object(); - private final AtomicInteger pings = new AtomicInteger(); private boolean sentGoAwayFrame; @@ -485,57 +483,55 @@ public class SpdySessionHandler * sender must pause transmitting data frames. */ - synchronized (flowControlLock) { - int dataLength = spdyDataFrame.content().readableBytes(); - int sendWindowSize = spdySession.getSendWindowSize(streamId); - int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID); - sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize); + int dataLength = spdyDataFrame.content().readableBytes(); + int sendWindowSize = spdySession.getSendWindowSize(streamId); + int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID); + sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize); - if (sendWindowSize <= 0) { - // Stream is stalled -- enqueue Data frame and return - spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); - return; - } else if (sendWindowSize < dataLength) { - // Stream is not stalled but we cannot send the entire frame - spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize); - spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize); + if (sendWindowSize <= 0) { + // Stream is stalled -- enqueue Data frame and return + spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); + return; + } else if (sendWindowSize < dataLength) { + // Stream is not stalled but we cannot send the entire frame + spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize); + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize); - // Create a partial data frame whose length is the current window size - SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, - spdyDataFrame.content().readSlice(sendWindowSize).retain()); + // Create a partial data frame whose length is the current window size + SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, + spdyDataFrame.content().readSlice(sendWindowSize).retain()); - // Enqueue the remaining data (will be the first frame queued) - spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); + // Enqueue the remaining data (will be the first frame queued) + spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); - // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the session on write failures that leave the transfer window in a corrupt state. - final ChannelHandlerContext context = ctx; - ctx.write(partialDataFrame).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR); - } + // The transfer window size is pre-decremented when sending a data frame downstream. + // Close the session on write failures that leave the transfer window in a corrupt state. + final ChannelHandlerContext context = ctx; + ctx.write(partialDataFrame).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR); } - }); - return; - } else { - // Window size is large enough to send entire data frame - spdySession.updateSendWindowSize(streamId, -1 * dataLength); - spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength); + } + }); + return; + } else { + // Window size is large enough to send entire data frame + spdySession.updateSendWindowSize(streamId, -1 * dataLength); + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength); - // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the session on write failures that leave the transfer window in a corrupt state. - final ChannelHandlerContext context = ctx; - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR); - } + // The transfer window size is pre-decremented when sending a data frame downstream. + // Close the session on write failures that leave the transfer window in a corrupt state. + final ChannelHandlerContext context = ctx; + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR); } - }); - } + } + }); } // Close the local side of the stream if this is the last frame @@ -759,72 +755,64 @@ public class SpdySessionHandler } private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) { - synchronized (flowControlLock) { - int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize); - if (streamId != SPDY_SESSION_STREAM_ID) { - int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID); - newWindowSize = Math.min(newWindowSize, sessionSendWindowSize); + spdySession.updateSendWindowSize(streamId, deltaWindowSize); + + while (true) { + // Check if we have unblocked a stalled stream + SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId); + if (pendingWrite == null) { + return; } - while (newWindowSize > 0) { - // Check if we have unblocked a stalled stream - SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId); - if (pendingWrite == null) { - break; - } + SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame; + int dataFrameSize = spdyDataFrame.content().readableBytes(); + int writeStreamId = spdyDataFrame.streamId(); + int sendWindowSize = spdySession.getSendWindowSize(writeStreamId); + int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID); + sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize); - SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame; - int dataFrameSize = spdyDataFrame.content().readableBytes(); - int writeStreamId = spdyDataFrame.streamId(); - if (streamId == SPDY_SESSION_STREAM_ID) { - newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId)); - } + if (sendWindowSize <= 0) { + return; + } else if (sendWindowSize < dataFrameSize) { + // We can send a partial frame + spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize); + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize); - if (newWindowSize >= dataFrameSize) { - // Window size is large enough to send entire data frame - spdySession.removePendingWrite(writeStreamId); - newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize); - int sessionSendWindowSize = - spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize); - newWindowSize = Math.min(newWindowSize, sessionSendWindowSize); + // Create a partial data frame whose length is the current window size + SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId, + spdyDataFrame.content().readSlice(sendWindowSize).retain()); - // Close the local side of the stream if this is the last frame - if (spdyDataFrame.isLast()) { - halfCloseStream(writeStreamId, false, pendingWrite.promise); + // The transfer window size is pre-decremented when sending a data frame downstream. + // Close the session on write failures that leave the transfer window in a corrupt state. + ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR); + } } + }); + } else { + // Window size is large enough to send entire data frame + spdySession.removePendingWrite(writeStreamId); + spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize); + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize); - // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the session on write failures that leave the transfer window in a corrupt state. - ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR); - } - } - }); - } else { - // We can send a partial frame - spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize); - spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize); - - // Create a partial data frame whose length is the current window size - SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId, - spdyDataFrame.content().readSlice(newWindowSize).retain()); - - // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the session on write failures that leave the transfer window in a corrupt state. - ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR); - } - } - }); - - newWindowSize = 0; + // Close the local side of the stream if this is the last frame + if (spdyDataFrame.isLast()) { + halfCloseStream(writeStreamId, false, pendingWrite.promise); } + + // The transfer window size is pre-decremented when sending a data frame downstream. + // Close the session on write failures that leave the transfer window in a corrupt state. + ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR); + } + } + }); } } }