SPDY: add SPDY/3.1 support

- with Michael Schore <mschore@twitter.com>
This commit is contained in:
Jeff Pinner 2013-09-27 10:48:16 -07:00 committed by Norman Maurer
parent c2101d3c56
commit 321990a115
22 changed files with 472 additions and 252 deletions

View File

@ -20,6 +20,8 @@ import io.netty.util.CharsetUtil;
final class SpdyCodecUtil { final class SpdyCodecUtil {
static final int SPDY_SESSION_STREAM_ID = 0;
static final int SPDY_HEADER_TYPE_OFFSET = 2; static final int SPDY_HEADER_TYPE_OFFSET = 2;
static final int SPDY_HEADER_FLAGS_OFFSET = 4; static final int SPDY_HEADER_FLAGS_OFFSET = 4;
static final int SPDY_HEADER_LENGTH_OFFSET = 5; static final int SPDY_HEADER_LENGTH_OFFSET = 5;

View File

@ -1,24 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
public final class SpdyConstants {
public static final int SPDY_MIN_VERSION = 2;
public static final int SPDY_MAX_VERSION = 3;
private SpdyConstants() { }
}

View File

@ -28,7 +28,7 @@ public final class SpdyFrameCodec extends CombinedChannelDuplexHandler<SpdyFrame
* {@code compressionLevel (6)}, {@code windowBits (15)}, * {@code compressionLevel (6)}, {@code windowBits (15)},
* and {@code memLevel (8)}). * and {@code memLevel (8)}).
*/ */
public SpdyFrameCodec(int version) { public SpdyFrameCodec(SpdyVersion version) {
this(version, 8192, 16384, 6, 15, 8); this(version, 8192, 16384, 6, 15, 8);
} }
@ -36,7 +36,7 @@ public final class SpdyFrameCodec extends CombinedChannelDuplexHandler<SpdyFrame
* Creates a new instance with the specified decoder and encoder options. * Creates a new instance with the specified decoder and encoder options.
*/ */
public SpdyFrameCodec( public SpdyFrameCodec(
int version, int maxChunkSize, int maxHeaderSize, SpdyVersion version, int maxChunkSize, int maxHeaderSize,
int compressionLevel, int windowBits, int memLevel) { int compressionLevel, int windowBits, int memLevel) {
super( super(
new SpdyFrameDecoder(version, maxChunkSize, maxHeaderSize), new SpdyFrameDecoder(version, maxChunkSize, maxHeaderSize),

View File

@ -83,28 +83,27 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
* Creates a new instance with the specified {@code version} and the default * Creates a new instance with the specified {@code version} and the default
* {@code maxChunkSize (8192)} and {@code maxHeaderSize (16384)}. * {@code maxChunkSize (8192)} and {@code maxHeaderSize (16384)}.
*/ */
public SpdyFrameDecoder(int version) { public SpdyFrameDecoder(SpdyVersion version) {
this(version, 8192, 16384); this(version, 8192, 16384);
} }
/** /**
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
*/ */
public SpdyFrameDecoder(int version, int maxChunkSize, int maxHeaderSize) { public SpdyFrameDecoder(SpdyVersion version, int maxChunkSize, int maxHeaderSize) {
this(version, maxChunkSize, SpdyHeaderBlockDecoder.newInstance(version, maxHeaderSize)); this(version, maxChunkSize, SpdyHeaderBlockDecoder.newInstance(version, maxHeaderSize));
} }
protected SpdyFrameDecoder( protected SpdyFrameDecoder(
int version, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) { SpdyVersion version, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unsupported version: " + version);
} }
if (maxChunkSize <= 0) { if (maxChunkSize <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"maxChunkSize must be a positive integer: " + maxChunkSize); "maxChunkSize must be a positive integer: " + maxChunkSize);
} }
spdyVersion = version; spdyVersion = version.getVersion();
this.maxChunkSize = maxChunkSize; this.maxChunkSize = maxChunkSize;
this.headerBlockDecoder = headerBlockDecoder; this.headerBlockDecoder = headerBlockDecoder;
state = State.READ_COMMON_HEADER; state = State.READ_COMMON_HEADER;

View File

@ -39,24 +39,23 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<SpdyFrame> {
* default {@code compressionLevel (6)}, {@code windowBits (15)}, * default {@code compressionLevel (6)}, {@code windowBits (15)},
* and {@code memLevel (8)}. * and {@code memLevel (8)}.
*/ */
public SpdyFrameEncoder(int version) { public SpdyFrameEncoder(SpdyVersion version) {
this(version, 6, 15, 8); this(version, 6, 15, 8);
} }
/** /**
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
*/ */
public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { public SpdyFrameEncoder(SpdyVersion version, int compressionLevel, int windowBits, int memLevel) {
this(version, SpdyHeaderBlockEncoder.newInstance( this(version, SpdyHeaderBlockEncoder.newInstance(
version, compressionLevel, windowBits, memLevel)); version, compressionLevel, windowBits, memLevel));
} }
protected SpdyFrameEncoder(int version, SpdyHeaderBlockEncoder headerBlockEncoder) { protected SpdyFrameEncoder(SpdyVersion version, SpdyHeaderBlockEncoder headerBlockEncoder) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unknown version: " + version);
} }
this.version = version; this.version = version.getVersion();
this.headerBlockEncoder = headerBlockEncoder; this.headerBlockEncoder = headerBlockEncoder;
} }

View File

