SPDY: add SPDY/3.1 support

- with Michael Schore <mschore@twitter.com>
This commit is contained in:
Jeff Pinner 2013-09-27 10:48:16 -07:00 committed by Trustin Lee
parent a79dfe74b7
commit 39ae2dd3f1
22 changed files with 472 additions and 252 deletions

View File

@ -20,6 +20,8 @@ import io.netty.util.CharsetUtil;
final class SpdyCodecUtil {
static final int SPDY_SESSION_STREAM_ID = 0;
static final int SPDY_HEADER_TYPE_OFFSET = 2;
static final int SPDY_HEADER_FLAGS_OFFSET = 4;
static final int SPDY_HEADER_LENGTH_OFFSET = 5;

View File

@ -1,24 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
public final class SpdyConstants {
public static final int SPDY_MIN_VERSION = 2;
public static final int SPDY_MAX_VERSION = 3;
private SpdyConstants() { }
}

View File

@ -28,7 +28,7 @@ public final class SpdyFrameCodec extends CombinedChannelDuplexHandler<SpdyFrame
* {@code compressionLevel (6)}, {@code windowBits (15)},
* and {@code memLevel (8)}).
*/
public SpdyFrameCodec(int version) {
public SpdyFrameCodec(SpdyVersion version) {
this(version, 8192, 16384, 6, 15, 8);
}
@ -36,7 +36,7 @@ public final class SpdyFrameCodec extends CombinedChannelDuplexHandler<SpdyFrame
* Creates a new instance with the specified decoder and encoder options.
*/
public SpdyFrameCodec(
int version, int maxChunkSize, int maxHeaderSize,
SpdyVersion version, int maxChunkSize, int maxHeaderSize,
int compressionLevel, int windowBits, int memLevel) {
super(
new SpdyFrameDecoder(version, maxChunkSize, maxHeaderSize),

View File

@ -83,28 +83,27 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
* Creates a new instance with the specified {@code version} and the default
* {@code maxChunkSize (8192)} and {@code maxHeaderSize (16384)}.
*/
public SpdyFrameDecoder(int version) {
public SpdyFrameDecoder(SpdyVersion version) {
this(version, 8192, 16384);
}
/**
* Creates a new instance with the specified parameters.
*/
public SpdyFrameDecoder(int version, int maxChunkSize, int maxHeaderSize) {
public SpdyFrameDecoder(SpdyVersion version, int maxChunkSize, int maxHeaderSize) {
this(version, maxChunkSize, SpdyHeaderBlockDecoder.newInstance(version, maxHeaderSize));
}
protected SpdyFrameDecoder(
int version, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);
SpdyVersion version, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) {
if (version == null) {
throw new NullPointerException("version");
}
if (maxChunkSize <= 0) {
throw new IllegalArgumentException(
"maxChunkSize must be a positive integer: " + maxChunkSize);
}
spdyVersion = version;
spdyVersion = version.getVersion();
this.maxChunkSize = maxChunkSize;
this.headerBlockDecoder = headerBlockDecoder;
state = State.READ_COMMON_HEADER;

View File

@ -39,24 +39,23 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<SpdyFrame> {
* default {@code compressionLevel (6)}, {@code windowBits (15)},
* and {@code memLevel (8)}.
*/
public SpdyFrameEncoder(int version) {
public SpdyFrameEncoder(SpdyVersion version) {
this(version, 6, 15, 8);
}
/**
* Creates a new instance with the specified parameters.
*/
public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) {
public SpdyFrameEncoder(SpdyVersion version, int compressionLevel, int windowBits, int memLevel) {
this(version, SpdyHeaderBlockEncoder.newInstance(
version, compressionLevel, windowBits, memLevel));
}
protected SpdyFrameEncoder(int version, SpdyHeaderBlockEncoder headerBlockEncoder) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unknown version: " + version);
protected SpdyFrameEncoder(SpdyVersion version, SpdyHeaderBlockEncoder headerBlockEncoder) {
if (version == null) {
throw new NullPointerException("version");
}
this.version = version;
this.version = version.getVersion();
this.headerBlockEncoder = headerBlockEncoder;
}

View File

@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf;
abstract class SpdyHeaderBlockDecoder {
static SpdyHeaderBlockDecoder newInstance(int version, int maxHeaderSize) {
static SpdyHeaderBlockDecoder newInstance(SpdyVersion version, int maxHeaderSize) {
return new SpdyHeaderBlockZlibDecoder(version, maxHeaderSize);
}

View File

@ -22,7 +22,7 @@ import io.netty.util.internal.PlatformDependent;
abstract class SpdyHeaderBlockEncoder {
static SpdyHeaderBlockEncoder newInstance(
int version, int compressionLevel, int windowBits, int memLevel) {
SpdyVersion version, int compressionLevel, int windowBits, int memLevel) {
if (PlatformDependent.javaVersion() >= 7) {
return new SpdyHeaderBlockZlibEncoder(

View File

@ -31,7 +31,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder {
private boolean finished;
public SpdyHeaderBlockJZlibEncoder(
int version, int compressionLevel, int windowBits, int memLevel) {
SpdyVersion version, int compressionLevel, int windowBits, int memLevel) {
super(version);
if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException(
@ -52,7 +52,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder {
throw new CompressionException(
"failed to initialize an SPDY header block deflater: " + resultCode);
} else {
if (version < 3) {
if (version.getVersion() < 3) {
resultCode = z.deflateSetDictionary(SPDY2_DICT, SPDY2_DICT.length);
} else {
resultCode = z.deflateSetDictionary(SPDY_DICT, SPDY_DICT.length);

View File

@ -29,15 +29,14 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
private int headerSize;
private int numHeaders;
public SpdyHeaderBlockRawDecoder(int version, int maxHeaderSize) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);
public SpdyHeaderBlockRawDecoder(SpdyVersion version, int maxHeaderSize) {
if (version == null) {
throw new NullPointerException("version");
}
this.version = version;
this.version = version.getVersion();
this.maxHeaderSize = maxHeaderSize;
lengthFieldSize = version < 3 ? 2 : 4;
lengthFieldSize = this.version < 3 ? 2 : 4;
reset();
}

View File

@ -27,12 +27,11 @@ public class SpdyHeaderBlockRawEncoder extends SpdyHeaderBlockEncoder {
private final int version;
public SpdyHeaderBlockRawEncoder(int version) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unknown version: " + version);
public SpdyHeaderBlockRawEncoder(SpdyVersion version) {
if (version == null) {
throw new NullPointerException("version");
}
this.version = version;
this.version = version.getVersion();
}
private void setLengthField(ByteBuf buffer, int writerIndex, int length) {

View File

@ -31,9 +31,9 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
private ByteBuf decompressed;
public SpdyHeaderBlockZlibDecoder(int version, int maxHeaderSize) {
public SpdyHeaderBlockZlibDecoder(SpdyVersion version, int maxHeaderSize) {
super(version, maxHeaderSize);
this.version = version;
this.version = version.getVersion();
}
@Override

View File

@ -30,14 +30,14 @@ class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder {
private boolean finished;
public SpdyHeaderBlockZlibEncoder(int version, int compressionLevel) {
public SpdyHeaderBlockZlibEncoder(SpdyVersion version, int compressionLevel) {
super(version);
if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)");
}
compressor = new Deflater(compressionLevel);
if (version < 3) {
if (version.getVersion() < 3) {
compressor.setDictionary(SPDY2_DICT);
} else {
compressor.setDictionary(SPDY_DICT);

View File

@ -25,7 +25,7 @@ public final class SpdyHttpCodec
/**
* Creates a new instance with the specified decoder options.
*/
public SpdyHttpCodec(int version, int maxContentLength) {
public SpdyHttpCodec(SpdyVersion version, int maxContentLength) {
super(new SpdyHttpDecoder(version, maxContentLength), new SpdyHttpEncoder(version));
}
}

View File

@ -51,7 +51,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
* If the length of the message content exceeds this value,
* a {@link TooLongFrameException} will be raised.
*/
public SpdyHttpDecoder(int version, int maxContentLength) {
public SpdyHttpDecoder(SpdyVersion version, int maxContentLength) {
this(version, maxContentLength, new HashMap<Integer, FullHttpMessage>());
}
@ -64,16 +64,15 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
* a {@link TooLongFrameException} will be raised.
* @param messageMap the {@link Map} used to hold partially received messages.
*/
protected SpdyHttpDecoder(int version, int maxContentLength, Map<Integer, FullHttpMessage> messageMap) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);
protected SpdyHttpDecoder(SpdyVersion version, int maxContentLength, Map<Integer, FullHttpMessage> messageMap) {
if (version == null) {
throw new NullPointerException("version");
}
if (maxContentLength <= 0) {
throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " + maxContentLength);
}
spdyVersion = version;
spdyVersion = version.getVersion();
this.maxContentLength = maxContentLength;
this.messageMap = messageMap;
}

View File

@ -130,12 +130,11 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
*
* @param version the protocol version
*/
public SpdyHttpEncoder(int version) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);
public SpdyHttpEncoder(SpdyVersion version) {
if (version == null) {
throw new NullPointerException("version");
}
spdyVersion = version;
spdyVersion = version.getVersion();
}
@Override

View File

@ -41,6 +41,7 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder {
public enum SelectedProtocol {
SPDY_2,
SPDY_3,
SPDY_3_1,
HTTP_1_1,
HTTP_1_0,
UNKNOWN
@ -84,10 +85,13 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder {
// Not done with choosing the protocol, so just return here for now,
return false;
case SPDY_2:
addSpdyHandlers(ctx, 2);
addSpdyHandlers(ctx, SpdyVersion.SPDY_2);
break;
case SPDY_3:
addSpdyHandlers(ctx, 3);
addSpdyHandlers(ctx, SpdyVersion.SPDY_3);
break;
case SPDY_3_1:
addSpdyHandlers(ctx, SpdyVersion.SPDY_3_1);
break;
case HTTP_1_0:
case HTTP_1_1:
@ -102,7 +106,7 @@ public abstract class SpdyOrHttpChooser extends ByteToMessageDecoder {
/**
* Add all {@link ChannelHandler}'s that are needed for SPDY with the given version.
*/
protected void addSpdyHandlers(ChannelHandlerContext ctx, int version) {
protected void addSpdyHandlers(ChannelHandlerContext ctx, SpdyVersion version) {
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast("spdyDecoder", new SpdyFrameDecoder(version));
pipeline.addLast("spdyEncoder", new SpdyFrameEncoder(version));

View File

@ -26,12 +26,28 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
final class SpdySession {
private final AtomicInteger activeLocalStreams = new AtomicInteger();
private final AtomicInteger activeRemoteStreams = new AtomicInteger();
private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
int numActiveStreams() {
return activeStreams.size();
private final AtomicInteger sendWindowSize;
private final AtomicInteger receiveWindowSize;
public SpdySession(int sendWindowSize, int receiveWindowSize) {
this.sendWindowSize = new AtomicInteger(sendWindowSize);
this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
}
int numActiveStreams(boolean remote) {
if (remote) {
return activeRemoteStreams.get();
} else {
return activeLocalStreams.get();
}
}
boolean noActiveStreams() {
@ -51,15 +67,34 @@ final class SpdySession {
void acceptStream(
int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
int sendWindowSize, int receiveWindowSize) {
int sendWindowSize, int receiveWindowSize, boolean remote) {
if (!remoteSideClosed || !localSideClosed) {
activeStreams.put(streamId, new StreamState(
StreamState state = activeStreams.put(streamId, new StreamState(
priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
if (state == null) {
if (remote) {
activeRemoteStreams.incrementAndGet();
} else {
activeLocalStreams.incrementAndGet();
}
}
}
}
void removeStream(int streamId, Throwable cause) {
private StreamState removeActiveStream(int streamId, boolean remote) {
StreamState state = activeStreams.remove(streamId);
if (state != null) {
if (remote) {
activeRemoteStreams.decrementAndGet();
} else {
activeLocalStreams.decrementAndGet();
}
}
return state;
}
void removeStream(int streamId, Throwable cause, boolean remote) {
StreamState state = removeActiveStream(streamId, remote);
if (state != null) {
state.clearPendingWrites(cause);
}
@ -70,12 +105,12 @@ final class SpdySession {
return state == null || state.isRemoteSideClosed();
}
void closeRemoteSide(int streamId) {
void closeRemoteSide(int streamId, boolean remote) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.closeRemoteSide();
if (state.isLocalSideClosed()) {
activeStreams.remove(streamId);
removeActiveStream(streamId, remote);
}
}
}
@ -85,12 +120,12 @@ final class SpdySession {
return state == null || state.isLocalSideClosed();
}
void closeLocalSide(int streamId) {
void closeLocalSide(int streamId, boolean remote) {
StreamState state = activeStreams.get(streamId);
if (state != null) {
state.closeLocalSide();
if (state.isRemoteSideClosed()) {
activeStreams.remove(streamId);
removeActiveStream(streamId, remote);
}
}
}
@ -112,16 +147,28 @@ final class SpdySession {
}
int getSendWindowSize(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return sendWindowSize.get();
}
StreamState state = activeStreams.get(streamId);
return state != null ? state.getSendWindowSize() : -1;
}
int updateSendWindowSize(int streamId, int deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return sendWindowSize.addAndGet(deltaWindowSize);
}
StreamState state = activeStreams.get(streamId);
return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
}
int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return receiveWindowSize.addAndGet(deltaWindowSize);
}
StreamState state = activeStreams.get(streamId);
if (deltaWindowSize > 0) {
state.setReceiveWindowSizeLowerBound(0);
@ -130,6 +177,10 @@ final class SpdySession {
}
int getReceiveWindowSizeLowerBound(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return 0;
}
StreamState state = activeStreams.get(streamId);
return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
}
@ -155,6 +206,19 @@ final class SpdySession {
}
PendingWrite getPendingWrite(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
for (Integer id : getActiveStreams()) {
StreamState state = activeStreams.get(id);
if (state.getSendWindowSize() > 0) {
PendingWrite pendingWrite = state.getPendingWrite();
if (pendingWrite != null) {
return pendingWrite;
}
}
}
return null;
}
StreamState state = activeStreams.get(streamId);
return state != null ? state.getPendingWrite() : null;
}

View File

@ -24,6 +24,8 @@ import io.netty.util.internal.EmptyArrays;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
/**
* Manages streams within a SPDY session.
*/
@ -38,17 +40,16 @@ public class SpdySessionHandler
STREAM_CLOSED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
private final SpdySession spdySession = new SpdySession();
private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
private int lastGoodStreamId;
private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private int maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
private final Object flowControlLock = new Object();
@ -61,6 +62,7 @@ public class SpdySessionHandler
private final boolean server;
private final boolean flowControl;
private final boolean sessionFlowControl;
/**
* Creates a new session handler.
@ -71,13 +73,13 @@ public class SpdySessionHandler
* {@code false} if and only if this session handler should
* handle the client endpoint of the connection.
*/
public SpdySessionHandler(int version, boolean server) {
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);
public SpdySessionHandler(SpdyVersion version, boolean server) {
if (version == null) {
throw new NullPointerException("version");
}
this.server = server;
flowControl = version >= 3;
flowControl = version.useFlowControl();
sessionFlowControl = version.useSessionFlowControl();
}
@Override
@ -109,6 +111,27 @@ public class SpdySessionHandler
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
int streamId = spdyDataFrame.getStreamId();
if (sessionFlowControl) {
int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
int newSessionWindowSize =
spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
// Check if session window size is reduced beyond allowable lower bound
if (newSessionWindowSize < 0) {
issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
return;
}
// Send a WINDOW_UPDATE frame if less than half the session window size remains
if (newSessionWindowSize <= initialReceiveWindowSize / 2) {
deltaWindowSize = initialReceiveWindowSize - newSessionWindowSize;
spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
SpdyWindowUpdateFrame spdyWindowUpdateFrame =
new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, deltaWindowSize);
ctx.writeAndFlush(spdyWindowUpdateFrame);
}
}
// Check if we received a data frame for a Stream-ID which is not open
if (!spdySession.isActiveStream(streamId)) {
@ -130,17 +153,17 @@ public class SpdySessionHandler
}
// Check if we received a data frame before receiving a SYN_REPLY
if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) {
if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
spdyDataFrame.release();
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
/*
* SPDY Data frame flow control processing requirements:
*
* Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
*/
* SPDY Data frame flow control processing requirements:
*
* Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
*/
if (flowControl) {
// Update receive window size
@ -168,7 +191,7 @@ public class SpdySessionHandler
}
}
// Send a WINDOW_UPDATE frame if less than half the window size remains
// Send a WINDOW_UPDATE frame if less than half the stream window size remains
if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
deltaWindowSize = initialReceiveWindowSize - newWindowSize;
spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
@ -204,7 +227,7 @@ public class SpdySessionHandler
// Check if we received a valid SYN_STREAM frame
if (spdySynStreamFrame.isInvalid() ||
!isRemoteInitiatedID(streamId) ||
!isRemoteInitiatedId(streamId) ||
spdySession.isActiveStream(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
@ -239,7 +262,7 @@ public class SpdySessionHandler
// Check if we received a valid SYN_REPLY frame
if (spdySynReplyFrame.isInvalid() ||
isRemoteInitiatedID(streamId) ||
isRemoteInitiatedId(streamId) ||
spdySession.isRemoteSideClosed(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
return;
@ -279,7 +302,7 @@ public class SpdySessionHandler
int newConcurrentStreams =
spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
if (newConcurrentStreams >= 0) {
updateConcurrentStreams(newConcurrentStreams, true);
remoteConcurrentStreams = newConcurrentStreams;
}
// Persistence flag are inconsistent with the use of SETTINGS to communicate
@ -311,7 +334,7 @@ public class SpdySessionHandler
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) {
if (isRemoteInitiatedId(spdyPingFrame.getId())) {
ctx.writeAndFlush(spdyPingFrame);
return;
}
@ -365,13 +388,17 @@ public class SpdySessionHandler
int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
// Ignore frames for half-closed streams
if (spdySession.isLocalSideClosed(streamId)) {
if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
return;
}
// Check for numerical overflow
if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
if (streamId == SPDY_SESSION_STREAM_ID) {
issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
} else {
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
}
return;
}
@ -426,7 +453,7 @@ public class SpdySessionHandler
if (msg instanceof SpdyDataFrame) {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
final int streamId = spdyDataFrame.getStreamId();
int streamId = spdyDataFrame.getStreamId();
// Frames must not be sent on half-closed streams
if (spdySession.isLocalSideClosed(streamId)) {
@ -453,6 +480,11 @@ public class SpdySessionHandler
int dataLength = spdyDataFrame.content().readableBytes();
int sendWindowSize = spdySession.getSendWindowSize(streamId);
if (sessionFlowControl) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
}
if (sendWindowSize <= 0) {
// Stream is stalled -- enqueue Data frame and return
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
@ -460,6 +492,9 @@ public class SpdySessionHandler
} else if (sendWindowSize < dataLength) {
// Stream is not stalled but we cannot send the entire frame
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
}
// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
@ -469,13 +504,13 @@ public class SpdySessionHandler
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state.
// Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
}
}
});
@ -483,15 +518,18 @@ public class SpdySessionHandler
} else {
// Window size is large enough to send entire data frame
spdySession.updateSendWindowSize(streamId, -1 * dataLength);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
}
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state.
// Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(context, streamId, SpdyStreamStatus.INTERNAL_ERROR);
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
}
}
});
@ -509,7 +547,7 @@ public class SpdySessionHandler
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
int streamId = spdySynStreamFrame.getStreamId();
if (isRemoteInitiatedID(streamId)) {
if (isRemoteInitiatedId(streamId)) {
promise.setFailure(PROTOCOL_EXCEPTION);
return;
}
@ -528,7 +566,7 @@ public class SpdySessionHandler
int streamId = spdySynReplyFrame.getStreamId();
// Frames must not be sent on half-closed streams
if (!isRemoteInitiatedID(streamId) || spdySession.isLocalSideClosed(streamId)) {
if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
promise.setFailure(PROTOCOL_EXCEPTION);
return;
}
@ -550,7 +588,7 @@ public class SpdySessionHandler
int newConcurrentStreams =
spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
if (newConcurrentStreams >= 0) {
updateConcurrentStreams(newConcurrentStreams, false);
localConcurrentStreams = newConcurrentStreams;
}
// Persistence flag are inconsistent with the use of SETTINGS to communicate
@ -572,7 +610,7 @@ public class SpdySessionHandler
} else if (msg instanceof SpdyPingFrame) {
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) {
if (isRemoteInitiatedId(spdyPingFrame.getId())) {
ctx.fireExceptionCaught(new IllegalArgumentException(
"invalid PING ID: " + spdyPingFrame.getId()));
return;
@ -654,20 +692,11 @@ public class SpdySessionHandler
* Helper functions
*/
private boolean isRemoteInitiatedID(int id) {
private boolean isRemoteInitiatedId(int id) {
boolean serverId = SpdyCodecUtil.isServerId(id);
return server && !serverId || !server && serverId;
}
private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
if (remote) {
remoteConcurrentStreams = newConcurrentStreams;
} else {
localConcurrentStreams = newConcurrentStreams;
}
maxConcurrentStreams = Math.min(localConcurrentStreams, remoteConcurrentStreams);
}
// need to synchronize to prevent new streams from being created while updating active streams
private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
@ -690,13 +719,15 @@ public class SpdySessionHandler
return false;
}
if (spdySession.numActiveStreams() >= maxConcurrentStreams) {
boolean remote = isRemoteInitiatedId(streamId);
int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
return false;
}
spdySession.acceptStream(
streamId, priority, remoteSideClosed, localSideClosed,
initialSendWindowSize, initialReceiveWindowSize);
if (isRemoteInitiatedID(streamId)) {
initialSendWindowSize, initialReceiveWindowSize, remote);
if (remote) {
lastGoodStreamId = streamId;
}
return true;
@ -704,9 +735,9 @@ public class SpdySessionHandler
private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
if (remote) {
spdySession.closeRemoteSide(streamId);
spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
} else {
spdySession.closeLocalSide(streamId);
spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
}
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener);
@ -714,16 +745,20 @@ public class SpdySessionHandler
}
private void removeStream(int streamId, ChannelFuture future) {
spdySession.removeStream(streamId, STREAM_CLOSED);
spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
future.addListener(closeSessionFutureListener);
}
}
private void updateSendWindowSize(final ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) {
private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
synchronized (flowControlLock) {
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
if (sessionFlowControl && streamId != SPDY_SESSION_STREAM_ID) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}
while (newWindowSize > 0) {
// Check if we have unblocked a stalled stream
@ -734,42 +769,54 @@ public class SpdySessionHandler
SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
int dataFrameSize = spdyDataFrame.content().readableBytes();
int writeStreamId = spdyDataFrame.getStreamId();
if (sessionFlowControl && streamId == SPDY_SESSION_STREAM_ID) {
newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
}
if (newWindowSize >= dataFrameSize) {
// Window size is large enough to send entire data frame
spdySession.removePendingWrite(streamId);
newWindowSize = spdySession.updateSendWindowSize(streamId, -1 * dataFrameSize);
spdySession.removePendingWrite(writeStreamId);
newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
if (sessionFlowControl) {
int sessionSendWindowSize =
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}
// Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, false, pendingWrite.promise);
halfCloseStream(writeStreamId, false, pendingWrite.promise);
}
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state.
// Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR);
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
}
}
});
} else {
// We can send a partial frame
spdySession.updateSendWindowSize(streamId, -1 * newWindowSize);
spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);
}
// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
spdyDataFrame.content().readSlice(newWindowSize).retain());
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leave the transfer window in a corrupt state.
// Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INTERNAL_ERROR);
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
}
}
});

View File

@ -0,0 +1,55 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
public enum SpdyVersion {
SPDY_2 (2, false, false),
SPDY_3 (3, true, false),
SPDY_3_1 (3, true, true);
static SpdyVersion valueOf(int version) {
if (version == 2) {
return SPDY_2;
}
if (version == 3) {
return SPDY_3;
}
throw new IllegalArgumentException(
"unsupported version: " + version);
}
private final int version;
private final boolean flowControl;
private final boolean sessionFlowControl;
private SpdyVersion(int version, boolean flowControl, boolean sessionFlowControl) {
this.version = version;
this.flowControl = flowControl;
this.sessionFlowControl = sessionFlowControl;
}
int getVersion() {
return version;
}
boolean useFlowControl() {
return flowControl;
}
boolean useSessionFlowControl() {
return sessionFlowControl;
}
}

View File

@ -34,7 +34,6 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import static io.netty.handler.codec.spdy.SpdyConstants.*;
import static org.junit.Assert.*;
public class SpdyFrameDecoderTest {
@ -48,57 +47,60 @@ public class SpdyFrameDecoderTest {
@Test
public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version++) {
final int finalVersion = version;
List<Integer> headerSizes = Arrays.asList(90, 900);
for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value
SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0);
addHeader(frame, 100, 1000);
final CaptureHandler captureHandler = new CaptureHandler();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group);
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new SpdyFrameDecoder(finalVersion, 10000, maxHeaderSize),
new SpdySessionHandler(finalVersion, true),
captureHandler);
}
});
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_2);
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3);
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1);
}
Bootstrap cb = new Bootstrap();
cb.group(group);
cb.channel(NioSocketChannel.class);
cb.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SpdyFrameEncoder(finalVersion));
}
});
Channel sc = sb.bind(0).sync().channel();
int port = ((InetSocketAddress) sc.localAddress()).getPort();
private void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception {
List<Integer> headerSizes = Arrays.asList(90, 900);
for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value
SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0);
addHeader(frame, 100, 1000);
final CaptureHandler captureHandler = new CaptureHandler();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group);
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new SpdyFrameDecoder(version, 10000, maxHeaderSize),
new SpdySessionHandler(version, true),
captureHandler);
}
});
Channel cc = cb.connect(NetUtil.LOCALHOST, port).sync().channel();
Bootstrap cb = new Bootstrap();
cb.group(group);
cb.channel(NioSocketChannel.class);
cb.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SpdyFrameEncoder(version));
}
});
Channel sc = sb.bind(0).sync().channel();
int port = ((InetSocketAddress) sc.localAddress()).getPort();
sendAndWaitForFrame(cc, frame, captureHandler);
Channel cc = cb.connect(NetUtil.LOCALHOST, port).sync().channel();
assertNotNull("version " + version + ", not null message",
captureHandler.message);
String message = "version " + version + ", should be SpdyHeadersFrame, was " +
captureHandler.message.getClass();
assertTrue(
message,
captureHandler.message instanceof SpdyHeadersFrame);
SpdyHeadersFrame writtenFrame = (SpdyHeadersFrame) captureHandler.message;
sendAndWaitForFrame(cc, frame, captureHandler);
assertTrue("should be truncated", writtenFrame.isTruncated());
assertFalse("should not be invalid", writtenFrame.isInvalid());
assertNotNull("version " + version + ", not null message",
captureHandler.message);
String message = "version " + version + ", should be SpdyHeadersFrame, was " +
captureHandler.message.getClass();
assertTrue(
message,
captureHandler.message instanceof SpdyHeadersFrame);
SpdyHeadersFrame writtenFrame = (SpdyHeadersFrame) captureHandler.message;
sc.close().sync();
cc.close().sync();
}
assertTrue("should be truncated", writtenFrame.isTruncated());
assertFalse("should not be invalid", writtenFrame.isInvalid());
sc.close().sync();
cc.close().sync();
}
}

