From 321990a11518d04cfd49542c31cde38e520c7bdc Mon Sep 17 00:00:00 2001 From: Jeff Pinner Date: Fri, 27 Sep 2013 10:48:16 -0700 Subject: [PATCH] SPDY: add SPDY/3.1 support - with Michael Schore --- .../handler/codec/spdy/SpdyCodecUtil.java | 2 + .../handler/codec/spdy/SpdyConstants.java | 24 --- .../handler/codec/spdy/SpdyFrameCodec.java | 4 +- .../handler/codec/spdy/SpdyFrameDecoder.java | 13 +- .../handler/codec/spdy/SpdyFrameEncoder.java | 13 +- .../codec/spdy/SpdyHeaderBlockDecoder.java | 2 +- .../codec/spdy/SpdyHeaderBlockEncoder.java | 2 +- .../spdy/SpdyHeaderBlockJZlibEncoder.java | 4 +- .../codec/spdy/SpdyHeaderBlockRawDecoder.java | 11 +- .../codec/spdy/SpdyHeaderBlockRawEncoder.java | 9 +- .../spdy/SpdyHeaderBlockZlibDecoder.java | 4 +- .../spdy/SpdyHeaderBlockZlibEncoder.java | 4 +- .../handler/codec/spdy/SpdyHttpCodec.java | 2 +- .../handler/codec/spdy/SpdyHttpDecoder.java | 11 +- .../handler/codec/spdy/SpdyHttpEncoder.java | 9 +- .../handler/codec/spdy/SpdyOrHttpChooser.java | 10 +- .../netty/handler/codec/spdy/SpdySession.java | 82 ++++++++- .../codec/spdy/SpdySessionHandler.java | 163 ++++++++++------ .../netty/handler/codec/spdy/SpdyVersion.java | 55 ++++++ .../codec/spdy/SpdyFrameDecoderTest.java | 94 +++++----- .../codec/spdy/SpdySessionHandlerTest.java | 174 ++++++++++++------ .../transport/socket/SocketSpdyEchoTest.java | 32 +++- 22 files changed, 472 insertions(+), 252 deletions(-) delete mode 100644 codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyConstants.java create mode 100644 codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyVersion.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java index 1f0778157a..4dedeff9b6 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java @@ -20,6 +20,8 @@ import io.netty.util.CharsetUtil; final class SpdyCodecUtil { + static final int SPDY_SESSION_STREAM_ID = 0; + static final int SPDY_HEADER_TYPE_OFFSET = 2; static final int SPDY_HEADER_FLAGS_OFFSET = 4; static final int SPDY_HEADER_LENGTH_OFFSET = 5; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyConstants.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyConstants.java deleted file mode 100644 index f8ccd04a4d..0000000000 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyConstants.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.spdy; - -public final class SpdyConstants { - - public static final int SPDY_MIN_VERSION = 2; - public static final int SPDY_MAX_VERSION = 3; - - private SpdyConstants() { } -} diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java index 7e594b27d4..e0b40bd9a4 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java @@ -28,7 +28,7 @@ public final class SpdyFrameCodec extends CombinedChannelDuplexHandler SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unsupported version: " + version); + SpdyVersion version, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) { + if (version == null) { + throw new NullPointerException("version"); } if (maxChunkSize <= 0) { throw new IllegalArgumentException( "maxChunkSize must be a positive integer: " + maxChunkSize); } - spdyVersion = version; + spdyVersion = version.getVersion(); this.maxChunkSize = maxChunkSize; this.headerBlockDecoder = headerBlockDecoder; state = State.READ_COMMON_HEADER; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 83ebb72964..f397f81c54 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -39,24 +39,23 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { * default {@code compressionLevel (6)}, {@code windowBits (15)}, * and {@code memLevel (8)}. */ - public SpdyFrameEncoder(int version) { + public SpdyFrameEncoder(SpdyVersion version) { this(version, 6, 15, 8); } /** * Creates a new instance with the specified parameters. */ - public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { + public SpdyFrameEncoder(SpdyVersion version, int compressionLevel, int windowBits, int memLevel) { this(version, SpdyHeaderBlockEncoder.newInstance( version, compressionLevel, windowBits, memLevel)); } - protected SpdyFrameEncoder(int version, SpdyHeaderBlockEncoder headerBlockEncoder) { - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unknown version: " + version); + protected SpdyFrameEncoder(SpdyVersion version, SpdyHeaderBlockEncoder headerBlockEncoder) { + if (version == null) { + throw new NullPointerException("version"); } - this.version = version; + this.version = version.getVersion(); this.headerBlockEncoder = headerBlockEncoder; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockDecoder.java index 7e4573d333..731e7f2b6d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockDecoder.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; abstract class SpdyHeaderBlockDecoder { - static SpdyHeaderBlockDecoder newInstance(int version, int maxHeaderSize) { + static SpdyHeaderBlockDecoder newInstance(SpdyVersion version, int maxHeaderSize) { return new SpdyHeaderBlockZlibDecoder(version, maxHeaderSize); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockEncoder.java index 56afcb8d2f..046856683f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockEncoder.java @@ -22,7 +22,7 @@ import io.netty.util.internal.PlatformDependent; abstract class SpdyHeaderBlockEncoder { static SpdyHeaderBlockEncoder newInstance( - int version, int compressionLevel, int windowBits, int memLevel) { + SpdyVersion version, int compressionLevel, int windowBits, int memLevel) { if (PlatformDependent.javaVersion() >= 7) { return new SpdyHeaderBlockZlibEncoder( diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockJZlibEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockJZlibEncoder.java index 1e66a18fca..e391c72ab4 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockJZlibEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockJZlibEncoder.java @@ -31,7 +31,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder { private boolean finished; public SpdyHeaderBlockJZlibEncoder( - int version, int compressionLevel, int windowBits, int memLevel) { + SpdyVersion version, int compressionLevel, int windowBits, int memLevel) { super(version); if (compressionLevel < 0 || compressionLevel > 9) { throw new IllegalArgumentException( @@ -52,7 +52,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder { throw new CompressionException( "failed to initialize an SPDY header block deflater: " + resultCode); } else { - if (version < 3) { + if (version.getVersion() < 3) { resultCode = z.deflateSetDictionary(SPDY2_DICT, SPDY2_DICT.length); } else { resultCode = z.deflateSetDictionary(SPDY_DICT, SPDY_DICT.length); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawDecoder.java index c3c725cb6e..60a823026f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawDecoder.java @@ -29,15 +29,14 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder { private int headerSize; private int numHeaders; - public SpdyHeaderBlockRawDecoder(int version, int maxHeaderSize) { - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unsupported version: " + version); + public SpdyHeaderBlockRawDecoder(SpdyVersion version, int maxHeaderSize) { + if (version == null) { + throw new NullPointerException("version"); } - this.version = version; + this.version = version.getVersion(); this.maxHeaderSize = maxHeaderSize; - lengthFieldSize = version < 3 ? 2 : 4; + lengthFieldSize = this.version < 3 ? 2 : 4; reset(); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawEncoder.java index 512de8de4f..44a6dd5da1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockRawEncoder.java @@ -27,12 +27,11 @@ public class SpdyHeaderBlockRawEncoder extends SpdyHeaderBlockEncoder { private final int version; - public SpdyHeaderBlockRawEncoder(int version) { - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unknown version: " + version); + public SpdyHeaderBlockRawEncoder(SpdyVersion version) { + if (version == null) { + throw new NullPointerException("version"); } - this.version = version; + this.version = version.getVersion(); } private void setLengthField(ByteBuf buffer, int writerIndex, int length) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java index ac5402fdc5..2e2fa34bb9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java @@ -31,9 +31,9 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder { private ByteBuf decompressed; - public SpdyHeaderBlockZlibDecoder(int version, int maxHeaderSize) { + public SpdyHeaderBlockZlibDecoder(SpdyVersion version, int maxHeaderSize) { super(version, maxHeaderSize); - this.version = version; + this.version = version.getVersion(); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibEncoder.java index 320167dd49..56f52f6ea5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibEncoder.java @@ -30,14 +30,14 @@ class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder { private boolean finished; - public SpdyHeaderBlockZlibEncoder(int version, int compressionLevel) { + public SpdyHeaderBlockZlibEncoder(SpdyVersion version, int compressionLevel) { super(version); if (compressionLevel < 0 || compressionLevel > 9) { throw new IllegalArgumentException( "compressionLevel: " + compressionLevel + " (expected: 0-9)"); } compressor = new Deflater(compressionLevel); - if (version < 3) { + if (version.getVersion() < 3) { compressor.setDictionary(SPDY2_DICT); } else { compressor.setDictionary(SPDY_DICT); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java index 86bfea22e7..efb2dd426d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java @@ -25,7 +25,7 @@ public final class SpdyHttpCodec /** * Creates a new instance with the specified decoder options. */ - public SpdyHttpCodec(int version, int maxContentLength) { + public SpdyHttpCodec(SpdyVersion version, int maxContentLength) { super(new SpdyHttpDecoder(version, maxContentLength), new SpdyHttpEncoder(version)); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java index 7041cfda79..2ff061b1d5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java @@ -51,7 +51,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder { * If the length of the message content exceeds this value, * a {@link TooLongFrameException} will be raised. */ - public SpdyHttpDecoder(int version, int maxContentLength) { + public SpdyHttpDecoder(SpdyVersion version, int maxContentLength) { this(version, maxContentLength, new HashMap()); } @@ -64,16 +64,15 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder { * a {@link TooLongFrameException} will be raised. * @param messageMap the {@link Map} used to hold partially received messages. */ - protected SpdyHttpDecoder(int version, int maxContentLength, Map messageMap) { - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unsupported version: " + version); + protected SpdyHttpDecoder(SpdyVersion version, int maxContentLength, Map messageMap) { + if (version == null) { + throw new NullPointerException("version"); } if (maxContentLength <= 0) { throw new IllegalArgumentException( "maxContentLength must be a positive integer: " + maxContentLength); } - spdyVersion = version; + spdyVersion = version.getVersion(); this.maxContentLength = maxContentLength; this.messageMap = messageMap; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java index a8e3eefd26..50ab4ca1ab 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -130,12 +130,11 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { * * @param version the protocol version */ - public SpdyHttpEncoder(int version) { - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unsupported version: " + version); + public SpdyHttpEncoder(SpdyVersion version) { + if (version == null) { + throw new NullPointerException("version"); } - spdyVersion = version; + spdyVersion = version.getVersion(); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java index c14a0d669b..d80ce89c1b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java @@ -41,6 +41,7 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder { public enum SelectedProtocol { SPDY_2, SPDY_3, + SPDY_3_1, HTTP_1_1, HTTP_1_0, UNKNOWN @@ -84,10 +85,13 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder { // Not done with choosing the protocol, so just return here for now, return false; case SPDY_2: - addSpdyHandlers(ctx, 2); + addSpdyHandlers(ctx, SpdyVersion.SPDY_2); break; case SPDY_3: - addSpdyHandlers(ctx, 3); + addSpdyHandlers(ctx, SpdyVersion.SPDY_3); + break; + case SPDY_3_1: + addSpdyHandlers(ctx, SpdyVersion.SPDY_3_1); break; case HTTP_1_0: case HTTP_1_1: @@ -102,7 +106,7 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder { /** * Add all {@link ChannelHandler}'s that are needed for SPDY with the given version. */ - protected void addSpdyHandlers(ChannelHandlerContext ctx, int version) { + protected void addSpdyHandlers(ChannelHandlerContext ctx, SpdyVersion version) { ChannelPipeline pipeline = ctx.pipeline(); pipeline.addLast("spdyDecoder", new SpdyFrameDecoder(version)); pipeline.addLast("spdyEncoder", new SpdyFrameEncoder(version)); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java index 2956ac76ae..0cd1ec6852 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java @@ -26,12 +26,28 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID; + final class SpdySession { + private final AtomicInteger activeLocalStreams = new AtomicInteger(); + private final AtomicInteger activeRemoteStreams = new AtomicInteger(); private final Map activeStreams = PlatformDependent.newConcurrentHashMap(); - int numActiveStreams() { - return activeStreams.size(); + private final AtomicInteger sendWindowSize; + private final AtomicInteger receiveWindowSize; + + public SpdySession(int sendWindowSize, int receiveWindowSize) { + this.sendWindowSize = new AtomicInteger(sendWindowSize); + this.receiveWindowSize = new AtomicInteger(receiveWindowSize); + } + + int numActiveStreams(boolean remote) { + if (remote) { + return activeRemoteStreams.get(); + } else { + return activeLocalStreams.get(); + } } boolean noActiveStreams() { @@ -51,15 +67,34 @@ final class SpdySession { void acceptStream( int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed, - int sendWindowSize, int receiveWindowSize) { + int sendWindowSize, int receiveWindowSize, boolean remote) { if (!remoteSideClosed || !localSideClosed) { - activeStreams.put(streamId, new StreamState( + StreamState state = activeStreams.put(streamId, new StreamState( priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize)); + if (state == null) { + if (remote) { + activeRemoteStreams.incrementAndGet(); + } else { + activeLocalStreams.incrementAndGet(); + } + } } } - void removeStream(int streamId, Throwable cause) { + private StreamState removeActiveStream(int streamId, boolean remote) { StreamState state = activeStreams.remove(streamId); + if (state != null) { + if (remote) { + activeRemoteStreams.decrementAndGet(); + } else { + activeLocalStreams.decrementAndGet(); + } + } + return state; + } + + void removeStream(int streamId, Throwable cause, boolean remote) { + StreamState state = removeActiveStream(streamId, remote); if (state != null) { state.clearPendingWrites(cause); } @@ -70,12 +105,12 @@ final class SpdySession { return state == null || state.isRemoteSideClosed(); } - void closeRemoteSide(int streamId) { + void closeRemoteSide(int streamId, boolean remote) { StreamState state = activeStreams.get(streamId); if (state != null) { state.closeRemoteSide(); if (state.isLocalSideClosed()) { - activeStreams.remove(streamId); + removeActiveStream(streamId, remote); } } } @@ -85,12 +120,12 @@ final class SpdySession { return state == null || state.isLocalSideClosed(); } - void closeLocalSide(int streamId) { + void closeLocalSide(int streamId, boolean remote) { StreamState state = activeStreams.get(streamId); if (state != null) { state.closeLocalSide(); if (state.isRemoteSideClosed()) { - activeStreams.remove(streamId); + removeActiveStream(streamId, remote); } } } @@ -112,16 +147,28 @@ final class SpdySession { } int getSendWindowSize(int streamId) { + if (streamId == SPDY_SESSION_STREAM_ID) { + return sendWindowSize.get(); + } + StreamState state = activeStreams.get(streamId); return state != null ? state.getSendWindowSize() : -1; } int updateSendWindowSize(int streamId, int deltaWindowSize) { + if (streamId == SPDY_SESSION_STREAM_ID) { + return sendWindowSize.addAndGet(deltaWindowSize); + } + StreamState state = activeStreams.get(streamId); return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1; } int updateReceiveWindowSize(int streamId, int deltaWindowSize) { + if (streamId == SPDY_SESSION_STREAM_ID) { + return receiveWindowSize.addAndGet(deltaWindowSize); + } + StreamState state = activeStreams.get(streamId); if (deltaWindowSize > 0) { state.setReceiveWindowSizeLowerBound(0); @@ -130,6 +177,10 @@ final class SpdySession { } int getReceiveWindowSizeLowerBound(int streamId) { + if (streamId == SPDY_SESSION_STREAM_ID) { + return 0; + } + StreamState state = activeStreams.get(streamId); return state != null ? state.getReceiveWindowSizeLowerBound() : 0; } @@ -155,6 +206,19 @@ final class SpdySession { } PendingWrite getPendingWrite(int streamId) { + if (streamId == SPDY_SESSION_STREAM_ID) { + for (Integer id : getActiveStreams()) { + StreamState state = activeStreams.get(id); + if (state.getSendWindowSize() > 0) { + PendingWrite pendingWrite = state.getPendingWrite(); + if (pendingWrite != null) { + return pendingWrite; + } + } + } + return null; + } + StreamState state = activeStreams.get(streamId); return state != null ? state.getPendingWrite() : null; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 866af61d50..53c248a852 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -24,6 +24,8 @@ import io.netty.util.internal.EmptyArrays; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID; + /** * Manages streams within a SPDY session. */ @@ -38,17 +40,16 @@ public class SpdySessionHandler STREAM_CLOSED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } - private final SpdySession spdySession = new SpdySession(); + private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size + private int initialSendWindowSize = DEFAULT_WINDOW_SIZE; + private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE; + + private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize); private int lastGoodStreamId; private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE; private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; - private int maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; - - private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size - private int initialSendWindowSize = DEFAULT_WINDOW_SIZE; - private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE; private final Object flowControlLock = new Object(); @@ -61,6 +62,7 @@ public class SpdySessionHandler private final boolean server; private final boolean flowControl; + private final boolean sessionFlowControl; /** * Creates a new session handler. @@ -71,13 +73,13 @@ public class SpdySessionHandler * {@code false} if and only if this session handler should * handle the client endpoint of the connection. */ - public SpdySessionHandler(int version, boolean server) { - if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { - throw new IllegalArgumentException( - "unsupported version: " + version); + public SpdySessionHandler(SpdyVersion version, boolean server) { + if (version == null) { + throw new NullPointerException("version"); } this.server = server; - flowControl = version >= 3; + flowControl = version.useFlowControl(); + sessionFlowControl = version.useSessionFlowControl(); } @Override @@ -109,6 +111,27 @@ public class SpdySessionHandler SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; int streamId = spdyDataFrame.getStreamId(); + if (sessionFlowControl) { + int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes(); + int newSessionWindowSize = + spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize); + + // Check if session window size is reduced beyond allowable lower bound + if (newSessionWindowSize < 0) { + issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR); + return; + } + + // Send a WINDOW_UPDATE frame if less than half the session window size remains + if (newSessionWindowSize <= initialReceiveWindowSize / 2) { + deltaWindowSize = initialReceiveWindowSize - newSessionWindowSize; + spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize); + SpdyWindowUpdateFrame spdyWindowUpdateFrame = + new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, deltaWindowSize); + ctx.writeAndFlush(spdyWindowUpdateFrame); + } + } + // Check if we received a data frame for a Stream-ID which is not open if (!spdySession.isActiveStream(streamId)) { @@ -130,17 +153,17 @@ public class SpdySessionHandler } // Check if we received a data frame before receiving a SYN_REPLY - if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) { + if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) { spdyDataFrame.release(); issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR); return; } /* - * SPDY Data frame flow control processing requirements: - * - * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame. - */ + * SPDY Data frame flow control processing requirements: + * + * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame. + */ if (flowControl) { // Update receive window size @@ -168,7 +191,7 @@ public class SpdySessionHandler } } - // Send a WINDOW_UPDATE frame if less than half the window size remains + // Send a WINDOW_UPDATE frame if less than half the stream window size remains if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) { deltaWindowSize = initialReceiveWindowSize - newWindowSize; spdySession.updateReceiveWindowSize(streamId, deltaWindowSize); @@ -204,7 +227,7 @@ public class SpdySessionHandler // Check if we received a valid SYN_STREAM frame if (spdySynStreamFrame.isInvalid() || - !isRemoteInitiatedID(streamId) || + !isRemoteInitiatedId(streamId) || spdySession.isActiveStream(streamId)) { issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR); return; @@ -239,7 +262,7 @@ public class SpdySessionHandler // Check if we received a valid SYN_REPLY frame if (spdySynReplyFrame.isInvalid() || - isRemoteInitiatedID(streamId) || + isRemoteInitiatedId(streamId) || spdySession.isRemoteSideClosed(streamId)) { issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM); return; @@ -279,7 +302,7 @@ public class SpdySessionHandler int newConcurrentStreams = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS); if (newConcurrentStreams >= 0) { - updateConcurrentStreams(newConcurrentStreams, true); + remoteConcurrentStreams = newConcurrentStreams; } // Persistence flag are inconsistent with the use of SETTINGS to communicate @@ -311,7 +334,7 @@ public class SpdySessionHandler SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - if (isRemoteInitiatedID(spdyPingFrame.getId())) { + if (isRemoteInitiatedId(spdyPingFrame.getId())) { ctx.writeAndFlush(spdyPingFrame); return; } @@ -365,13 +388,17 @@ public class SpdySessionHandler int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize(); // Ignore frames for half-closed streams - if (spdySession.isLocalSideClosed(streamId)) { + if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) { return; } // Check for numerical overflow if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) { - issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR); + if (streamId == SPDY_SESSION_STREAM_ID) { + issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR); + } else { + issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR); + } return; } @@ -426,7 +453,7 @@ public class SpdySessionHandler if (msg instanceof SpdyDataFrame) { SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; - final int streamId = spdyDataFrame.getStreamId(); + int streamId = spdyDataFrame.getStreamId(); // Frames must not be sent on half-closed streams if (spdySession.isLocalSideClosed(streamId)) { @@ -453,6 +480,11 @@ public class SpdySessionHandler int dataLength = spdyDataFrame.content().readableBytes(); int sendWindowSize = spdySession.getSendWindowSize(streamId); + if (sessionFlowControl) { + int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID); + sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize); + } + if (sendWindowSize <= 0) { // Stream is stalled -- enqueue Data frame and return spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); @@ -460,6 +492,9 @@ public class SpdySessionHandler } else if (sendWindowSize < dataLength) { // Stream is not stalled but we cannot send the entire frame spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize); + if (sessionFlowControl) { + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize); + } // Create a partial data frame whose length is the current window size SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, @@ -469,13 +504,13 @@ public class SpdySessionHandler spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the stream on write failures that leave the transfer window in a corrupt state. + // Close the session on write failures that leave the transfer window in a corrupt state. final ChannelHandlerContext context = ctx; ctx.write(partialDataFrame).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR); + issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR); } } }); @@ -483,15 +518,18 @@ public class SpdySessionHandler } else { // Window size is large enough to send entire data frame spdySession.updateSendWindowSize(streamId, -1 * dataLength); + if (sessionFlowControl) { + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength); + } // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the stream on write failures that leave the transfer window in a corrupt state. + // Close the session on write failures that leave the transfer window in a corrupt state. final ChannelHandlerContext context = ctx; promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR); + issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR); } } }); @@ -509,7 +547,7 @@ public class SpdySessionHandler SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; int streamId = spdySynStreamFrame.getStreamId(); - if (isRemoteInitiatedID(streamId)) { + if (isRemoteInitiatedId(streamId)) { promise.setFailure(PROTOCOL_EXCEPTION); return; } @@ -528,7 +566,7 @@ public class SpdySessionHandler int streamId = spdySynReplyFrame.getStreamId(); // Frames must not be sent on half-closed streams - if (!isRemoteInitiatedID(streamId) || spdySession.isLocalSideClosed(streamId)) { + if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) { promise.setFailure(PROTOCOL_EXCEPTION); return; } @@ -550,7 +588,7 @@ public class SpdySessionHandler int newConcurrentStreams = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS); if (newConcurrentStreams >= 0) { - updateConcurrentStreams(newConcurrentStreams, false); + localConcurrentStreams = newConcurrentStreams; } // Persistence flag are inconsistent with the use of SETTINGS to communicate @@ -572,7 +610,7 @@ public class SpdySessionHandler } else if (msg instanceof SpdyPingFrame) { SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - if (isRemoteInitiatedID(spdyPingFrame.getId())) { + if (isRemoteInitiatedId(spdyPingFrame.getId())) { ctx.fireExceptionCaught(new IllegalArgumentException( "invalid PING ID: " + spdyPingFrame.getId())); return; @@ -654,20 +692,11 @@ public class SpdySessionHandler * Helper functions */ - private boolean isRemoteInitiatedID(int id) { + private boolean isRemoteInitiatedId(int id) { boolean serverId = SpdyCodecUtil.isServerId(id); return server && !serverId || !server && serverId; } - private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) { - if (remote) { - remoteConcurrentStreams = newConcurrentStreams; - } else { - localConcurrentStreams = newConcurrentStreams; - } - maxConcurrentStreams = Math.min(localConcurrentStreams, remoteConcurrentStreams); - } - // need to synchronize to prevent new streams from being created while updating active streams private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) { int deltaWindowSize = newInitialWindowSize - initialSendWindowSize; @@ -690,13 +719,15 @@ public class SpdySessionHandler return false; } - if (spdySession.numActiveStreams() >= maxConcurrentStreams) { + boolean remote = isRemoteInitiatedId(streamId); + int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams; + if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) { return false; } spdySession.acceptStream( streamId, priority, remoteSideClosed, localSideClosed, - initialSendWindowSize, initialReceiveWindowSize); - if (isRemoteInitiatedID(streamId)) { + initialSendWindowSize, initialReceiveWindowSize, remote); + if (remote) { lastGoodStreamId = streamId; } return true; @@ -704,9 +735,9 @@ public class SpdySessionHandler private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) { if (remote) { - spdySession.closeRemoteSide(streamId); + spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId)); } else { - spdySession.closeLocalSide(streamId); + spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId)); } if (closeSessionFutureListener != null && spdySession.noActiveStreams()) { future.addListener(closeSessionFutureListener); @@ -714,16 +745,20 @@ public class SpdySessionHandler } private void removeStream(int streamId, ChannelFuture future) { - spdySession.removeStream(streamId, STREAM_CLOSED); + spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId)); if (closeSessionFutureListener != null && spdySession.noActiveStreams()) { future.addListener(closeSessionFutureListener); } } - private void updateSendWindowSize(final ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) { + private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) { synchronized (flowControlLock) { int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize); + if (sessionFlowControl && streamId != SPDY_SESSION_STREAM_ID) { + int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID); + newWindowSize = Math.min(newWindowSize, sessionSendWindowSize); + } while (newWindowSize > 0) { // Check if we have unblocked a stalled stream @@ -734,42 +769,54 @@ public class SpdySessionHandler SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame; int dataFrameSize = spdyDataFrame.content().readableBytes(); + int writeStreamId = spdyDataFrame.getStreamId(); + if (sessionFlowControl && streamId == SPDY_SESSION_STREAM_ID) { + newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId)); + } if (newWindowSize >= dataFrameSize) { // Window size is large enough to send entire data frame - spdySession.removePendingWrite(streamId); - newWindowSize = spdySession.updateSendWindowSize(streamId, -1 * dataFrameSize); + spdySession.removePendingWrite(writeStreamId); + newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize); + if (sessionFlowControl) { + int sessionSendWindowSize = + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize); + newWindowSize = Math.min(newWindowSize, sessionSendWindowSize); + } // Close the local side of the stream if this is the last frame if (spdyDataFrame.isLast()) { - halfCloseStream(streamId, false, pendingWrite.promise); + halfCloseStream(writeStreamId, false, pendingWrite.promise); } // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the stream on write failures that leave the transfer window in a corrupt state. + // Close the session on write failures that leave the transfer window in a corrupt state. ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR); + issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR); } } }); } else { // We can send a partial frame - spdySession.updateSendWindowSize(streamId, -1 * newWindowSize); + spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize); + if (sessionFlowControl) { + spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize); + } // Create a partial data frame whose length is the current window size - SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, + SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId, spdyDataFrame.content().readSlice(newWindowSize).retain()); // The transfer window size is pre-decremented when sending a data frame downstream. - // Close the stream on write failures that leave the transfer window in a corrupt state. + // Close the session on write failures that leave the transfer window in a corrupt state. ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR); + issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR); } } }); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyVersion.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyVersion.java new file mode 100644 index 0000000000..84d66028ca --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyVersion.java @@ -0,0 +1,55 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.spdy; + +public enum SpdyVersion { + SPDY_2 (2, false, false), + SPDY_3 (3, true, false), + SPDY_3_1 (3, true, true); + + static SpdyVersion valueOf(int version) { + if (version == 2) { + return SPDY_2; + } + if (version == 3) { + return SPDY_3; + } + throw new IllegalArgumentException( + "unsupported version: " + version); + } + + private final int version; + private final boolean flowControl; + private final boolean sessionFlowControl; + + private SpdyVersion(int version, boolean flowControl, boolean sessionFlowControl) { + this.version = version; + this.flowControl = flowControl; + this.sessionFlowControl = sessionFlowControl; + } + + int getVersion() { + return version; + } + + boolean useFlowControl() { + return flowControl; + } + + boolean useSessionFlowControl() { + return sessionFlowControl; + } +} diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java index 03e2e0b205..1e3cc3fdc1 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java @@ -34,7 +34,6 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.List; -import static io.netty.handler.codec.spdy.SpdyConstants.*; import static org.junit.Assert.*; public class SpdyFrameDecoderTest { @@ -48,57 +47,60 @@ public class SpdyFrameDecoderTest { @Test public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception { - for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version++) { - final int finalVersion = version; - List headerSizes = Arrays.asList(90, 900); - for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value - SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0); - addHeader(frame, 100, 1000); - final CaptureHandler captureHandler = new CaptureHandler(); - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group); - sb.channel(NioServerSocketChannel.class); - sb.childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new SpdyFrameDecoder(finalVersion, 10000, maxHeaderSize), - new SpdySessionHandler(finalVersion, true), - captureHandler); - } - }); + testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_2); + testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3); + testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1); + } - Bootstrap cb = new Bootstrap(); - cb.group(group); - cb.channel(NioSocketChannel.class); - cb.handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new SpdyFrameEncoder(finalVersion)); - } - }); - Channel sc = sb.bind(0).sync().channel(); - int port = ((InetSocketAddress) sc.localAddress()).getPort(); + private void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception { + List headerSizes = Arrays.asList(90, 900); + for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value + SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0); + addHeader(frame, 100, 1000); + final CaptureHandler captureHandler = new CaptureHandler(); + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group); + sb.channel(NioServerSocketChannel.class); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new SpdyFrameDecoder(version, 10000, maxHeaderSize), + new SpdySessionHandler(version, true), + captureHandler); + } + }); - Channel cc = cb.connect(NetUtil.LOCALHOST, port).sync().channel(); + Bootstrap cb = new Bootstrap(); + cb.group(group); + cb.channel(NioSocketChannel.class); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new SpdyFrameEncoder(version)); + } + }); + Channel sc = sb.bind(0).sync().channel(); + int port = ((InetSocketAddress) sc.localAddress()).getPort(); - sendAndWaitForFrame(cc, frame, captureHandler); + Channel cc = cb.connect(NetUtil.LOCALHOST, port).sync().channel(); - assertNotNull("version " + version + ", not null message", - captureHandler.message); - String message = "version " + version + ", should be SpdyHeadersFrame, was " + - captureHandler.message.getClass(); - assertTrue( - message, - captureHandler.message instanceof SpdyHeadersFrame); - SpdyHeadersFrame writtenFrame = (SpdyHeadersFrame) captureHandler.message; + sendAndWaitForFrame(cc, frame, captureHandler); - assertTrue("should be truncated", writtenFrame.isTruncated()); - assertFalse("should not be invalid", writtenFrame.isInvalid()); + assertNotNull("version " + version + ", not null message", + captureHandler.message); + String message = "version " + version + ", should be SpdyHeadersFrame, was " + + captureHandler.message.getClass(); + assertTrue( + message, + captureHandler.message instanceof SpdyHeadersFrame); + SpdyHeadersFrame writtenFrame = (SpdyHeadersFrame) captureHandler.message; - sc.close().sync(); - cc.close().sync(); - } + assertTrue("should be truncated", writtenFrame.isTruncated()); + assertFalse("should not be invalid", writtenFrame.isInvalid()); + + sc.close().sync(); + cc.close().sync(); } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java index a4801c4122..c074ee0a40 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -92,7 +92,7 @@ public class SpdySessionHandlerTest { assertTrue(spdyHeadersFrame.headers().entries().isEmpty()); } - private static void testSpdySessionHandler(int version, boolean server) { + private static void testSpdySessionHandler(SpdyVersion version, boolean server) { EmbeddedChannel sessionHandler = new EmbeddedChannel( new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); @@ -103,9 +103,6 @@ public class SpdySessionHandlerTest { int localStreamId = server ? 1 : 2; int remoteStreamId = server ? 2 : 1; - SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId); - SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId); - SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0); spdySynStreamFrame.headers().set("Compression", "test"); @@ -126,24 +123,11 @@ public class SpdySessionHandlerTest { assertNull(sessionHandler.readOutbound()); remoteStreamId += 2; - // Check if session handler correctly limits the number of - // concurrent streams in the SETTINGS frame - SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 0); - sessionHandler.writeInbound(spdySettingsFrame); - assertNull(sessionHandler.readOutbound()); - sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); - assertNull(sessionHandler.readOutbound()); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 100); - sessionHandler.writeInbound(spdySettingsFrame); - assertNull(sessionHandler.readOutbound()); - sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId)); - assertNull(sessionHandler.readOutbound()); - // Check if session handler returns PROTOCOL_ERROR if it receives // multiple SYN_REPLY frames for the same active Stream-ID sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId)); + assertNull(sessionHandler.readOutbound()); + sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId)); assertRstStream(sessionHandler.readOutbound(), remoteStreamId, SpdyStreamStatus.STREAM_IN_USE); assertNull(sessionHandler.readOutbound()); remoteStreamId += 2; @@ -173,6 +157,17 @@ public class SpdySessionHandlerTest { assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); assertNull(sessionHandler.readOutbound()); + // Check if session handler rejects HEADERS for closed streams + int testStreamId = spdyDataFrame.getStreamId(); + sessionHandler.writeInbound(spdyDataFrame); + assertDataFrame(sessionHandler.readOutbound(), testStreamId, spdyDataFrame.isLast()); + assertNull(sessionHandler.readOutbound()); + spdyHeadersFrame.setStreamId(testStreamId); + + sessionHandler.writeInbound(spdyHeadersFrame); + assertRstStream(sessionHandler.readOutbound(), testStreamId, SpdyStreamStatus.INVALID_STREAM); + assertNull(sessionHandler.readOutbound()); + // Check if session handler drops active streams if it receives // a RST_STREAM frame for that Stream-ID sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamId, 3)); @@ -200,32 +195,6 @@ public class SpdySessionHandlerTest { assertNull(sessionHandler.readOutbound()); spdySynStreamFrame.setStreamId(localStreamId); - // Check if session handler correctly handles updates to the max - // concurrent streams in the SETTINGS frame - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2); - sessionHandler.writeInbound(spdySettingsFrame); - assertNull(sessionHandler.readOutbound()); - sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); - assertNull(sessionHandler.readOutbound()); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4); - sessionHandler.writeInbound(spdySettingsFrame); - assertNull(sessionHandler.readOutbound()); - sessionHandler.writeInbound(spdySynStreamFrame); - assertSynReply(sessionHandler.readOutbound(), localStreamId, false, spdySynStreamFrame.headers()); - assertNull(sessionHandler.readOutbound()); - - // Check if session handler rejects HEADERS for closed streams - int testStreamId = spdyDataFrame.getStreamId(); - sessionHandler.writeInbound(spdyDataFrame); - assertDataFrame(sessionHandler.readOutbound(), testStreamId, spdyDataFrame.isLast()); - assertNull(sessionHandler.readOutbound()); - spdyHeadersFrame.setStreamId(testStreamId); - - sessionHandler.writeInbound(spdyHeadersFrame); - assertRstStream(sessionHandler.readOutbound(), testStreamId, SpdyStreamStatus.INVALID_STREAM); - assertNull(sessionHandler.readOutbound()); - // Check if session handler returns PROTOCOL_ERROR if it receives // an invalid HEADERS frame spdyHeadersFrame.setStreamId(localStreamId); @@ -235,6 +204,23 @@ public class SpdySessionHandlerTest { assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.PROTOCOL_ERROR); assertNull(sessionHandler.readOutbound()); + sessionHandler.finish(); + } + + private static void testSpdySessionHandlerPing(SpdyVersion version, boolean server) { + EmbeddedChannel sessionHandler = new EmbeddedChannel( + new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); + + while (sessionHandler.readOutbound() != null) { + continue; + } + + int localStreamId = server ? 1 : 2; + int remoteStreamId = server ? 2 : 1; + + SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId); + SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId); + // Check if session handler returns identical local PINGs sessionHandler.writeInbound(localPingFrame); assertPing(sessionHandler.readOutbound(), localPingFrame.getId()); @@ -244,6 +230,34 @@ public class SpdySessionHandlerTest { sessionHandler.writeInbound(remotePingFrame); assertNull(sessionHandler.readOutbound()); + sessionHandler.finish(); + } + + private static void testSpdySessionHandlerGoAway(SpdyVersion version, boolean server) { + EmbeddedChannel sessionHandler = new EmbeddedChannel( + new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); + + while (sessionHandler.readOutbound() != null) { + continue; + } + + int localStreamId = server ? 1 : 2; + + SpdySynStreamFrame spdySynStreamFrame = + new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0); + spdySynStreamFrame.headers().set("Compression", "test"); + + SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamId); + spdyDataFrame.setLast(true); + + // Send an initial request + sessionHandler.writeInbound(spdySynStreamFrame); + assertSynReply(sessionHandler.readOutbound(), localStreamId, false, spdySynStreamFrame.headers()); + assertNull(sessionHandler.readOutbound()); + sessionHandler.writeInbound(spdyDataFrame); + assertDataFrame(sessionHandler.readOutbound(), localStreamId, true); + assertNull(sessionHandler.readOutbound()); + // Check if session handler sends a GOAWAY frame when closing sessionHandler.writeInbound(closeMessage); assertGoAway(sessionHandler.readOutbound(), localStreamId); @@ -268,22 +282,66 @@ public class SpdySessionHandlerTest { @Test public void testSpdyClientSessionHandler() { - for (int version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) { - logger.info("Running: testSpdyClientSessionHandler v" + version); - testSpdySessionHandler(version, false); - } + logger.info("Running: testSpdyClientSessionHandler v2"); + testSpdySessionHandler(SpdyVersion.SPDY_2, false); + logger.info("Running: testSpdyClientSessionHandler v3"); + testSpdySessionHandler(SpdyVersion.SPDY_3, false); + logger.info("Running: testSpdyClientSessionHandler v3.1"); + testSpdySessionHandler(SpdyVersion.SPDY_3_1, false); + } + + @Test + public void testSpdyClientSessionHandlerPing() { + logger.info("Running: testSpdyClientSessionHandlerPing v2"); + testSpdySessionHandlerPing(SpdyVersion.SPDY_2, false); + logger.info("Running: testSpdyClientSessionHandlerPing v3"); + testSpdySessionHandlerPing(SpdyVersion.SPDY_3, false); + logger.info("Running: testSpdyClientSessionHandlerPing v3.1"); + testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, false); + } + + @Test + public void testSpdyClientSessionHandlerGoAway() { + logger.info("Running: testSpdyClientSessionHandlerGoAway v2"); + testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, false); + logger.info("Running: testSpdyClientSessionHandlerGoAway v3"); + testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, false); + logger.info("Running: testSpdyClientSessionHandlerGoAway v3.1"); + testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, false); } @Test public void testSpdyServerSessionHandler() { - for (int version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) { - logger.info("Running: testSpdyServerSessionHandler v" + version); - testSpdySessionHandler(version, true); - } + logger.info("Running: testSpdyServerSessionHandler v2"); + testSpdySessionHandler(SpdyVersion.SPDY_2, true); + logger.info("Running: testSpdyServerSessionHandler v3"); + testSpdySessionHandler(SpdyVersion.SPDY_3, true); + logger.info("Running: testSpdyServerSessionHandler v3.1"); + testSpdySessionHandler(SpdyVersion.SPDY_3_1, true); + } + + @Test + public void testSpdyServerSessionHandlerPing() { + logger.info("Running: testSpdyServerSessionHandlerPing v2"); + testSpdySessionHandlerPing(SpdyVersion.SPDY_2, true); + logger.info("Running: testSpdyServerSessionHandlerPing v3"); + testSpdySessionHandlerPing(SpdyVersion.SPDY_3, true); + logger.info("Running: testSpdyServerSessionHandlerPing v3.1"); + testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, true); + } + + @Test + public void testSpdyServerSessionHandlerGoAway() { + logger.info("Running: testSpdyServerSessionHandlerGoAway v2"); + testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, true); + logger.info("Running: testSpdyServerSessionHandlerGoAway v3"); + testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, true); + logger.info("Running: testSpdyServerSessionHandlerGoAway v3.1"); + testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, true); } // Echo Handler opens 4 half-closed streams on session connection - // and then sets the number of concurrent streams to 3 + // and then sets the number of concurrent streams to 1 private static class EchoHandler extends ChannelInboundHandlerAdapter { private final int closeSignal; private final boolean server; @@ -308,9 +366,9 @@ public class SpdySessionHandlerTest { spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); ctx.writeAndFlush(spdySynStreamFrame); - // Limit the number of concurrent streams to 3 + // Limit the number of concurrent streams to 1 SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); - spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3); + spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 1); ctx.writeAndFlush(spdySettingsFrame); } @@ -337,8 +395,8 @@ public class SpdySessionHandlerTest { } if (msg instanceof SpdyDataFrame || - msg instanceof SpdyPingFrame || - msg instanceof SpdyHeadersFrame) { + msg instanceof SpdyPingFrame || + msg instanceof SpdyHeadersFrame) { ctx.writeAndFlush(msg); return; diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java index aeac4a01d6..a80d297514 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java @@ -25,9 +25,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.spdy.SpdyConstants; import io.netty.handler.codec.spdy.SpdyFrameDecoder; import io.netty.handler.codec.spdy.SpdyFrameEncoder; +import io.netty.handler.codec.spdy.SpdyVersion; import io.netty.util.NetUtil; import org.junit.Test; @@ -165,19 +165,37 @@ public class SocketSpdyEchoTest extends AbstractSocketTest { return frames; } - private int version; + private SpdyVersion version; @Test(timeout = 15000) public void testSpdyEcho() throws Throwable { - for (version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) { - logger.info("Testing against SPDY v" + version); - run(); - } + version = SpdyVersion.SPDY_2; + logger.info("Testing against SPDY v2"); + run(); + + version = SpdyVersion.SPDY_3; + logger.info("Testing against SPDY v3"); + run(); + + version = SpdyVersion.SPDY_3_1; + logger.info("Testing against SPDY v3.1"); + run(); } public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - ByteBuf frames = createFrames(version); + ByteBuf frames; + switch (version) { + case SPDY_2: + frames = createFrames(2); + break; + case SPDY_3: + case SPDY_3_1: + frames = createFrames(3); + break; + default: + throw new IllegalArgumentException("unknown version"); + } final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler(); final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy());