SPDY: make MAX_CONCURRENT_STREAMS unidirectional

This commit is contained in:
Jeff Pinner 2013-09-25 15:10:15 -07:00
parent 5a44162c52
commit dcced96bee
3 changed files with 183 additions and 133 deletions

View File

@ -29,42 +29,65 @@ final class SpdySession {
private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
private final AtomicInteger activeLocalStreams = new AtomicInteger();
private final AtomicInteger activeRemoteStreams = new AtomicInteger();
private final Map<Integer, StreamState> activeStreams =
new ConcurrentHashMap<Integer, StreamState>();
int numActiveStreams() {
return activeStreams.size();
int numActiveStreams(boolean remote) {
if (remote) {
return activeRemoteStreams.get();
} else {
return activeLocalStreams.get();
}
}
boolean noActiveStreams() {
return activeStreams.isEmpty();
}
boolean isActiveStream(int streamID) {
return activeStreams.containsKey(streamID);
boolean isActiveStream(int streamId) {
return activeStreams.containsKey(streamId);
}
// Stream-IDs should be iterated in priority order
Set<Integer> getActiveStreams() {
TreeSet<Integer> StreamIDs = new TreeSet<Integer>(new PriorityComparator());
StreamIDs.addAll(activeStreams.keySet());
return StreamIDs;
TreeSet<Integer> StreamIds = new TreeSet<Integer>(new PriorityComparator());
StreamIds.addAll(activeStreams.keySet());
return StreamIds;
}
void acceptStream(
int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed,
int sendWindowSize, int receiveWindowSize) {
int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
int sendWindowSize, int receiveWindowSize, boolean remote) {
if (!remoteSideClosed || !localSideClosed) {
activeStreams.put(
streamID,
StreamState state = activeStreams.put(
streamId,
new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
if (state == null) {
if (remote) {
activeRemoteStreams.incrementAndGet();
} else {
activeLocalStreams.incrementAndGet();
}
}
}
}
void removeStream(int streamID) {
Integer StreamID = streamID;
StreamState state = activeStreams.get(StreamID);
activeStreams.remove(StreamID);
private StreamState removeActiveStream(int streamId, boolean remote) {
StreamState state = activeStreams.remove(streamId);
if (state != null) {
if (remote) {
activeRemoteStreams.decrementAndGet();
} else {
activeLocalStreams.decrementAndGet();
}
}
return state;
}
void removeStream(int streamId, boolean remote) {
StreamState state = removeActiveStream(streamId, remote);
if (state != null) {
MessageEvent e = state.removePendingWrite();
while (e != null) {
@ -74,34 +97,32 @@ final class SpdySession {
}
}
boolean isRemoteSideClosed(int streamID) {
StreamState state = activeStreams.get(streamID);
boolean isRemoteSideClosed(int streamId) {
StreamState state = activeStreams.get(streamId);
return state == null || state.isRemoteSideClosed();
}
void closeRemoteSide(int streamID) {
Integer StreamID = streamID;
StreamState state = activeStreams.get(StreamID);
void closeRemoteSide(int streamId, boolean remote) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.closeRemoteSide();
if (state.isLocalSideClosed()) {
activeStreams.remove(StreamID);
removeActiveStream(streamId, remote);
}
}
}
boolean isLocalSideClosed(int streamID) {
StreamState state = activeStreams.get(streamID);
boolean isLocalSideClosed(int streamId) {
StreamState state = activeStreams.get(streamId);
return state == null || state.isLocalSideClosed();
}
void closeLocalSide(int streamID) {
Integer StreamID = streamID;
StreamState state = activeStreams.get(StreamID);
void closeLocalSide(int streamId, boolean remote) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.closeLocalSide();
if (state.isRemoteSideClosed()) {
activeStreams.remove(StreamID);
removeActiveStream(streamId, remote);
}
}
}
@ -111,38 +132,38 @@ final class SpdySession {
* no need to synchronize access to the StreamState
*/
boolean hasReceivedReply(int streamID) {
StreamState state = activeStreams.get(streamID);
boolean hasReceivedReply(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null && state.hasReceivedReply();
}
void receivedReply(int streamID) {
StreamState state = activeStreams.get(streamID);
void receivedReply(int streamId) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.receivedReply();
}
}
int getSendWindowSize(int streamID) {
StreamState state = activeStreams.get(streamID);
int getSendWindowSize(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.getSendWindowSize() : -1;
}
int updateSendWindowSize(int streamID, int deltaWindowSize) {
StreamState state = activeStreams.get(streamID);
int updateSendWindowSize(int streamId, int deltaWindowSize) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
}
int updateReceiveWindowSize(int streamID, int deltaWindowSize) {
StreamState state = activeStreams.get(streamID);
int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
StreamState state = activeStreams.get(streamId);
if (deltaWindowSize > 0) {
state.setReceiveWindowSizeLowerBound(0);
}
return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
}
int getReceiveWindowSizeLowerBound(int streamID) {
StreamState state = activeStreams.get(streamID);
int getReceiveWindowSizeLowerBound(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
}
@ -155,18 +176,18 @@ final class SpdySession {
}
}
boolean putPendingWrite(int streamID, MessageEvent evt) {
StreamState state = activeStreams.get(streamID);
boolean putPendingWrite(int streamId, MessageEvent evt) {
StreamState state = activeStreams.get(streamId);
return state != null && state.putPendingWrite(evt);
}
MessageEvent getPendingWrite(int streamID) {
StreamState state = activeStreams.get(streamID);
MessageEvent getPendingWrite(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.getPendingWrite() : null;
}
MessageEvent removePendingWrite(int streamID) {
StreamState state = activeStreams.get(streamID);
MessageEvent removePendingWrite(int streamId) {
StreamState state = activeStreams.get(streamId);
return state != null ? state.removePendingWrite() : null;
}

View File

@ -47,7 +47,6 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
private volatile int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private volatile int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private volatile int maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
@ -133,7 +132,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
}
// Check if we received a data frame before receiving a SYN_REPLY
if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) {
if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -206,7 +205,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Check if we received a valid SYN_STREAM frame
if (spdySynStreamFrame.isInvalid() ||
!isRemoteInitiatedID(streamId) ||
!isRemoteInitiatedId(streamId) ||
spdySession.isActiveStream(streamId)) {
issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
@ -241,7 +240,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Check if we received a valid SYN_REPLY frame
if (spdySynReplyFrame.isInvalid() ||
isRemoteInitiatedID(streamId) ||
isRemoteInitiatedId(streamId) ||
spdySession.isRemoteSideClosed(streamId)) {
issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
return;
@ -281,7 +280,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int newConcurrentStreams =
spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
if (newConcurrentStreams >= 0) {
updateConcurrentStreams(newConcurrentStreams, true);
remoteConcurrentStreams = newConcurrentStreams;
}
// Persistence flag are inconsistent with the use of SETTINGS to communicate
@ -313,7 +312,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) {
if (isRemoteInitiatedId(spdyPingFrame.getId())) {
Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
return;
}
@ -517,7 +516,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
int streamId = spdySynStreamFrame.getStreamId();
if (isRemoteInitiatedID(streamId)) {
if (isRemoteInitiatedId(streamId)) {
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
return;
}
@ -536,7 +535,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int streamId = spdySynReplyFrame.getStreamId();
// Frames must not be sent on half-closed streams
if (!isRemoteInitiatedID(streamId) || spdySession.isLocalSideClosed(streamId)) {
if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
return;
}
@ -558,7 +557,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int newConcurrentStreams =
spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
if (newConcurrentStreams >= 0) {
updateConcurrentStreams(newConcurrentStreams, false);
localConcurrentStreams = newConcurrentStreams;
}
// Persistence flag are inconsistent with the use of SETTINGS to communicate
@ -580,7 +579,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} else if (msg instanceof SpdyPingFrame) {
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) {
if (isRemoteInitiatedId(spdyPingFrame.getId())) {
e.getFuture().setFailure(new IllegalArgumentException(
"invalid PING ID: " + spdyPingFrame.getId()));
return;
@ -665,18 +664,9 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
* Helper functions
*/
private boolean isRemoteInitiatedID(int id) {
boolean serverID = isServerId(id);
return server && !serverID || !server && serverID;
}
private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
if (remote) {
remoteConcurrentStreams = newConcurrentStreams;
} else {
localConcurrentStreams = newConcurrentStreams;
}
maxConcurrentStreams = Math.min(localConcurrentStreams, remoteConcurrentStreams);
private boolean isRemoteInitiatedId(int id) {
boolean serverId = isServerId(id);
return server && !serverId || !server && serverId;
}
// need to synchronize to prevent new streams from being created while updating active streams
@ -703,14 +693,15 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
return false;
}
int maxConcurrentStreams = this.maxConcurrentStreams; // read volatile once
if (spdySession.numActiveStreams() >= maxConcurrentStreams) {
boolean remote = isRemoteInitiatedId(streamId);
int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
return false;
}
spdySession.acceptStream(
streamId, priority, remoteSideClosed, localSideClosed,
initialSendWindowSize, initialReceiveWindowSize);
if (isRemoteInitiatedID(streamId)) {
initialSendWindowSize, initialReceiveWindowSize, remote);
if (remote) {
lastGoodStreamId = streamId;
}
return true;
@ -718,9 +709,9 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
if (remote) {
spdySession.closeRemoteSide(streamId);
spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
} else {
spdySession.closeLocalSide(streamId);
spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
}
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener);
@ -728,7 +719,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
}
private void removeStream(int streamId, ChannelFuture future) {
spdySession.removeStream(streamId);
spdySession.removeStream(streamId, isRemoteInitiatedId(streamId));
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener);
}

View File

@ -100,9 +100,6 @@ public class SpdySessionHandlerTest {
int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1;
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId);
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId);
SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0);
spdySynStreamFrame.setHeader("Compression", "test");
@ -122,24 +119,11 @@ public class SpdySessionHandlerTest {
assertNull(sessionHandler.peek());
remoteStreamId += 2;
// Check if session handler correctly limits the number of
// concurrent streams in the SETTINGS frame
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 0);
sessionHandler.offer(spdySettingsFrame);
assertNull(sessionHandler.peek());
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.peek());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 100);
sessionHandler.offer(spdySettingsFrame);
assertNull(sessionHandler.peek());
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamId));
assertNull(sessionHandler.peek());
// Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_REPLY frames for the same active Stream-ID
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamId));
assertNull(sessionHandler.peek());
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamId));
assertRstStream(sessionHandler.poll(), remoteStreamId, SpdyStreamStatus.STREAM_IN_USE);
assertNull(sessionHandler.peek());
remoteStreamId += 2;
@ -166,6 +150,16 @@ public class SpdySessionHandlerTest {
assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.peek());
// Check if session handler rejects HEADERS for closed streams
int testStreamId = spdyDataFrame.getStreamId();
sessionHandler.offer(spdyDataFrame);
assertDataFrame(sessionHandler.poll(), testStreamId, spdyDataFrame.isLast());
assertNull(sessionHandler.peek());
spdyHeadersFrame.setStreamId(testStreamId);
sessionHandler.offer(spdyHeadersFrame);
assertRstStream(sessionHandler.poll(), testStreamId, SpdyStreamStatus.INVALID_STREAM);
assertNull(sessionHandler.peek());
// Check if session handler drops active streams if it receives
// a RST_STREAM frame for that Stream-ID
sessionHandler.offer(new DefaultSpdyRstStreamFrame(remoteStreamId, 3));
@ -193,31 +187,6 @@ public class SpdySessionHandlerTest {
assertNull(sessionHandler.peek());
spdySynStreamFrame.setStreamId(localStreamId);
// Check if session handler correctly handles updates to the max
// concurrent streams in the SETTINGS frame
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2);
sessionHandler.offer(spdySettingsFrame);
assertNull(sessionHandler.peek());
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.peek());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4);
sessionHandler.offer(spdySettingsFrame);
assertNull(sessionHandler.peek());
sessionHandler.offer(spdySynStreamFrame);
assertSynReply(sessionHandler.poll(), localStreamId, false, spdySynStreamFrame);
assertNull(sessionHandler.peek());
// Check if session handler rejects HEADERS for closed streams
int testStreamId = spdyDataFrame.getStreamId();
sessionHandler.offer(spdyDataFrame);
assertDataFrame(sessionHandler.poll(), testStreamId, spdyDataFrame.isLast());
assertNull(sessionHandler.peek());
spdyHeadersFrame.setStreamId(testStreamId);
sessionHandler.offer(spdyHeadersFrame);
assertRstStream(sessionHandler.poll(), testStreamId, SpdyStreamStatus.INVALID_STREAM);
assertNull(sessionHandler.peek());
// Check if session handler returns PROTOCOL_ERROR if it receives
// an invalid HEADERS frame
spdyHeadersFrame.setStreamId(localStreamId);
@ -226,6 +195,63 @@ public class SpdySessionHandlerTest {
assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.PROTOCOL_ERROR);
assertNull(sessionHandler.peek());
sessionHandler.finish();
}
@Test
public void testSpdyClientSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandler(version, false);
}
}
@Test
public void testSpdyClientSessionHandlerPing() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandlerPing(version, false);
}
}
@Test
public void testSpdyClientSessionHandlerGoAway() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandlerGoAway(version, false);
}
}
@Test
public void testSpdyServerSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandler(version, true);
}
}
@Test
public void testSpdyServerSessionHandlerPing() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandlerPing(version, true);
}
}
@Test
public void testSpdyServerSessionHandlerGoAway() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandlerGoAway(version, true);
}
}
private static void testSpdySessionHandlerPing(int version, boolean server) {
DecoderEmbedder<Object> sessionHandler =
new DecoderEmbedder<Object>(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
sessionHandler.pollAll();
int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1;
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId);
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId);
// Check if session handler returns identical local PINGs
sessionHandler.offer(localPingFrame);
assertPing(sessionHandler.poll(), localPingFrame.getId());
@ -235,6 +261,32 @@ public class SpdySessionHandlerTest {
sessionHandler.offer(remotePingFrame);
assertNull(sessionHandler.peek());
sessionHandler.finish();
}
private static void testSpdySessionHandlerGoAway(int version, boolean server) {
DecoderEmbedder<Object> sessionHandler =
new DecoderEmbedder<Object>(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
sessionHandler.pollAll();
int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1;
SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0);
spdySynStreamFrame.setHeader("Compression", "test");
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamId);
spdyDataFrame.setLast(true);
// Send an initial request
sessionHandler.offer(spdySynStreamFrame);
assertSynReply(sessionHandler.poll(), localStreamId, false, spdySynStreamFrame);
assertNull(sessionHandler.peek());
sessionHandler.offer(spdyDataFrame);
assertDataFrame(sessionHandler.poll(), localStreamId, true);
assertNull(sessionHandler.peek());
// Check if session handler sends a GOAWAY frame when closing
sessionHandler.offer(closeMessage);
assertGoAway(sessionHandler.poll(), localStreamId);
@ -257,20 +309,6 @@ public class SpdySessionHandlerTest {
sessionHandler.finish();
}
@Test
public void testSpdyClientSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandler(version, false);
}
}
@Test
public void testSpdyServerSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
testSpdySessionHandler(version, true);
}
}
// Echo Handler opens 4 half-closed streams on session connection
// and then sets the number of concurrent streams to 3
private static class EchoHandler extends SimpleChannelUpstreamHandler {
@ -297,9 +335,9 @@ public class SpdySessionHandlerTest {
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
Channels.write(e.getChannel(), spdySynStreamFrame);
// Limit the number of concurrent streams to 3
// Limit the number of concurrent streams to 1
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3);
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 1);
Channels.write(e.getChannel(), spdySettingsFrame);
}