@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf;
abstract class SpdyHeaderBlockDecoder { abstract class SpdyHeaderBlockDecoder {
static SpdyHeaderBlockDecoder newInstance(int version, int maxHeaderSize) { static SpdyHeaderBlockDecoder newInstance(SpdyVersion version, int maxHeaderSize) {
return new SpdyHeaderBlockZlibDecoder(version, maxHeaderSize); return new SpdyHeaderBlockZlibDecoder(version, maxHeaderSize);
} }

View File

@ -22,7 +22,7 @@ import io.netty.util.internal.PlatformDependent;
abstract class SpdyHeaderBlockEncoder { abstract class SpdyHeaderBlockEncoder {
static SpdyHeaderBlockEncoder newInstance( static SpdyHeaderBlockEncoder newInstance(
int version, int compressionLevel, int windowBits, int memLevel) { SpdyVersion version, int compressionLevel, int windowBits, int memLevel) {
if (PlatformDependent.javaVersion() >= 7) { if (PlatformDependent.javaVersion() >= 7) {
return new SpdyHeaderBlockZlibEncoder( return new SpdyHeaderBlockZlibEncoder(

View File

@ -31,7 +31,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder {
private boolean finished; private boolean finished;
public SpdyHeaderBlockJZlibEncoder( public SpdyHeaderBlockJZlibEncoder(
int version, int compressionLevel, int windowBits, int memLevel) { SpdyVersion version, int compressionLevel, int windowBits, int memLevel) {
super(version); super(version);
if (compressionLevel < 0 || compressionLevel > 9) { if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -52,7 +52,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder {
throw new CompressionException( throw new CompressionException(
"failed to initialize an SPDY header block deflater: " + resultCode); "failed to initialize an SPDY header block deflater: " + resultCode);
} else { } else {
if (version < 3) { if (version.getVersion() < 3) {
resultCode = z.deflateSetDictionary(SPDY2_DICT, SPDY2_DICT.length); resultCode = z.deflateSetDictionary(SPDY2_DICT, SPDY2_DICT.length);
} else { } else {
resultCode = z.deflateSetDictionary(SPDY_DICT, SPDY_DICT.length); resultCode = z.deflateSetDictionary(SPDY_DICT, SPDY_DICT.length);

View File

@ -29,15 +29,14 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
private int headerSize; private int headerSize;
private int numHeaders; private int numHeaders;
public SpdyHeaderBlockRawDecoder(int version, int maxHeaderSize) { public SpdyHeaderBlockRawDecoder(SpdyVersion version, int maxHeaderSize) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unsupported version: " + version);
} }
this.version = version; this.version = version.getVersion();
this.maxHeaderSize = maxHeaderSize; this.maxHeaderSize = maxHeaderSize;
lengthFieldSize = version < 3 ? 2 : 4; lengthFieldSize = this.version < 3 ? 2 : 4;
reset(); reset();
} }

View File

@ -27,12 +27,11 @@ public class SpdyHeaderBlockRawEncoder extends SpdyHeaderBlockEncoder {
private final int version; private final int version;
public SpdyHeaderBlockRawEncoder(int version) { public SpdyHeaderBlockRawEncoder(SpdyVersion version) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unknown version: " + version);
} }
this.version = version; this.version = version.getVersion();
} }
private void setLengthField(ByteBuf buffer, int writerIndex, int length) { private void setLengthField(ByteBuf buffer, int writerIndex, int length) {

View File

@ -31,9 +31,9 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
private ByteBuf decompressed; private ByteBuf decompressed;
public SpdyHeaderBlockZlibDecoder(int version, int maxHeaderSize) { public SpdyHeaderBlockZlibDecoder(SpdyVersion version, int maxHeaderSize) {
super(version, maxHeaderSize); super(version, maxHeaderSize);
this.version = version; this.version = version.getVersion();
} }
@Override @Override

View File

@ -30,14 +30,14 @@ class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder {
private boolean finished; private boolean finished;
public SpdyHeaderBlockZlibEncoder(int version, int compressionLevel) { public SpdyHeaderBlockZlibEncoder(SpdyVersion version, int compressionLevel) {
super(version); super(version);
if (compressionLevel < 0 || compressionLevel > 9) { if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)"); "compressionLevel: " + compressionLevel + " (expected: 0-9)");
} }
compressor = new Deflater(compressionLevel); compressor = new Deflater(compressionLevel);
if (version < 3) { if (version.getVersion() < 3) {
compressor.setDictionary(SPDY2_DICT); compressor.setDictionary(SPDY2_DICT);
} else { } else {
compressor.setDictionary(SPDY_DICT); compressor.setDictionary(SPDY_DICT);

View File

@ -25,7 +25,7 @@ public final class SpdyHttpCodec
/** /**
* Creates a new instance with the specified decoder options. * Creates a new instance with the specified decoder options.
*/ */
public SpdyHttpCodec(int version, int maxContentLength) { public SpdyHttpCodec(SpdyVersion version, int maxContentLength) {
super(new SpdyHttpDecoder(version, maxContentLength), new SpdyHttpEncoder(version)); super(new SpdyHttpDecoder(version, maxContentLength), new SpdyHttpEncoder(version));
} }
} }

View File

@ -51,7 +51,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
* If the length of the message content exceeds this value, * If the length of the message content exceeds this value,
* a {@link TooLongFrameException} will be raised. * a {@link TooLongFrameException} will be raised.
*/ */
public SpdyHttpDecoder(int version, int maxContentLength) { public SpdyHttpDecoder(SpdyVersion version, int maxContentLength) {
this(version, maxContentLength, new HashMap<Integer, FullHttpMessage>()); this(version, maxContentLength, new HashMap<Integer, FullHttpMessage>());
} }
@ -64,16 +64,15 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
* a {@link TooLongFrameException} will be raised. * a {@link TooLongFrameException} will be raised.
* @param messageMap the {@link Map} used to hold partially received messages. * @param messageMap the {@link Map} used to hold partially received messages.
*/ */
protected SpdyHttpDecoder(int version, int maxContentLength, Map<Integer, FullHttpMessage> messageMap) { protected SpdyHttpDecoder(SpdyVersion version, int maxContentLength, Map<Integer, FullHttpMessage> messageMap) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unsupported version: " + version);
} }
if (maxContentLength <= 0) { if (maxContentLength <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " + maxContentLength); "maxContentLength must be a positive integer: " + maxContentLength);
} }
spdyVersion = version; spdyVersion = version.getVersion();
this.maxContentLength = maxContentLength; this.maxContentLength = maxContentLength;
this.messageMap = messageMap; this.messageMap = messageMap;
} }

View File

@ -130,12 +130,11 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
* *
* @param version the protocol version * @param version the protocol version
*/ */
public SpdyHttpEncoder(int version) { public SpdyHttpEncoder(SpdyVersion version) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unsupported version: " + version);
} }
spdyVersion = version; spdyVersion = version.getVersion();
} }
@Override @Override

View File

@ -41,6 +41,7 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder {
public enum SelectedProtocol { public enum SelectedProtocol {
SPDY_2, SPDY_2,
SPDY_3, SPDY_3,
SPDY_3_1,
HTTP_1_1, HTTP_1_1,
HTTP_1_0, HTTP_1_0,
UNKNOWN UNKNOWN
@ -84,10 +85,13 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder {
// Not done with choosing the protocol, so just return here for now, // Not done with choosing the protocol, so just return here for now,
return false; return false;
case SPDY_2: case SPDY_2:
addSpdyHandlers(ctx, 2); addSpdyHandlers(ctx, SpdyVersion.SPDY_2);
break; break;
case SPDY_3: case SPDY_3:
addSpdyHandlers(ctx, 3); addSpdyHandlers(ctx, SpdyVersion.SPDY_3);
break;
case SPDY_3_1:
addSpdyHandlers(ctx, SpdyVersion.SPDY_3_1);
break; break;
case HTTP_1_0: case HTTP_1_0:
case HTTP_1_1: case HTTP_1_1:
@ -102,7 +106,7 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder {
/** /**
* Add all {@link ChannelHandler}'s that are needed for SPDY with the given version. * Add all {@link ChannelHandler}'s that are needed for SPDY with the given version.
*/ */
protected void addSpdyHandlers(ChannelHandlerContext ctx, int version) { protected void addSpdyHandlers(ChannelHandlerContext ctx, SpdyVersion version) {
ChannelPipeline pipeline = ctx.pipeline(); ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast("spdyDecoder", new SpdyFrameDecoder(version)); pipeline.addLast("spdyDecoder", new SpdyFrameDecoder(version));
pipeline.addLast("spdyEncoder", new SpdyFrameEncoder(version)); pipeline.addLast("spdyEncoder", new SpdyFrameEncoder(version));

View File

@ -26,12 +26,28 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
final class SpdySession { final class SpdySession {
private final AtomicInteger activeLocalStreams = new AtomicInteger();
private final AtomicInteger activeRemoteStreams = new AtomicInteger();
private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap(); private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
int numActiveStreams() { private final AtomicInteger sendWindowSize;
return activeStreams.size(); private final AtomicInteger receiveWindowSize;
public SpdySession(int sendWindowSize, int receiveWindowSize) {
this.sendWindowSize = new AtomicInteger(sendWindowSize);
this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
}
int numActiveStreams(boolean remote) {
if (remote) {
return activeRemoteStreams.get();
} else {
return activeLocalStreams.get();
}
} }
boolean noActiveStreams() { boolean noActiveStreams() {
@ -51,15 +67,34 @@ final class SpdySession {
void acceptStream( void acceptStream(
int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed, int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
int sendWindowSize, int receiveWindowSize) { int sendWindowSize, int receiveWindowSize, boolean remote) {
if (!remoteSideClosed || !localSideClosed) { if (!remoteSideClosed || !localSideClosed) {
activeStreams.put(streamId, new StreamState( StreamState state = activeStreams.put(streamId, new StreamState(
priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize)); priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
if (state == null) {
if (remote) {
activeRemoteStreams.incrementAndGet();
} else {
activeLocalStreams.incrementAndGet();
}
}
} }
} }
void removeStream(int streamId, Throwable cause) { private StreamState removeActiveStream(int streamId, boolean remote) {
StreamState state = activeStreams.remove(streamId); StreamState state = activeStreams.remove(streamId);
if (state != null) {
if (remote) {
activeRemoteStreams.decrementAndGet();
} else {
activeLocalStreams.decrementAndGet();
}
}
return state;
}
void removeStream(int streamId, Throwable cause, boolean remote) {
StreamState state = removeActiveStream(streamId, remote);
if (state != null) { if (state != null) {
state.clearPendingWrites(cause); state.clearPendingWrites(cause);
} }
@ -70,12 +105,12 @@ final class SpdySession {
return state == null || state.isRemoteSideClosed(); return state == null || state.isRemoteSideClosed();
} }
void closeRemoteSide(int streamId) { void closeRemoteSide(int streamId, boolean remote) {
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
if (state != null) { if (state != null) {
state.closeRemoteSide(); state.closeRemoteSide();
if (state.isLocalSideClosed()) { if (state.isLocalSideClosed()) {
activeStreams.remove(streamId); removeActiveStream(streamId, remote);
} }
} }
} }
@ -85,12 +120,12 @@ final class SpdySession {
return state == null || state.isLocalSideClosed(); return state == null || state.isLocalSideClosed();
} }
void closeLocalSide(int streamId) { void closeLocalSide(int streamId, boolean remote) {
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
if (state != null) { if (state != null) {
state.closeLocalSide(); state.closeLocalSide();
if (state.isRemoteSideClosed()) { if (state.isRemoteSideClosed()) {
activeStreams.remove(streamId); removeActiveStream(streamId, remote);
} }
} }
} }
@ -112,16 +147,28 @@ final class SpdySession {
} }
int getSendWindowSize(int streamId) { int getSendWindowSize(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return sendWindowSize.get();
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.getSendWindowSize() : -1; return state != null ? state.getSendWindowSize() : -1;
} }
int updateSendWindowSize(int streamId, int deltaWindowSize) { int updateSendWindowSize(int streamId, int deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return sendWindowSize.addAndGet(deltaWindowSize);
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1; return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
} }
int updateReceiveWindowSize(int streamId, int deltaWindowSize) { int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return receiveWindowSize.addAndGet(deltaWindowSize);
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
if (deltaWindowSize > 0) { if (deltaWindowSize > 0) {
state.setReceiveWindowSizeLowerBound(0); state.setReceiveWindowSizeLowerBound(0);
@ -130,6 +177,10 @@ final class SpdySession {
} }
int getReceiveWindowSizeLowerBound(int streamId) { int getReceiveWindowSizeLowerBound(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return 0;
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.getReceiveWindowSizeLowerBound() : 0; return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
} }
@ -155,6 +206,19 @@ final class SpdySession {
} }
PendingWrite getPendingWrite(int streamId) { PendingWrite getPendingWrite(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
for (Integer id : getActiveStreams()) {
StreamState state = activeStreams.get(id);
if (state.getSendWindowSize() > 0) {
PendingWrite pendingWrite = state.getPendingWrite();
if (pendingWrite != null) {
return pendingWrite;
}
}
}
return null;
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.getPendingWrite() : null; return state != null ? state.getPendingWrite() : null;
} }

View File

@ -24,6 +24,8 @@ import io.netty.util.internal.EmptyArrays;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
/** /**
* Manages streams within a SPDY session. * Manages streams within a SPDY session.
*/ */
@ -38,17 +40,16 @@ public class SpdySessionHandler
STREAM_CLOSED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); STREAM_CLOSED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
} }
private final SpdySession spdySession = new SpdySession(); private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
private int lastGoodStreamId; private int lastGoodStreamId;
private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE; private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private int maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
private final Object flowControlLock = new Object(); private final Object flowControlLock = new Object();
@ -61,6 +62,7 @@ public class SpdySessionHandler
private final boolean server; private final boolean server;
private final boolean flowControl; private final boolean flowControl;
private final boolean sessionFlowControl;
/** /**
* Creates a new session handler. * Creates a new session handler.
@ -71,13 +73,13 @@ public class SpdySessionHandler
* {@code false} if and only if this session handler should * {@code false} if and only if this session handler should
* handle the client endpoint of the connection. * handle the client endpoint of the connection.
*/ */
public SpdySessionHandler(int version, boolean server) { public SpdySessionHandler(SpdyVersion version, boolean server) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version == null) {
throw new IllegalArgumentException( throw new NullPointerException("version");
"unsupported version: " + version);
} }
this.server = server; this.server = server;
flowControl = version >= 3; flowControl = version.useFlowControl();
sessionFlowControl = version.useSessionFlowControl();
} }
@Override @Override
@ -109,6 +111,27 @@ public class SpdySessionHandler
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
int streamId = spdyDataFrame.getStreamId(); int streamId = spdyDataFrame.getStreamId();
if (sessionFlowControl) {
int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
int newSessionWindowSize =
spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
// Check if session window size is reduced beyond allowable lower bound
if (newSessionWindowSize < 0) {
issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
return;
}
// Send a WINDOW_UPDATE frame if less than half the session window size remains
if (newSessionWindowSize <= initialReceiveWindowSize / 2) {
deltaWindowSize = initialReceiveWindowSize - newSessionWindowSize;
spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
SpdyWindowUpdateFrame spdyWindowUpdateFrame =
new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, deltaWindowSize);
ctx.writeAndFlush(spdyWindowUpdateFrame);
}
}
// Check if we received a data frame for a Stream-ID which is not open // Check if we received a data frame for a Stream-ID which is not open
if (!spdySession.isActiveStream(streamId)) { if (!spdySession.isActiveStream(streamId)) {
@ -130,7 +153,7 @@ public class SpdySessionHandler
} }
// Check if we received a data frame before receiving a SYN_REPLY // Check if we received a data frame before receiving a SYN_REPLY
if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) { if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
spdyDataFrame.release(); spdyDataFrame.release();
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR); issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return; return;
@ -168,7 +191,7 @@ public class SpdySessionHandler
} }
} }
// Send a WINDOW_UPDATE frame if less than half the window size remains // Send a WINDOW_UPDATE frame if less than half the stream window size remains
if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) { if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
deltaWindowSize = initialReceiveWindowSize - newWindowSize; deltaWindowSize = initialReceiveWindowSize - newWindowSize;
spdySession.updateReceiveWindowSize(streamId, deltaWindowSize); spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
@ -204,7 +227,7 @@ public class SpdySessionHandler
// Check if we received a valid SYN_STREAM frame // Check if we received a valid SYN_STREAM frame
if (spdySynStreamFrame.isInvalid() || if (spdySynStreamFrame.isInvalid() ||
!isRemoteInitiatedID(streamId) || !isRemoteInitiatedId(streamId) ||
spdySession.isActiveStream(streamId)) { spdySession.isActiveStream(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR); issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return; return;
@ -239,7 +262,7 @@ public class SpdySessionHandler
// Check if we received a valid SYN_REPLY frame // Check if we received a valid SYN_REPLY frame
if (spdySynReplyFrame.isInvalid() || if (spdySynReplyFrame.isInvalid() ||
isRemoteInitiatedID(streamId) || isRemoteInitiatedId(streamId) ||
spdySession.isRemoteSideClosed(streamId)) { spdySession.isRemoteSideClosed(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM); issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
return; return;
@ -279,7 +302,7 @@ public class SpdySessionHandler
int newConcurrentStreams = int newConcurrentStreams =
spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS); spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
if (newConcurrentStreams >= 0) { if (newConcurrentStreams >= 0) {
updateConcurrentStreams(newConcurrentStreams, true); remoteConcurrentStreams = newConcurrentStreams;
} }
// Persistence flag are inconsistent with the use of SETTINGS to communicate // Persistence flag are inconsistent with the use of SETTINGS to communicate
@ -311,7 +334,7 @@ public class SpdySessionHandler
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) { if (isRemoteInitiatedId(spdyPingFrame.getId())) {
ctx.writeAndFlush(spdyPingFrame); ctx.writeAndFlush(spdyPingFrame);
return; return;
} }
@ -365,13 +388,17 @@ public class SpdySessionHandler
int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize(); int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
// Ignore frames for half-closed streams // Ignore frames for half-closed streams
if (spdySession.isLocalSideClosed(streamId)) { if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
return; return;
} }
// Check for numerical overflow // Check for numerical overflow
if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) { if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
} else {
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR); issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
}
return; return;
} }
@ -426,7 +453,7 @@ public class SpdySessionHandler
if (msg instanceof SpdyDataFrame) { if (msg instanceof SpdyDataFrame) {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
final int streamId = spdyDataFrame.getStreamId(); int streamId = spdyDataFrame.getStreamId();
// Frames must not be sent on half-closed streams // Frames must not be sent on half-closed streams
if (spdySession.isLocalSideClosed(streamId)) { if (spdySession.isLocalSideClosed(streamId)) {
@ -453,6 +480,11 @@ public class SpdySessionHandler
int dataLength = spdyDataFrame.content().readableBytes(); int dataLength = spdyDataFrame.content().readableBytes();
int sendWindowSize = spdySession.getSendWindowSize(streamId); int sendWindowSize = spdySession.getSendWindowSize(streamId);
if (sessionFlowControl) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
}
if (sendWindowSize <= 0) { if (sendWindowSize <= 0) {
// Stream is stalled -- enqueue Data frame and return // Stream is stalled -- enqueue Data frame and return
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
@ -460,6 +492,9 @@ public class SpdySessionHandler
} else if (sendWindowSize < dataLength) { } else if (sendWindowSize < dataLength) {
// Stream is not stalled but we cannot send the entire frame // Stream is not stalled but we cannot send the entire frame
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize); spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
}
// Create a partial data frame whose length is the current window size // Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
@ -469,13 +504,13 @@ public class SpdySessionHandler
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise)); spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state. // Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx; final ChannelHandlerContext context = ctx;
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() { ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR); issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });
@ -483,15 +518,18 @@ public class SpdySessionHandler
} else { } else {
// Window size is large enough to send entire data frame // Window size is large enough to send entire data frame
spdySession.updateSendWindowSize(streamId, -1 * dataLength); spdySession.updateSendWindowSize(streamId, -1 * dataLength);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
}
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state. // Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx; final ChannelHandlerContext context = ctx;
promise.addListener(new ChannelFutureListener() { promise.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR); issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });
@ -509,7 +547,7 @@ public class SpdySessionHandler
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
int streamId = spdySynStreamFrame.getStreamId(); int streamId = spdySynStreamFrame.getStreamId();
if (isRemoteInitiatedID(streamId)) { if (isRemoteInitiatedId(streamId)) {
promise.setFailure(PROTOCOL_EXCEPTION); promise.setFailure(PROTOCOL_EXCEPTION);
return; return;
} }
@ -528,7 +566,7 @@ public class SpdySessionHandler
int streamId = spdySynReplyFrame.getStreamId(); int streamId = spdySynReplyFrame.getStreamId();
// Frames must not be sent on half-closed streams // Frames must not be sent on half-closed streams
if (!isRemoteInitiatedID(streamId) || spdySession.isLocalSideClosed(streamId)) { if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
promise.setFailure(PROTOCOL_EXCEPTION); promise.setFailure(PROTOCOL_EXCEPTION);
return; return;
} }
@ -550,7 +588,7 @@ public class SpdySessionHandler
int newConcurrentStreams = int newConcurrentStreams =
spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS); spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
if (newConcurrentStreams >= 0) { if (newConcurrentStreams >= 0) {
updateConcurrentStreams(newConcurrentStreams, false); localConcurrentStreams = newConcurrentStreams;
} }
// Persistence flag are inconsistent with the use of SETTINGS to communicate // Persistence flag are inconsistent with the use of SETTINGS to communicate
@ -572,7 +610,7 @@ public class SpdySessionHandler
} else if (msg instanceof SpdyPingFrame) { } else if (msg instanceof SpdyPingFrame) {
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) { if (isRemoteInitiatedId(spdyPingFrame.getId())) {
ctx.fireExceptionCaught(new IllegalArgumentException( ctx.fireExceptionCaught(new IllegalArgumentException(
"invalid PING ID: " + spdyPingFrame.getId())); "invalid PING ID: " + spdyPingFrame.getId()));
return; return;
@ -654,20 +692,11 @@ public class SpdySessionHandler
* Helper functions * Helper functions
*/ */
private boolean isRemoteInitiatedID(int id) { private boolean isRemoteInitiatedId(int id) {
boolean serverId = SpdyCodecUtil.isServerId(id); boolean serverId = SpdyCodecUtil.isServerId(id);
return server && !serverId || !server && serverId; return server && !serverId || !server && serverId;
} }
private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
if (remote) {
remoteConcurrentStreams = newConcurrentStreams;
} else {
localConcurrentStreams = newConcurrentStreams;
}
maxConcurrentStreams = Math.min(localConcurrentStreams, remoteConcurrentStreams);
}
// need to synchronize to prevent new streams from being created while updating active streams // need to synchronize to prevent new streams from being created while updating active streams
private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) { private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
int deltaWindowSize = newInitialWindowSize - initialSendWindowSize; int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
@ -690,13 +719,15 @@ public class SpdySessionHandler
return false; return false;
} }
if (spdySession.numActiveStreams() >= maxConcurrentStreams) { boolean remote = isRemoteInitiatedId(streamId);
int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
return false; return false;
} }
spdySession.acceptStream( spdySession.acceptStream(
streamId, priority, remoteSideClosed, localSideClosed, streamId, priority, remoteSideClosed, localSideClosed,
initialSendWindowSize, initialReceiveWindowSize); initialSendWindowSize, initialReceiveWindowSize, remote);
if (isRemoteInitiatedID(streamId)) { if (remote) {
lastGoodStreamId = streamId; lastGoodStreamId = streamId;
} }
return true; return true;
@ -704,9 +735,9 @@ public class SpdySessionHandler
private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) { private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
if (remote) { if (remote) {
spdySession.closeRemoteSide(streamId); spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
} else { } else {
spdySession.closeLocalSide(streamId); spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
} }
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) { if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener); future.addListener(closeSessionFutureListener);
@ -714,16 +745,20 @@ public class SpdySessionHandler
} }
private void removeStream(int streamId, ChannelFuture future) { private void removeStream(int streamId, ChannelFuture future) {
spdySession.removeStream(streamId, STREAM_CLOSED); spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) { if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener); future.addListener(closeSessionFutureListener);
} }
} }
private void updateSendWindowSize(final ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) { private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
synchronized (flowControlLock) { synchronized (flowControlLock) {
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize); int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
if (sessionFlowControl && streamId != SPDY_SESSION_STREAM_ID) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}
while (newWindowSize > 0) { while (newWindowSize > 0) {
// Check if we have unblocked a stalled stream // Check if we have unblocked a stalled stream
@ -734,42 +769,54 @@ public class SpdySessionHandler
SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame; SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
int dataFrameSize = spdyDataFrame.content().readableBytes(); int dataFrameSize = spdyDataFrame.content().readableBytes();
int writeStreamId = spdyDataFrame.getStreamId();
if (sessionFlowControl && streamId == SPDY_SESSION_STREAM_ID) {
newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
}
if (newWindowSize >= dataFrameSize) { if (newWindowSize >= dataFrameSize) {
// Window size is large enough to send entire data frame // Window size is large enough to send entire data frame
spdySession.removePendingWrite(streamId); spdySession.removePendingWrite(writeStreamId);
newWindowSize = spdySession.updateSendWindowSize(streamId, -1 * dataFrameSize); newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
if (sessionFlowControl) {
int sessionSendWindowSize =
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}
// Close the local side of the stream if this is the last frame // Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) { if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, false, pendingWrite.promise); halfCloseStream(writeStreamId, false, pendingWrite.promise);
} }
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state. // Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() { ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR); issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });
} else { } else {
// We can send a partial frame // We can send a partial frame
spdySession.updateSendWindowSize(streamId, -1 * newWindowSize); spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);
}
// Create a partial data frame whose length is the current window size // Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
spdyDataFrame.content().readSlice(newWindowSize).retain()); spdyDataFrame.content().readSlice(newWindowSize).retain());
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state. // Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() { ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR); issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });

View File

@ -0,0 +1,55 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
public enum SpdyVersion {
SPDY_2 (2, false, false),
SPDY_3 (3, true, false),
SPDY_3_1 (3, true, true);
static SpdyVersion valueOf(int version) {
if (version == 2) {
return SPDY_2;
}
if (version == 3) {
return SPDY_3;
}
throw new IllegalArgumentException(
"unsupported version: " + version);
}
private final int version;
private final boolean flowControl;
private final boolean sessionFlowControl;
private SpdyVersion(int version, boolean flowControl, boolean sessionFlowControl) {
this.version = version;
this.flowControl = flowControl;
this.sessionFlowControl = sessionFlowControl;
}
int getVersion() {
return version;
}
boolean useFlowControl() {
return flowControl;
}
boolean useSessionFlowControl() {
return sessionFlowControl;
}
}

View File

@ -34,7 +34,6 @@ import java.net.InetSocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import static io.netty.handler.codec.spdy.SpdyConstants.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class SpdyFrameDecoderTest { public class SpdyFrameDecoderTest {
@ -48,8 +47,12 @@ public class SpdyFrameDecoderTest {
@Test @Test
public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception { public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version++) { testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_2);
final int finalVersion = version; testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3);
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1);
}
private void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception {
List<Integer> headerSizes = Arrays.asList(90, 900); List<Integer> headerSizes = Arrays.asList(90, 900);
for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value
SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0); SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0);
@ -62,8 +65,8 @@ public class SpdyFrameDecoderTest {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( ch.pipeline().addLast(
new SpdyFrameDecoder(finalVersion, 10000, maxHeaderSize), new SpdyFrameDecoder(version, 10000, maxHeaderSize),
new SpdySessionHandler(finalVersion, true), new SpdySessionHandler(version, true),
captureHandler); captureHandler);
} }
}); });
@ -74,7 +77,7 @@ public class SpdyFrameDecoderTest {
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SpdyFrameEncoder(finalVersion)); ch.pipeline().addLast(new SpdyFrameEncoder(version));
} }
}); });
Channel sc = sb.bind(0).sync().channel(); Channel sc = sb.bind(0).sync().channel();
@ -100,7 +103,6 @@ public class SpdyFrameDecoderTest {
cc.close().sync(); cc.close().sync();
} }
} }
}
private static void sendAndWaitForFrame(Channel cc, SpdyFrame frame, CaptureHandler handler) { private static void sendAndWaitForFrame(Channel cc, SpdyFrame frame, CaptureHandler handler) {
cc.writeAndFlush(frame); cc.writeAndFlush(frame);

View File

@ -92,7 +92,7 @@ public class SpdySessionHandlerTest {
assertTrue(spdyHeadersFrame.headers().entries().isEmpty()); assertTrue(spdyHeadersFrame.headers().entries().isEmpty());
} }
private static void testSpdySessionHandler(int version, boolean server) { private static void testSpdySessionHandler(SpdyVersion version, boolean server) {
EmbeddedChannel sessionHandler = new EmbeddedChannel( EmbeddedChannel sessionHandler = new EmbeddedChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
@ -103,9 +103,6 @@ public class SpdySessionHandlerTest {
int localStreamId = server ? 1 : 2; int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1; int remoteStreamId = server ? 2 : 1;
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId);
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId);
SpdySynStreamFrame spdySynStreamFrame = SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0); new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0);
spdySynStreamFrame.headers().set("Compression", "test"); spdySynStreamFrame.headers().set("Compression", "test");
@ -126,24 +123,11 @@ public class SpdySessionHandlerTest {
assertNull(sessionHandler.readOutbound()); assertNull(sessionHandler.readOutbound());
remoteStreamId += 2; remoteStreamId += 2;
// Check if session handler correctly limits the number of
// concurrent streams in the SETTINGS frame
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 0);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 100);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId));
assertNull(sessionHandler.readOutbound());
// Check if session handler returns PROTOCOL_ERROR if it receives // Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_REPLY frames for the same active Stream-ID // multiple SYN_REPLY frames for the same active Stream-ID
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId)); sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId));
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId));
assertRstStream(sessionHandler.readOutbound(), remoteStreamId, SpdyStreamStatus.STREAM_IN_USE); assertRstStream(sessionHandler.readOutbound(), remoteStreamId, SpdyStreamStatus.STREAM_IN_USE);
assertNull(sessionHandler.readOutbound()); assertNull(sessionHandler.readOutbound());
remoteStreamId += 2; remoteStreamId += 2;
@ -173,6 +157,17 @@ public class SpdySessionHandlerTest {
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM); assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound()); assertNull(sessionHandler.readOutbound());
// Check if session handler rejects HEADERS for closed streams
int testStreamId = spdyDataFrame.getStreamId();
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readOutbound(), testStreamId, spdyDataFrame.isLast());
assertNull(sessionHandler.readOutbound());
spdyHeadersFrame.setStreamId(testStreamId);
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readOutbound(), testStreamId, SpdyStreamStatus.INVALID_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler drops active streams if it receives // Check if session handler drops active streams if it receives
// a RST_STREAM frame for that Stream-ID // a RST_STREAM frame for that Stream-ID
sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamId, 3)); sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamId, 3));
@ -200,32 +195,6 @@ public class SpdySessionHandlerTest {
assertNull(sessionHandler.readOutbound()); assertNull(sessionHandler.readOutbound());
spdySynStreamFrame.setStreamId(localStreamId); spdySynStreamFrame.setStreamId(localStreamId);
// Check if session handler correctly handles updates to the max
// concurrent streams in the SETTINGS frame
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readOutbound(), localStreamId, false, spdySynStreamFrame.headers());
assertNull(sessionHandler.readOutbound());
// Check if session handler rejects HEADERS for closed streams
int testStreamId = spdyDataFrame.getStreamId();
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readOutbound(), testStreamId, spdyDataFrame.isLast());
assertNull(sessionHandler.readOutbound());
spdyHeadersFrame.setStreamId(testStreamId);
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readOutbound(), testStreamId, SpdyStreamStatus.INVALID_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler returns PROTOCOL_ERROR if it receives // Check if session handler returns PROTOCOL_ERROR if it receives
// an invalid HEADERS frame // an invalid HEADERS frame
spdyHeadersFrame.setStreamId(localStreamId); spdyHeadersFrame.setStreamId(localStreamId);
@ -235,6 +204,23 @@ public class SpdySessionHandlerTest {
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.PROTOCOL_ERROR); assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.PROTOCOL_ERROR);
assertNull(sessionHandler.readOutbound()); assertNull(sessionHandler.readOutbound());
sessionHandler.finish();
}
private static void testSpdySessionHandlerPing(SpdyVersion version, boolean server) {
EmbeddedChannel sessionHandler = new EmbeddedChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
while (sessionHandler.readOutbound() != null) {
continue;
}
int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1;
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId);
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId);
// Check if session handler returns identical local PINGs // Check if session handler returns identical local PINGs
sessionHandler.writeInbound(localPingFrame); sessionHandler.writeInbound(localPingFrame);
assertPing(sessionHandler.readOutbound(), localPingFrame.getId()); assertPing(sessionHandler.readOutbound(), localPingFrame.getId());
@ -244,6 +230,34 @@ public class SpdySessionHandlerTest {
sessionHandler.writeInbound(remotePingFrame); sessionHandler.writeInbound(remotePingFrame);
assertNull(sessionHandler.readOutbound()); assertNull(sessionHandler.readOutbound());
sessionHandler.finish();
}
private static void testSpdySessionHandlerGoAway(SpdyVersion version, boolean server) {
EmbeddedChannel sessionHandler = new EmbeddedChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
while (sessionHandler.readOutbound() != null) {
continue;
}
int localStreamId = server ? 1 : 2;
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0);
spdySynStreamFrame.headers().set("Compression", "test");
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamId);
spdyDataFrame.setLast(true);
// Send an initial request
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readOutbound(), localStreamId, false, spdySynStreamFrame.headers());
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readOutbound(), localStreamId, true);
assertNull(sessionHandler.readOutbound());
// Check if session handler sends a GOAWAY frame when closing // Check if session handler sends a GOAWAY frame when closing
sessionHandler.writeInbound(closeMessage); sessionHandler.writeInbound(closeMessage);
assertGoAway(sessionHandler.readOutbound(), localStreamId); assertGoAway(sessionHandler.readOutbound(), localStreamId);
@ -268,22 +282,66 @@ public class SpdySessionHandlerTest {
@Test @Test
public void testSpdyClientSessionHandler() { public void testSpdyClientSessionHandler() {
for (int version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) { logger.info("Running: testSpdyClientSessionHandler v2");
logger.info("Running: testSpdyClientSessionHandler v" + version); testSpdySessionHandler(SpdyVersion.SPDY_2, false);
testSpdySessionHandler(version, false); logger.info("Running: testSpdyClientSessionHandler v3");
testSpdySessionHandler(SpdyVersion.SPDY_3, false);
logger.info("Running: testSpdyClientSessionHandler v3.1");
testSpdySessionHandler(SpdyVersion.SPDY_3_1, false);
} }
@Test
public void testSpdyClientSessionHandlerPing() {
logger.info("Running: testSpdyClientSessionHandlerPing v2");
testSpdySessionHandlerPing(SpdyVersion.SPDY_2, false);
logger.info("Running: testSpdyClientSessionHandlerPing v3");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3, false);
logger.info("Running: testSpdyClientSessionHandlerPing v3.1");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, false);
}
@Test
public void testSpdyClientSessionHandlerGoAway() {
logger.info("Running: testSpdyClientSessionHandlerGoAway v2");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, false);
logger.info("Running: testSpdyClientSessionHandlerGoAway v3");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, false);
logger.info("Running: testSpdyClientSessionHandlerGoAway v3.1");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, false);
} }
@Test @Test
public void testSpdyServerSessionHandler() { public void testSpdyServerSessionHandler() {
for (int version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) { logger.info("Running: testSpdyServerSessionHandler v2");
logger.info("Running: testSpdyServerSessionHandler v" + version); testSpdySessionHandler(SpdyVersion.SPDY_2, true);
testSpdySessionHandler(version, true); logger.info("Running: testSpdyServerSessionHandler v3");
testSpdySessionHandler(SpdyVersion.SPDY_3, true);
logger.info("Running: testSpdyServerSessionHandler v3.1");
testSpdySessionHandler(SpdyVersion.SPDY_3_1, true);
} }
@Test
public void testSpdyServerSessionHandlerPing() {
logger.info("Running: testSpdyServerSessionHandlerPing v2");
testSpdySessionHandlerPing(SpdyVersion.SPDY_2, true);
logger.info("Running: testSpdyServerSessionHandlerPing v3");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3, true);
logger.info("Running: testSpdyServerSessionHandlerPing v3.1");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, true);
}
@Test
public void testSpdyServerSessionHandlerGoAway() {
logger.info("Running: testSpdyServerSessionHandlerGoAway v2");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, true);
logger.info("Running: testSpdyServerSessionHandlerGoAway v3");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, true);
logger.info("Running: testSpdyServerSessionHandlerGoAway v3.1");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, true);
} }
// Echo Handler opens 4 half-closed streams on session connection // Echo Handler opens 4 half-closed streams on session connection
// and then sets the number of concurrent streams to 3 // and then sets the number of concurrent streams to 1
private static class EchoHandler extends ChannelInboundHandlerAdapter { private static class EchoHandler extends ChannelInboundHandlerAdapter {
private final int closeSignal; private final int closeSignal;
private final boolean server; private final boolean server;
@ -308,9 +366,9 @@ public class SpdySessionHandlerTest {
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
ctx.writeAndFlush(spdySynStreamFrame); ctx.writeAndFlush(spdySynStreamFrame);
// Limit the number of concurrent streams to 3 // Limit the number of concurrent streams to 1
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3); spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 1);
ctx.writeAndFlush(spdySettingsFrame); ctx.writeAndFlush(spdySettingsFrame);
} }