View File

@ -92,7 +92,7 @@ public class SpdySessionHandlerTest {
assertTrue(spdyHeadersFrame.headers().isEmpty());
}
private static void testSpdySessionHandler(int version, boolean server) {
private static void testSpdySessionHandler(SpdyVersion version, boolean server) {
EmbeddedChannel sessionHandler = new EmbeddedChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
@ -103,9 +103,6 @@ public class SpdySessionHandlerTest {
int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1;
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId);
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId);
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0);
spdySynStreamFrame.headers().set("Compression", "test");
@ -126,24 +123,11 @@ public class SpdySessionHandlerTest {
assertNull(sessionHandler.readOutbound());
remoteStreamId += 2;
// Check if session handler correctly limits the number of
// concurrent streams in the SETTINGS frame
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 0);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 100);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId));
assertNull(sessionHandler.readOutbound());
// Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_REPLY frames for the same active Stream-ID
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId));
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamId));
assertRstStream(sessionHandler.readOutbound(), remoteStreamId, SpdyStreamStatus.STREAM_IN_USE);
assertNull(sessionHandler.readOutbound());
remoteStreamId += 2;
@ -173,6 +157,17 @@ public class SpdySessionHandlerTest {
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler rejects HEADERS for closed streams
int testStreamId = spdyDataFrame.getStreamId();
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readOutbound(), testStreamId, spdyDataFrame.isLast());
assertNull(sessionHandler.readOutbound());
spdyHeadersFrame.setStreamId(testStreamId);
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readOutbound(), testStreamId, SpdyStreamStatus.INVALID_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler drops active streams if it receives
// a RST_STREAM frame for that Stream-ID
sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamId, 3));
@ -200,32 +195,6 @@ public class SpdySessionHandlerTest {
assertNull(sessionHandler.readOutbound());
spdySynStreamFrame.setStreamId(localStreamId);
// Check if session handler correctly handles updates to the max
// concurrent streams in the SETTINGS frame
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4);
sessionHandler.writeInbound(spdySettingsFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readOutbound(), localStreamId, false, spdySynStreamFrame.headers());
assertNull(sessionHandler.readOutbound());
// Check if session handler rejects HEADERS for closed streams
int testStreamId = spdyDataFrame.getStreamId();
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readOutbound(), testStreamId, spdyDataFrame.isLast());
assertNull(sessionHandler.readOutbound());
spdyHeadersFrame.setStreamId(testStreamId);
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readOutbound(), testStreamId, SpdyStreamStatus.INVALID_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler returns PROTOCOL_ERROR if it receives
// an invalid HEADERS frame
spdyHeadersFrame.setStreamId(localStreamId);
@ -235,6 +204,23 @@ public class SpdySessionHandlerTest {
assertRstStream(sessionHandler.readOutbound(), localStreamId, SpdyStreamStatus.PROTOCOL_ERROR);
assertNull(sessionHandler.readOutbound());
sessionHandler.finish();
}
private static void testSpdySessionHandlerPing(SpdyVersion version, boolean server) {
EmbeddedChannel sessionHandler = new EmbeddedChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
while (sessionHandler.readOutbound() != null) {
continue;
}
int localStreamId = server ? 1 : 2;
int remoteStreamId = server ? 2 : 1;
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamId);
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamId);
// Check if session handler returns identical local PINGs
sessionHandler.writeInbound(localPingFrame);
assertPing(sessionHandler.readOutbound(), localPingFrame.getId());
@ -244,6 +230,34 @@ public class SpdySessionHandlerTest {
sessionHandler.writeInbound(remotePingFrame);
assertNull(sessionHandler.readOutbound());
sessionHandler.finish();
}
private static void testSpdySessionHandlerGoAway(SpdyVersion version, boolean server) {
EmbeddedChannel sessionHandler = new EmbeddedChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
while (sessionHandler.readOutbound() != null) {
continue;
}
int localStreamId = server ? 1 : 2;
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(localStreamId, 0, (byte) 0);
spdySynStreamFrame.headers().set("Compression", "test");
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamId);
spdyDataFrame.setLast(true);
// Send an initial request
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readOutbound(), localStreamId, false, spdySynStreamFrame.headers());
assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readOutbound(), localStreamId, true);
assertNull(sessionHandler.readOutbound());
// Check if session handler sends a GOAWAY frame when closing
sessionHandler.writeInbound(closeMessage);
assertGoAway(sessionHandler.readOutbound(), localStreamId);
@ -268,22 +282,66 @@ public class SpdySessionHandlerTest {
@Test
public void testSpdyClientSessionHandler() {
for (int version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) {
logger.info("Running: testSpdyClientSessionHandler v" + version);
testSpdySessionHandler(version, false);
}
logger.info("Running: testSpdyClientSessionHandler v2");
testSpdySessionHandler(SpdyVersion.SPDY_2, false);
logger.info("Running: testSpdyClientSessionHandler v3");
testSpdySessionHandler(SpdyVersion.SPDY_3, false);
logger.info("Running: testSpdyClientSessionHandler v3.1");
testSpdySessionHandler(SpdyVersion.SPDY_3_1, false);
}
@Test
public void testSpdyClientSessionHandlerPing() {
logger.info("Running: testSpdyClientSessionHandlerPing v2");
testSpdySessionHandlerPing(SpdyVersion.SPDY_2, false);
logger.info("Running: testSpdyClientSessionHandlerPing v3");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3, false);
logger.info("Running: testSpdyClientSessionHandlerPing v3.1");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, false);
}
@Test
public void testSpdyClientSessionHandlerGoAway() {
logger.info("Running: testSpdyClientSessionHandlerGoAway v2");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, false);
logger.info("Running: testSpdyClientSessionHandlerGoAway v3");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, false);
logger.info("Running: testSpdyClientSessionHandlerGoAway v3.1");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, false);
}
@Test
public void testSpdyServerSessionHandler() {
for (int version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) {
logger.info("Running: testSpdyServerSessionHandler v" + version);
testSpdySessionHandler(version, true);
}
logger.info("Running: testSpdyServerSessionHandler v2");
testSpdySessionHandler(SpdyVersion.SPDY_2, true);
logger.info("Running: testSpdyServerSessionHandler v3");
testSpdySessionHandler(SpdyVersion.SPDY_3, true);
logger.info("Running: testSpdyServerSessionHandler v3.1");
testSpdySessionHandler(SpdyVersion.SPDY_3_1, true);
}
@Test
public void testSpdyServerSessionHandlerPing() {
logger.info("Running: testSpdyServerSessionHandlerPing v2");
testSpdySessionHandlerPing(SpdyVersion.SPDY_2, true);
logger.info("Running: testSpdyServerSessionHandlerPing v3");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3, true);
logger.info("Running: testSpdyServerSessionHandlerPing v3.1");
testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, true);
}
@Test
public void testSpdyServerSessionHandlerGoAway() {
logger.info("Running: testSpdyServerSessionHandlerGoAway v2");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, true);
logger.info("Running: testSpdyServerSessionHandlerGoAway v3");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, true);
logger.info("Running: testSpdyServerSessionHandlerGoAway v3.1");
testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, true);
}
// Echo Handler opens 4 half-closed streams on session connection
// and then sets the number of concurrent streams to 3
// and then sets the number of concurrent streams to 1
private static class EchoHandler extends ChannelInboundHandlerAdapter {
private final int closeSignal;
private final boolean server;
@ -308,9 +366,9 @@ public class SpdySessionHandlerTest {
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
ctx.writeAndFlush(spdySynStreamFrame);
// Limit the number of concurrent streams to 3
// Limit the number of concurrent streams to 1
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3);
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 1);
ctx.writeAndFlush(spdySettingsFrame);
}
@ -337,8 +395,8 @@ public class SpdySessionHandlerTest {
}
if (msg instanceof SpdyDataFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyHeadersFrame) {
msg instanceof SpdyPingFrame ||
msg instanceof SpdyHeadersFrame) {
ctx.writeAndFlush(msg);
return;

View File

@ -25,9 +25,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.spdy.SpdyConstants;
import io.netty.handler.codec.spdy.SpdyFrameDecoder;
import io.netty.handler.codec.spdy.SpdyFrameEncoder;
import io.netty.handler.codec.spdy.SpdyVersion;
import io.netty.util.NetUtil;
import org.junit.Test;
@ -165,19 +165,37 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
return frames;
}
private int version;
private SpdyVersion version;
@Test(timeout = 15000)
public void testSpdyEcho() throws Throwable {
for (version = SpdyConstants.SPDY_MIN_VERSION; version <= SpdyConstants.SPDY_MAX_VERSION; version ++) {
logger.info("Testing against SPDY v" + version);
run();
}
version = SpdyVersion.SPDY_2;
logger.info("Testing against SPDY v2");
run();
version = SpdyVersion.SPDY_3;
logger.info("Testing against SPDY v3");
run();
version = SpdyVersion.SPDY_3_1;
logger.info("Testing against SPDY v3.1");
run();
}
public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
ByteBuf frames = createFrames(version);
ByteBuf frames;
switch (version) {
case SPDY_2:
frames = createFrames(2);
break;
case SPDY_3:
case SPDY_3_1:
frames = createFrames(3);
break;
default:
throw new IllegalArgumentException("unknown version");
}
final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler();
final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy());