SPDY: fix SpdySessionHandler::updateSendWindowSize
In Netty 3, downstream writes of SPDY data frames and upstream reads of SPDY window udpate frames occur on different threads. When receiving a window update frame, we synchronize on a java object (SpdySessionHandler::flowControlLock) while sending any pending writes that are now able to complete. When writing a data frame, we check the send window size to see if we are allowed to write it to the socket, or if we have to enqueue it as a pending write. To prevent races with the window update frame, this is also synchronized on the same SpdySessionHandler::flowControlLock. In Netty 4, upstream and downstream operations on any given channel now occur on the same thread. Since java locks are re-entrant, this now allows downstream writes to occur while processing window update frames. In particular, when we receive a window update frame that unblocks a pending write, this write completes which triggers an event notification on the response, which in turn triggers a write of a data frame. Since this is on the same thread it re-enters the lock and modifies the send window. When the write completes, we continue processing pending writes without knowledge that the window size has been decremented.
This commit is contained in:
parent
23d3b84273
commit
af26826348
@ -170,10 +170,13 @@ final class SpdySession {
|
|||||||
}
|
}
|
||||||
|
|
||||||
StreamState state = activeStreams.get(streamId);
|
StreamState state = activeStreams.get(streamId);
|
||||||
|
if (state == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (deltaWindowSize > 0) {
|
if (deltaWindowSize > 0) {
|
||||||
state.setReceiveWindowSizeLowerBound(0);
|
state.setReceiveWindowSizeLowerBound(0);
|
||||||
}
|
}
|
||||||
return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
|
return state.updateReceiveWindowSize(deltaWindowSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
int getReceiveWindowSizeLowerBound(int streamId) {
|
int getReceiveWindowSizeLowerBound(int streamId) {
|
||||||
|
@ -51,8 +51,6 @@ public class SpdySessionHandler extends ChannelDuplexHandler {
|
|||||||
private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
|
private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
|
||||||
private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
|
private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
|
||||||
|
|
||||||
private final Object flowControlLock = new Object();
|
|
||||||
|
|
||||||
private final AtomicInteger pings = new AtomicInteger();
|
private final AtomicInteger pings = new AtomicInteger();
|
||||||
|
|
||||||
private boolean sentGoAwayFrame;
|
private boolean sentGoAwayFrame;
|
||||||
@ -484,57 +482,55 @@ public class SpdySessionHandler extends ChannelDuplexHandler {
|
|||||||
* sender must pause transmitting data frames.
|
* sender must pause transmitting data frames.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
synchronized (flowControlLock) {
|
int dataLength = spdyDataFrame.content().readableBytes();
|
||||||
int dataLength = spdyDataFrame.content().readableBytes();
|
int sendWindowSize = spdySession.getSendWindowSize(streamId);
|
||||||
int sendWindowSize = spdySession.getSendWindowSize(streamId);
|
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
|
||||||
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
|
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
|
||||||
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
|
|
||||||
|
|
||||||
if (sendWindowSize <= 0) {
|
if (sendWindowSize <= 0) {
|
||||||
// Stream is stalled -- enqueue Data frame and return
|
// Stream is stalled -- enqueue Data frame and return
|
||||||
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
|
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
|
||||||
return;
|
return;
|
||||||
} else if (sendWindowSize < dataLength) {
|
} else if (sendWindowSize < dataLength) {
|
||||||
// Stream is not stalled but we cannot send the entire frame
|
// Stream is not stalled but we cannot send the entire frame
|
||||||
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
|
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
|
||||||
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
|
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
|
||||||
|
|
||||||
// Create a partial data frame whose length is the current window size
|
// Create a partial data frame whose length is the current window size
|
||||||
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
|
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
|
||||||
spdyDataFrame.content().readSlice(sendWindowSize).retain());
|
spdyDataFrame.content().readSlice(sendWindowSize).retain());
|
||||||
|
|
||||||
// Enqueue the remaining data (will be the first frame queued)
|
// Enqueue the remaining data (will be the first frame queued)
|
||||||
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
|
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
|
||||||
|
|
||||||
// The transfer window size is pre-decremented when sending a data frame downstream.
|
// The transfer window size is pre-decremented when sending a data frame downstream.
|
||||||
// Close the session on write failures that leave the transfer window in a corrupt state.
|
// Close the session on write failures that leave the transfer window in a corrupt state.
|
||||||
final ChannelHandlerContext context = ctx;
|
final ChannelHandlerContext context = ctx;
|
||||||
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
|
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
if (!future.isSuccess()) {
|
if (!future.isSuccess()) {
|
||||||
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
|
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
return;
|
});
|
||||||
} else {
|
return;
|
||||||
// Window size is large enough to send entire data frame
|
} else {
|
||||||
spdySession.updateSendWindowSize(streamId, -1 * dataLength);
|
// Window size is large enough to send entire data frame
|
||||||
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
|
spdySession.updateSendWindowSize(streamId, -1 * dataLength);
|
||||||
|
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
|
||||||
|
|
||||||
// The transfer window size is pre-decremented when sending a data frame downstream.
|
// The transfer window size is pre-decremented when sending a data frame downstream.
|
||||||
// Close the session on write failures that leave the transfer window in a corrupt state.
|
// Close the session on write failures that leave the transfer window in a corrupt state.
|
||||||
final ChannelHandlerContext context = ctx;
|
final ChannelHandlerContext context = ctx;
|
||||||
promise.addListener(new ChannelFutureListener() {
|
promise.addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
if (!future.isSuccess()) {
|
if (!future.isSuccess()) {
|
||||||
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
|
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the local side of the stream if this is the last frame
|
// Close the local side of the stream if this is the last frame
|
||||||
@ -758,72 +754,64 @@ public class SpdySessionHandler extends ChannelDuplexHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
|
private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
|
||||||
synchronized (flowControlLock) {
|
spdySession.updateSendWindowSize(streamId, deltaWindowSize);
|
||||||
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
|
|
||||||
if (streamId != SPDY_SESSION_STREAM_ID) {
|
while (true) {
|
||||||
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
|
// Check if we have unblocked a stalled stream
|
||||||
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
|
SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
|
||||||
|
if (pendingWrite == null) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (newWindowSize > 0) {
|
SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
|
||||||
// Check if we have unblocked a stalled stream
|
int dataFrameSize = spdyDataFrame.content().readableBytes();
|
||||||
SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
|
int writeStreamId = spdyDataFrame.streamId();
|
||||||
if (pendingWrite == null) {
|
int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
|
||||||
break;
|
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
|
||||||
}
|
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
|
||||||
|
|
||||||
SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
|
if (sendWindowSize <= 0) {
|
||||||
int dataFrameSize = spdyDataFrame.content().readableBytes();
|
return;
|
||||||
int writeStreamId = spdyDataFrame.streamId();
|
} else if (sendWindowSize < dataFrameSize) {
|
||||||
if (streamId == SPDY_SESSION_STREAM_ID) {
|
// We can send a partial frame
|
||||||
newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
|
spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
|
||||||
}
|
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
|
||||||
|
|
||||||
if (newWindowSize >= dataFrameSize) {
|
// Create a partial data frame whose length is the current window size
|
||||||
// Window size is large enough to send entire data frame
|
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
|
||||||
spdySession.removePendingWrite(writeStreamId);
|
spdyDataFrame.content().readSlice(sendWindowSize).retain());
|
||||||
newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
|
|
||||||
int sessionSendWindowSize =
|
|
||||||
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
|
|
||||||
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
|
|
||||||
|
|
||||||
// Close the local side of the stream if this is the last frame
|
// The transfer window size is pre-decremented when sending a data frame downstream.
|
||||||
if (spdyDataFrame.isLast()) {
|
// Close the session on write failures that leave the transfer window in a corrupt state.
|
||||||
halfCloseStream(writeStreamId, false, pendingWrite.promise);
|
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Window size is large enough to send entire data frame
|
||||||
|
spdySession.removePendingWrite(writeStreamId);
|
||||||
|
spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
|
||||||
|
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
|
||||||
|
|
||||||
// The transfer window size is pre-decremented when sending a data frame downstream.
|
// Close the local side of the stream if this is the last frame
|
||||||
// Close the session on write failures that leave the transfer window in a corrupt state.
|
if (spdyDataFrame.isLast()) {
|
||||||
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
|
halfCloseStream(writeStreamId, false, pendingWrite.promise);
|
||||||
@Override
|
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
|
||||||
if (!future.isSuccess()) {
|
|
||||||
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// We can send a partial frame
|
|
||||||
spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
|
|
||||||
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);
|
|
||||||
|
|
||||||
// Create a partial data frame whose length is the current window size
|
|
||||||
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
|
|
||||||
spdyDataFrame.content().readSlice(newWindowSize).retain());
|
|
||||||
|
|
||||||
// The transfer window size is pre-decremented when sending a data frame downstream.
|
|
||||||
// Close the session on write failures that leave the transfer window in a corrupt state.
|
|
||||||
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
|
|
||||||
@Override
|
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
|
||||||
if (!future.isSuccess()) {
|
|
||||||
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
newWindowSize = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The transfer window size is pre-decremented when sending a data frame downstream.
|
||||||
|
// Close the session on write failures that leave the transfer window in a corrupt state.
|
||||||
|
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user