View File

@ -25,9 +25,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.spdy.SpdyConstants;
import io.netty.handler.codec.spdy.SpdyFrameDecoder; import io.netty.handler.codec.spdy.SpdyFrameDecoder;
import io.netty.handler.codec.spdy.SpdyFrameEncoder; import io.netty.handler.codec.spdy.SpdyFrameEncoder;
import io.netty.handler.codec.spdy.SpdyVersion;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import org.junit.Test; import org.junit.Test;
@ -165,19 +165,37 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
return frames; return frames;
} }
private int version; private SpdyVersion version;
@Test(timeout = 15000) @Test(timeout = 15000)
public void testSpdyEcho() throws Throwable { public void testSpdyEcho() throws Throwable {
for (version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) { version = SpdyVersion.SPDY_2;
logger.info("Testing against SPDY v" + version); logger.info("Testing against SPDY v2");
run();
version = SpdyVersion.SPDY_3;
logger.info("Testing against SPDY v3");
run();
version = SpdyVersion.SPDY_3_1;
logger.info("Testing against SPDY v3.1");
run(); run();
}
} }
public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
ByteBuf frames = createFrames(version); ByteBuf frames;
switch (version) {
case SPDY_2:
frames = createFrames(2);
break;
case SPDY_3:
case SPDY_3_1:
frames = createFrames(3);
break;
default:
throw new IllegalArgumentException("unknown version");
}
final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler(); final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler();
final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy()); final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy());