From dcced96bee5cdd426bd4cac8f0c82d3c94090f63 Mon Sep 17 00:00:00 2001 From: Jeff Pinner Date: Wed, 25 Sep 2013 15:10:15 -0700 Subject: [PATCH] SPDY: make MAX_CONCURRENT_STREAMS unidirectional --- .../netty/handler/codec/spdy/SpdySession.java | 111 ++++++++----- .../codec/spdy/SpdySessionHandler.java | 49 +++--- .../codec/spdy/SpdySessionHandlerTest.java | 156 +++++++++++------- 3 files changed, 183 insertions(+), 133 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySession.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySession.java index b200b5e1ec..4e6c465e40 100644 --- a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySession.java +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySession.java @@ -29,42 +29,65 @@ final class SpdySession { private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed"); + private final AtomicInteger activeLocalStreams = new AtomicInteger(); + private final AtomicInteger activeRemoteStreams = new AtomicInteger(); private final Map activeStreams = new ConcurrentHashMap(); - int numActiveStreams() { - return activeStreams.size(); + int numActiveStreams(boolean remote) { + if (remote) { + return activeRemoteStreams.get(); + } else { + return activeLocalStreams.get(); + } } boolean noActiveStreams() { return activeStreams.isEmpty(); } - boolean isActiveStream(int streamID) { - return activeStreams.containsKey(streamID); + boolean isActiveStream(int streamId) { + return activeStreams.containsKey(streamId); } // Stream-IDs should be iterated in priority order Set getActiveStreams() { - TreeSet StreamIDs = new TreeSet(new PriorityComparator()); - StreamIDs.addAll(activeStreams.keySet()); - return StreamIDs; + TreeSet StreamIds = new TreeSet(new PriorityComparator()); + StreamIds.addAll(activeStreams.keySet()); + return StreamIds; } void acceptStream( - int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed, - int sendWindowSize, int receiveWindowSize) { + int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed, + int sendWindowSize, int receiveWindowSize, boolean remote) { if (!remoteSideClosed || !localSideClosed) { - activeStreams.put( - streamID, + StreamState state = activeStreams.put( + streamId, new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize)); + if (state == null) { + if (remote) { + activeRemoteStreams.incrementAndGet(); + } else { + activeLocalStreams.incrementAndGet(); + } + } } } - void removeStream(int streamID) { - Integer StreamID = streamID; - StreamState state = activeStreams.get(StreamID); - activeStreams.remove(StreamID); + private StreamState removeActiveStream(int streamId, boolean remote) { + StreamState state = activeStreams.remove(streamId); + if (state != null) { + if (remote) { + activeRemoteStreams.decrementAndGet(); + } else { + activeLocalStreams.decrementAndGet(); + } + } + return state; + } + + void removeStream(int streamId, boolean remote) { + StreamState state = removeActiveStream(streamId, remote); if (state != null) { MessageEvent e = state.removePendingWrite(); while (e != null) { @@ -74,34 +97,32 @@ final class SpdySession { } } - boolean isRemoteSideClosed(int streamID) { - StreamState state = activeStreams.get(streamID); + boolean isRemoteSideClosed(int streamId) { + StreamState state = activeStreams.get(streamId); return state == null || state.isRemoteSideClosed(); } - void closeRemoteSide(int streamID) { - Integer StreamID = streamID; - StreamState state = activeStreams.get(StreamID); + void closeRemoteSide(int streamId, boolean remote) { + StreamState state = activeStreams.get(streamId); if (state != null) { state.closeRemoteSide(); if (state.isLocalSideClosed()) { - activeStreams.remove(StreamID); + removeActiveStream(streamId, remote); } } } - boolean isLocalSideClosed(int streamID) { - StreamState state = activeStreams.get(streamID); + boolean isLocalSideClosed(int streamId) { + StreamState state = activeStreams.get(streamId); return state == null || state.isLocalSideClosed(); } - void closeLocalSide(int streamID) { - Integer StreamID = streamID; - StreamState state = activeStreams.get(StreamID); + void closeLocalSide(int streamId, boolean remote) { + StreamState state = activeStreams.get(streamId); if (state != null) { state.closeLocalSide(); if (state.isRemoteSideClosed()) { - activeStreams.remove(StreamID); + removeActiveStream(streamId, remote); } } } @@ -111,38 +132,38 @@ final class SpdySession { * no need to synchronize access to the StreamState */ - boolean hasReceivedReply(int streamID) { - StreamState state = activeStreams.get(streamID); + boolean hasReceivedReply(int streamId) { + StreamState state = activeStreams.get(streamId); return state != null && state.hasReceivedReply(); } - void receivedReply(int streamID) { - StreamState state = activeStreams.get(streamID); + void receivedReply(int streamId) { + StreamState state = activeStreams.get(streamId); if (state != null) { state.receivedReply(); } } - int getSendWindowSize(int streamID) { - StreamState state = activeStreams.get(streamID); + int getSendWindowSize(int streamId) { + StreamState state = activeStreams.get(streamId); return state != null ? state.getSendWindowSize() : -1; } - int updateSendWindowSize(int streamID, int deltaWindowSize) { - StreamState state = activeStreams.get(streamID); + int updateSendWindowSize(int streamId, int deltaWindowSize) { + StreamState state = activeStreams.get(streamId); return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1; } - int updateReceiveWindowSize(int streamID, int deltaWindowSize) { - StreamState state = activeStreams.get(streamID); + int updateReceiveWindowSize(int streamId, int deltaWindowSize) { + StreamState state = activeStreams.get(streamId); if (deltaWindowSize > 0) { state.setReceiveWindowSizeLowerBound(0); } return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1; } - int getReceiveWindowSizeLowerBound(int streamID) { - StreamState state = activeStreams.get(streamID); + int getReceiveWindowSizeLowerBound(int streamId) { + StreamState state = activeStreams.get(streamId); return state != null ? state.getReceiveWindowSizeLowerBound() : 0; } @@ -155,18 +176,18 @@ final class SpdySession { } } - boolean putPendingWrite(int streamID, MessageEvent evt) { - StreamState state = activeStreams.get(streamID); + boolean putPendingWrite(int streamId, MessageEvent evt) { + StreamState state = activeStreams.get(streamId); return state != null && state.putPendingWrite(evt); } - MessageEvent getPendingWrite(int streamID) { - StreamState state = activeStreams.get(streamID); + MessageEvent getPendingWrite(int streamId) { + StreamState state = activeStreams.get(streamId); return state != null ? state.getPendingWrite() : null; } - MessageEvent removePendingWrite(int streamID) { - StreamState state = activeStreams.get(streamID); + MessageEvent removePendingWrite(int streamId) { + StreamState state = activeStreams.get(streamId); return state != null ? state.removePendingWrite() : null; } diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandler.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandler.java index c7c38127ac..658b5f42cf 100644 --- a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandler.java @@ -47,7 +47,6 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE; private volatile int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private volatile int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; - private volatile int maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE; @@ -133,7 +132,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } // Check if we received a data frame before receiving a SYN_REPLY - if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) { + if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) { issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR); return; } @@ -206,7 +205,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler // Check if we received a valid SYN_STREAM frame if (spdySynStreamFrame.isInvalid() || - !isRemoteInitiatedID(streamId) || + !isRemoteInitiatedId(streamId) || spdySession.isActiveStream(streamId)) { issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR); return; @@ -241,7 +240,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler // Check if we received a valid SYN_REPLY frame if (spdySynReplyFrame.isInvalid() || - isRemoteInitiatedID(streamId) || + isRemoteInitiatedId(streamId) || spdySession.isRemoteSideClosed(streamId)) { issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM); return; @@ -281,7 +280,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler int newConcurrentStreams = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS); if (newConcurrentStreams >= 0) { - updateConcurrentStreams(newConcurrentStreams, true); + remoteConcurrentStreams = newConcurrentStreams; } // Persistence flag are inconsistent with the use of SETTINGS to communicate @@ -313,7 +312,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - if (isRemoteInitiatedID(spdyPingFrame.getId())) { + if (isRemoteInitiatedId(spdyPingFrame.getId())) { Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress()); return; } @@ -517,7 +516,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; int streamId = spdySynStreamFrame.getStreamId(); - if (isRemoteInitiatedID(streamId)) { + if (isRemoteInitiatedId(streamId)) { e.getFuture().setFailure(PROTOCOL_EXCEPTION); return; } @@ -536,7 +535,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler int streamId = spdySynReplyFrame.getStreamId(); // Frames must not be sent on half-closed streams - if (!isRemoteInitiatedID(streamId) || spdySession.isLocalSideClosed(streamId)) { + if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) { e.getFuture().setFailure(PROTOCOL_EXCEPTION); return; } @@ -558,7 +557,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler int newConcurrentStreams = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS); if (newConcurrentStreams >= 0) { - updateConcurrentStreams(newConcurrentStreams, false); + localConcurrentStreams = newConcurrentStreams; } // Persistence flag are inconsistent with the use of SETTINGS to communicate @@ -580,7 +579,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } else if (msg instanceof SpdyPingFrame) { SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - if (isRemoteInitiatedID(spdyPingFrame.getId())) { + if (isRemoteInitiatedId(spdyPingFrame.getId())) { e.getFuture().setFailure(new IllegalArgumentException( "invalid PING ID: " + spdyPingFrame.getId())); return; @@ -665,18 +664,9 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler * Helper functions */ - private boolean isRemoteInitiatedID(int id) { - boolean serverID = isServerId(id); - return server && !serverID || !server && serverID; - } - - private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) { - if (remote) { - remoteConcurrentStreams = newConcurrentStreams; - } else { - localConcurrentStreams = newConcurrentStreams; - } - maxConcurrentStreams = Math.min(localConcurrentStreams, remoteConcurrentStreams); + private boolean isRemoteInitiatedId(int id) { + boolean serverId = isServerId(id); + return server && !serverId || !server && serverId; } // need to synchronize to prevent new streams from being created while updating active streams @@ -703,14 +693,15 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler return false; } - int maxConcurrentStreams = this.maxConcurrentStreams; // read volatile once - if (spdySession.numActiveStreams() >= maxConcurrentStreams) { + boolean remote = isRemoteInitiatedId(streamId); + int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams; + if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) { return false; } spdySession.acceptStream( streamId, priority, remoteSideClosed, localSideClosed, - initialSendWindowSize, initialReceiveWindowSize); - if (isRemoteInitiatedID(streamId)) { + initialSendWindowSize, initialReceiveWindowSize, remote); + if (remote) { lastGoodStreamId = streamId; } return true; @@ -718,9 +709,9 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) { if (remote) { - spdySession.closeRemoteSide(streamId); + spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId)); } else { - spdySession.closeLocalSide(streamId); + spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId)); } if (closeSessionFutureListener != null && spdySession.noActiveStreams()) { future.addListener(closeSessionFutureListener); @@ -728,7 +719,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } private void removeStream(int streamId, ChannelFuture future) { - spdySession.removeStream(streamId); + spdySession.removeStream(streamId, isRemoteInitiatedId(streamId)); if (closeSessionFutureListener != null && spdySession.noActiveStreams()) { future.addListener(closeSessionFutureListener); } diff --git a/src/test/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/src/test/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandlerTest.java index 7817e70924..5d26e216b1 100644 --- a/src/test/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -100,9 +100,6 @@ public class SpdySessionHandlerTest { int localStreamId = server ? 1 : 2; int remoteStreamId = server ? 2 : 1; - SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId); - SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId); - SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0); spdySynStreamFrame.setHeader("Compression", "test"); @@ -122,24 +119,11 @@ public class SpdySessionHandlerTest { assertNull(sessionHandler.peek()); remoteStreamId += 2; - // Check if session handler correctly limits the number of - // concurrent streams in the SETTINGS frame - SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 0); - sessionHandler.offer(spdySettingsFrame); - assertNull(sessionHandler.peek()); - sessionHandler.offer(spdySynStreamFrame); - assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); - assertNull(sessionHandler.peek()); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 100); - sessionHandler.offer(spdySettingsFrame); - assertNull(sessionHandler.peek()); - sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamId)); - assertNull(sessionHandler.peek()); - // Check if session handler returns PROTOCOL_ERROR if it receives // multiple SYN_REPLY frames for the same active Stream-ID sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamId)); + assertNull(sessionHandler.peek()); + sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamId)); assertRstStream(sessionHandler.poll(), remoteStreamId, SpdyStreamStatus.STREAM_IN_USE); assertNull(sessionHandler.peek()); remoteStreamId += 2; @@ -166,6 +150,16 @@ public class SpdySessionHandlerTest { assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); assertNull(sessionHandler.peek()); + // Check if session handler rejects HEADERS for closed streams + int testStreamId = spdyDataFrame.getStreamId(); + sessionHandler.offer(spdyDataFrame); + assertDataFrame(sessionHandler.poll(), testStreamId, spdyDataFrame.isLast()); + assertNull(sessionHandler.peek()); + spdyHeadersFrame.setStreamId(testStreamId); + sessionHandler.offer(spdyHeadersFrame); + assertRstStream(sessionHandler.poll(), testStreamId, SpdyStreamStatus.INVALID_STREAM); + assertNull(sessionHandler.peek()); + // Check if session handler drops active streams if it receives // a RST_STREAM frame for that Stream-ID sessionHandler.offer(new DefaultSpdyRstStreamFrame(remoteStreamId, 3)); @@ -193,31 +187,6 @@ public class SpdySessionHandlerTest { assertNull(sessionHandler.peek()); spdySynStreamFrame.setStreamId(localStreamId); - // Check if session handler correctly handles updates to the max - // concurrent streams in the SETTINGS frame - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2); - sessionHandler.offer(spdySettingsFrame); - assertNull(sessionHandler.peek()); - sessionHandler.offer(spdySynStreamFrame); - assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); - assertNull(sessionHandler.peek()); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4); - sessionHandler.offer(spdySettingsFrame); - assertNull(sessionHandler.peek()); - sessionHandler.offer(spdySynStreamFrame); - assertSynReply(sessionHandler.poll(), localStreamId, false, spdySynStreamFrame); - assertNull(sessionHandler.peek()); - - // Check if session handler rejects HEADERS for closed streams - int testStreamId = spdyDataFrame.getStreamId(); - sessionHandler.offer(spdyDataFrame); - assertDataFrame(sessionHandler.poll(), testStreamId, spdyDataFrame.isLast()); - assertNull(sessionHandler.peek()); - spdyHeadersFrame.setStreamId(testStreamId); - sessionHandler.offer(spdyHeadersFrame); - assertRstStream(sessionHandler.poll(), testStreamId, SpdyStreamStatus.INVALID_STREAM); - assertNull(sessionHandler.peek()); - // Check if session handler returns PROTOCOL_ERROR if it receives // an invalid HEADERS frame spdyHeadersFrame.setStreamId(localStreamId); @@ -226,6 +195,63 @@ public class SpdySessionHandlerTest { assertRstStream(sessionHandler.poll(), localStreamId, SpdyStreamStatus.PROTOCOL_ERROR); assertNull(sessionHandler.peek()); + sessionHandler.finish(); + } + + @Test + public void testSpdyClientSessionHandler() { + for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + testSpdySessionHandler(version, false); + } + } + + @Test + public void testSpdyClientSessionHandlerPing() { + for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + testSpdySessionHandlerPing(version, false); + } + } + + @Test + public void testSpdyClientSessionHandlerGoAway() { + for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + testSpdySessionHandlerGoAway(version, false); + } + } + + @Test + public void testSpdyServerSessionHandler() { + for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + testSpdySessionHandler(version, true); + } + } + + @Test + public void testSpdyServerSessionHandlerPing() { + for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + testSpdySessionHandlerPing(version, true); + } + } + + @Test + public void testSpdyServerSessionHandlerGoAway() { + for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { + testSpdySessionHandlerGoAway(version, true); + } + } + + private static void testSpdySessionHandlerPing(int version, boolean server) { + DecoderEmbedder sessionHandler = + new DecoderEmbedder( + new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); + sessionHandler.pollAll(); + + int localStreamId = server ? 1 : 2; + int remoteStreamId = server ? 2 : 1; + + SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId); + SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId); + // Check if session handler returns identical local PINGs sessionHandler.offer(localPingFrame); assertPing(sessionHandler.poll(), localPingFrame.getId()); @@ -235,6 +261,32 @@ public class SpdySessionHandlerTest { sessionHandler.offer(remotePingFrame); assertNull(sessionHandler.peek()); + sessionHandler.finish(); + } + + private static void testSpdySessionHandlerGoAway(int version, boolean server) { + DecoderEmbedder sessionHandler = + new DecoderEmbedder( + new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); + sessionHandler.pollAll(); + + int localStreamId = server ? 1 : 2; + int remoteStreamId = server ? 2 : 1; + + SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0); + spdySynStreamFrame.setHeader("Compression", "test"); + + SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamId); + spdyDataFrame.setLast(true); + + // Send an initial request + sessionHandler.offer(spdySynStreamFrame); + assertSynReply(sessionHandler.poll(), localStreamId, false, spdySynStreamFrame); + assertNull(sessionHandler.peek()); + sessionHandler.offer(spdyDataFrame); + assertDataFrame(sessionHandler.poll(), localStreamId, true); + assertNull(sessionHandler.peek()); + // Check if session handler sends a GOAWAY frame when closing sessionHandler.offer(closeMessage); assertGoAway(sessionHandler.poll(), localStreamId); @@ -257,20 +309,6 @@ public class SpdySessionHandlerTest { sessionHandler.finish(); } - @Test - public void testSpdyClientSessionHandler() { - for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { - testSpdySessionHandler(version, false); - } - } - - @Test - public void testSpdyServerSessionHandler() { - for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { - testSpdySessionHandler(version, true); - } - } - // Echo Handler opens 4 half-closed streams on session connection // and then sets the number of concurrent streams to 3 private static class EchoHandler extends SimpleChannelUpstreamHandler { @@ -297,9 +335,9 @@ public class SpdySessionHandlerTest { spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); Channels.write(e.getChannel(), spdySynStreamFrame); - // Limit the number of concurrent streams to 3 + // Limit the number of concurrent streams to 1 SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3); + spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 1); Channels.write(e.getChannel(), spdySettingsFrame); }