SPDY: add SPDY/3.1 support

- with Michael Schore <mschore@twitter.com>
This commit is contained in:
Jeff Pinner 2013-09-26 15:26:37 -07:00
parent fec0987cca
commit bd54b516b7
18 changed files with 377 additions and 151 deletions

View File

@ -19,8 +19,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
final class SpdyCodecUtil { final class SpdyCodecUtil {
static final int SPDY_MIN_VERSION = 2; static final int SPDY_SESSION_STREAM_ID = 0;
static final int SPDY_MAX_VERSION = 3;
static final int SPDY_HEADER_TYPE_OFFSET = 2; static final int SPDY_HEADER_TYPE_OFFSET = 2;
static final int SPDY_HEADER_FLAGS_OFFSET = 4; static final int SPDY_HEADER_FLAGS_OFFSET = 4;

View File

@ -62,29 +62,45 @@ public class SpdyFrameDecoder extends FrameDecoder {
* Creates a new instance with the specified {@code version} and the default * Creates a new instance with the specified {@code version} and the default
* {@code maxChunkSize (8192)} and {@code maxHeaderSize (16384)}. * {@code maxChunkSize (8192)} and {@code maxHeaderSize (16384)}.
*/ */
@Deprecated
public SpdyFrameDecoder(int version) { public SpdyFrameDecoder(int version) {
this(version, 8192, 16384); this(SpdyVersion.valueOf(version), 8192, 16384);
}
/**
* Creates a new instance with the specified {@code version} and the default
* {@code maxChunkSize (8192)} and {@code maxHeaderSize (16384)}.
*/
public SpdyFrameDecoder(SpdyVersion spdyVersion) {
this(spdyVersion, 8192, 16384);
} }
/** /**
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
*/ */
@Deprecated
public SpdyFrameDecoder(int version, int maxChunkSize, int maxHeaderSize) { public SpdyFrameDecoder(int version, int maxChunkSize, int maxHeaderSize) {
this(version, maxChunkSize, SpdyHeaderBlockDecoder.newInstance(version, maxHeaderSize)); this(SpdyVersion.valueOf(version), maxChunkSize, maxHeaderSize);
}
/**
* Creates a new instance with the specified parameters.
*/
public SpdyFrameDecoder(SpdyVersion spdyVersion, int maxChunkSize, int maxHeaderSize) {
this(spdyVersion, maxChunkSize, SpdyHeaderBlockDecoder.newInstance(spdyVersion, maxHeaderSize));
} }
protected SpdyFrameDecoder( protected SpdyFrameDecoder(
int version, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) { SpdyVersion spdyVersion, int maxChunkSize, SpdyHeaderBlockDecoder headerBlockDecoder) {
super(false); super(false);
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { if (spdyVersion == null) {
throw new IllegalArgumentException( throw new NullPointerException("spdyVersion");
"unsupported version: " + version);
} }
if (maxChunkSize <= 0) { if (maxChunkSize <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"maxChunkSize must be a positive integer: " + maxChunkSize); "maxChunkSize must be a positive integer: " + maxChunkSize);
} }
spdyVersion = version; this.spdyVersion = spdyVersion.getVersion();
this.maxChunkSize = maxChunkSize; this.maxChunkSize = maxChunkSize;
this.headerBlockDecoder = headerBlockDecoder; this.headerBlockDecoder = headerBlockDecoder;
state = State.READ_COMMON_HEADER; state = State.READ_COMMON_HEADER;

View File

@ -42,24 +42,41 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler {
* default {@code compressionLevel (6)}, {@code windowBits (15)}, * default {@code compressionLevel (6)}, {@code windowBits (15)},
* and {@code memLevel (8)}. * and {@code memLevel (8)}.
*/ */
@Deprecated
public SpdyFrameEncoder(int version) { public SpdyFrameEncoder(int version) {
this(version, 6, 15, 8); this(SpdyVersion.valueOf(version), 6, 15, 8);
}
/**
* Creates a new instance with the specified {@code version} and the
* default {@code compressionLevel (6)}, {@code windowBits (15)},
* and {@code memLevel (8)}.
*/
public SpdyFrameEncoder(SpdyVersion spdyVersion) {
this(spdyVersion, 6, 15, 8);
} }
/** /**
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
*/ */
@Deprecated
public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) {
this(version, SpdyHeaderBlockEncoder.newInstance( this(SpdyVersion.valueOf(version), compressionLevel, windowBits, memLevel);
version, compressionLevel, windowBits, memLevel));
} }
protected SpdyFrameEncoder(int version, SpdyHeaderBlockEncoder headerBlockEncoder) { /**
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { * Creates a new instance with the specified parameters.
throw new IllegalArgumentException( */
"unknown version: " + version); public SpdyFrameEncoder(SpdyVersion spdyVersion, int compressionLevel, int windowBits, int memLevel) {
this(spdyVersion, SpdyHeaderBlockEncoder.newInstance(
spdyVersion, compressionLevel, windowBits, memLevel));
} }
this.version = version;
protected SpdyFrameEncoder(SpdyVersion spdyVersion, SpdyHeaderBlockEncoder headerBlockEncoder) {
if (spdyVersion == null) {
throw new NullPointerException("spdyVersion");
}
version = spdyVersion.getVersion();
this.headerBlockEncoder = headerBlockEncoder; this.headerBlockEncoder = headerBlockEncoder;
} }

View File

@ -19,8 +19,8 @@ import org.jboss.netty.buffer.ChannelBuffer;
abstract class SpdyHeaderBlockDecoder { abstract class SpdyHeaderBlockDecoder {
static SpdyHeaderBlockDecoder newInstance(int version, int maxHeaderSize) { static SpdyHeaderBlockDecoder newInstance(SpdyVersion spdyVersion, int maxHeaderSize) {
return new SpdyHeaderBlockZlibDecoder(version, maxHeaderSize); return new SpdyHeaderBlockZlibDecoder(spdyVersion, maxHeaderSize);
} }
abstract void decode(ChannelBuffer encoded, SpdyHeadersFrame frame) throws Exception; abstract void decode(ChannelBuffer encoded, SpdyHeadersFrame frame) throws Exception;

View File

@ -21,14 +21,14 @@ import org.jboss.netty.util.internal.DetectionUtil;
abstract class SpdyHeaderBlockEncoder { abstract class SpdyHeaderBlockEncoder {
static SpdyHeaderBlockEncoder newInstance( static SpdyHeaderBlockEncoder newInstance(
int version, int compressionLevel, int windowBits, int memLevel) { SpdyVersion spdyVersion, int compressionLevel, int windowBits, int memLevel) {
if (DetectionUtil.javaVersion() >= 7) { if (DetectionUtil.javaVersion() >= 7) {
return new SpdyHeaderBlockZlibEncoder( return new SpdyHeaderBlockZlibEncoder(
version, compressionLevel); spdyVersion, compressionLevel);
} else { } else {
return new SpdyHeaderBlockJZlibEncoder( return new SpdyHeaderBlockJZlibEncoder(
version, compressionLevel, windowBits, memLevel); spdyVersion, compressionLevel, windowBits, memLevel);
} }
} }

View File

@ -30,8 +30,8 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder {
private boolean finished; private boolean finished;
public SpdyHeaderBlockJZlibEncoder( public SpdyHeaderBlockJZlibEncoder(
int version, int compressionLevel, int windowBits, int memLevel) { SpdyVersion spdyVersion, int compressionLevel, int windowBits, int memLevel) {
super(version); super(spdyVersion);
if (compressionLevel < 0 || compressionLevel > 9) { if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)"); "compressionLevel: " + compressionLevel + " (expected: 0-9)");
@ -51,6 +51,7 @@ class SpdyHeaderBlockJZlibEncoder extends SpdyHeaderBlockRawEncoder {
throw new CompressionException( throw new CompressionException(
"failed to initialize an SPDY header block deflater: " + resultCode); "failed to initialize an SPDY header block deflater: " + resultCode);
} else { } else {
int version = spdyVersion.getVersion();
if (version < 3) { if (version < 3) {
resultCode = z.deflateSetDictionary(SPDY2_DICT, SPDY2_DICT.length); resultCode = z.deflateSetDictionary(SPDY2_DICT, SPDY2_DICT.length);
} else { } else {

View File

@ -30,13 +30,12 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
private int headerSize; private int headerSize;
private int numHeaders; private int numHeaders;
public SpdyHeaderBlockRawDecoder(int version, int maxHeaderSize) { public SpdyHeaderBlockRawDecoder(SpdyVersion spdyVersion, int maxHeaderSize) {
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { if (spdyVersion == null) {
throw new IllegalArgumentException( throw new NullPointerException("spdyVersion");
"unsupported version: " + version);
} }
this.version = version; this.version = spdyVersion.getVersion();
this.maxHeaderSize = maxHeaderSize; this.maxHeaderSize = maxHeaderSize;
lengthFieldSize = version < 3 ? 2 : 4; lengthFieldSize = version < 3 ? 2 : 4;
reset(); reset();

View File

@ -27,12 +27,11 @@ public class SpdyHeaderBlockRawEncoder extends SpdyHeaderBlockEncoder {
private final int version; private final int version;
public SpdyHeaderBlockRawEncoder(int version) { public SpdyHeaderBlockRawEncoder(SpdyVersion spdyVersion) {
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { if (spdyVersion == null) {
throw new IllegalArgumentException( throw new NullPointerException("spdyVersion");
"unknown version: " + version);
} }
this.version = version; version = spdyVersion.getVersion();
} }
private void setLengthField(ChannelBuffer buffer, int writerIndex, int length) { private void setLengthField(ChannelBuffer buffer, int writerIndex, int length) {

View File

@ -31,9 +31,9 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
private ChannelBuffer decompressed; private ChannelBuffer decompressed;
public SpdyHeaderBlockZlibDecoder(int version, int maxHeaderSize) { public SpdyHeaderBlockZlibDecoder(SpdyVersion spdyVersion, int maxHeaderSize) {
super(version, maxHeaderSize); super(spdyVersion, maxHeaderSize);
this.version = version; this.version = spdyVersion.getVersion();
} }
@Override @Override

View File

@ -29,13 +29,14 @@ class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder {
private boolean finished; private boolean finished;
public SpdyHeaderBlockZlibEncoder(int version, int compressionLevel) { public SpdyHeaderBlockZlibEncoder(SpdyVersion spdyVersion, int compressionLevel) {
super(version); super(spdyVersion);
if (compressionLevel < 0 || compressionLevel > 9) { if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)"); "compressionLevel: " + compressionLevel + " (expected: 0-9)");
} }
compressor = new Deflater(compressionLevel); compressor = new Deflater(compressionLevel);
int version = spdyVersion.getVersion();
if (version < 3) { if (version < 3) {
compressor.setDictionary(SPDY2_DICT); compressor.setDictionary(SPDY2_DICT);
} else { } else {

View File

@ -55,29 +55,42 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
* If the length of the message content exceeds this value, * If the length of the message content exceeds this value,
* a {@link TooLongFrameException} will be raised. * a {@link TooLongFrameException} will be raised.
*/ */
@Deprecated
public SpdyHttpDecoder(int version, int maxContentLength) { public SpdyHttpDecoder(int version, int maxContentLength) {
this(version, maxContentLength, new HashMap<Integer, HttpMessage>()); this(SpdyVersion.valueOf(version), maxContentLength, new HashMap<Integer, HttpMessage>());
}
/**
* Creates a new instance.
*
* @param spdyVersion the protocol version
* @param maxContentLength the maximum length of the message content.
* If the length of the message content exceeds this value,
* a {@link TooLongFrameException} will be raised.
*/
@Deprecated
public SpdyHttpDecoder(SpdyVersion spdyVersion, int maxContentLength) {
this(spdyVersion, maxContentLength, new HashMap<Integer, HttpMessage>());
} }
/** /**
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
* *
* @param version the protocol version * @param spdyVersion the protocol version
* @param maxContentLength the maximum length of the message content. * @param maxContentLength the maximum length of the message content.
* If the length of the message content exceeds this value, * If the length of the message content exceeds this value,
* a {@link TooLongFrameException} will be raised. * a {@link TooLongFrameException} will be raised.
* @param messageMap the {@link Map} used to hold partially received messages. * @param messageMap the {@link Map} used to hold partially received messages.
*/ */
protected SpdyHttpDecoder(int version, int maxContentLength, Map<Integer, HttpMessage> messageMap) { protected SpdyHttpDecoder(SpdyVersion spdyVersion, int maxContentLength, Map<Integer, HttpMessage> messageMap) {
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { if (spdyVersion == null) {
throw new IllegalArgumentException( throw new NullPointerException("spdyVersion");
"unsupported version: " + version);
} }
if (maxContentLength <= 0) { if (maxContentLength <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " + maxContentLength); "maxContentLength must be a positive integer: " + maxContentLength);
} }
spdyVersion = version; this.spdyVersion = spdyVersion.getVersion();
this.maxContentLength = maxContentLength; this.maxContentLength = maxContentLength;
this.messageMap = messageMap; this.messageMap = messageMap;
} }

View File

@ -136,12 +136,21 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
* *
* @param version the protocol version * @param version the protocol version
*/ */
@Deprecated
public SpdyHttpEncoder(int version) { public SpdyHttpEncoder(int version) {
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { this(SpdyVersion.valueOf(version));
throw new IllegalArgumentException(
"unsupported version: " + version);
} }
spdyVersion = version;
/**
* Creates a new instance.
*
* @param spdyVersion the protocol version
*/
public SpdyHttpEncoder(SpdyVersion spdyVersion) {
if (spdyVersion == null) {
throw new NullPointerException("spdyVersion");
}
this.spdyVersion = spdyVersion.getVersion();
} }
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt) public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)

View File

@ -15,6 +15,8 @@
*/ */
package org.jboss.netty.handler.codec.spdy; package org.jboss.netty.handler.codec.spdy;
import org.jboss.netty.channel.MessageEvent;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -23,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.MessageEvent; import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
final class SpdySession { final class SpdySession {
@ -34,6 +36,14 @@ final class SpdySession {
private final Map<Integer, StreamState> activeStreams = private final Map<Integer, StreamState> activeStreams =
new ConcurrentHashMap<Integer, StreamState>(); new ConcurrentHashMap<Integer, StreamState>();
private final AtomicInteger sendWindowSize;
private final AtomicInteger receiveWindowSize;
public SpdySession(int sendWindowSize, int receiveWindowSize) {
this.sendWindowSize = new AtomicInteger(sendWindowSize);
this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
}
int numActiveStreams(boolean remote) { int numActiveStreams(boolean remote) {
if (remote) { if (remote) {
return activeRemoteStreams.get(); return activeRemoteStreams.get();
@ -145,16 +155,28 @@ final class SpdySession {
} }
int getSendWindowSize(int streamId) { int getSendWindowSize(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return sendWindowSize.get();
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.getSendWindowSize() : -1; return state != null ? state.getSendWindowSize() : -1;
} }
int updateSendWindowSize(int streamId, int deltaWindowSize) { int updateSendWindowSize(int streamId, int deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return sendWindowSize.addAndGet(deltaWindowSize);
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1; return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
} }
int updateReceiveWindowSize(int streamId, int deltaWindowSize) { int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return receiveWindowSize.addAndGet(deltaWindowSize);
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
if (deltaWindowSize > 0) { if (deltaWindowSize > 0) {
state.setReceiveWindowSizeLowerBound(0); state.setReceiveWindowSizeLowerBound(0);
@ -163,10 +185,20 @@ final class SpdySession {
} }
int getReceiveWindowSizeLowerBound(int streamId) { int getReceiveWindowSizeLowerBound(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
return 0;
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.getReceiveWindowSizeLowerBound() : 0; return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
} }
void updateAllSendWindowSizes(int deltaWindowSize) {
for (StreamState state: activeStreams.values()) {
state.updateSendWindowSize(deltaWindowSize);
}
}
void updateAllReceiveWindowSizes(int deltaWindowSize) { void updateAllReceiveWindowSizes(int deltaWindowSize) {
for (StreamState state: activeStreams.values()) { for (StreamState state: activeStreams.values()) {
state.updateReceiveWindowSize(deltaWindowSize); state.updateReceiveWindowSize(deltaWindowSize);
@ -182,6 +214,19 @@ final class SpdySession {
} }
MessageEvent getPendingWrite(int streamId) { MessageEvent getPendingWrite(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
for (Integer id : getActiveStreams()) {
StreamState state = activeStreams.get(id);
if (state.getSendWindowSize() > 0) {
MessageEvent e = state.getPendingWrite();
if (e != null) {
return e;
}
}
}
return null;
}
StreamState state = activeStreams.get(streamId); StreamState state = activeStreams.get(streamId);
return state != null ? state.getPendingWrite() : null; return state != null ? state.getPendingWrite() : null;
} }

View File

@ -41,17 +41,17 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException(); private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
private final SpdySession spdySession = new SpdySession(); private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
private volatile int lastGoodStreamId; private volatile int lastGoodStreamId;
private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE; private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
private volatile int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private volatile int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private volatile int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; private volatile int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
private final Object flowControlLock = new Object(); private final Object flowControlLock = new Object();
private final AtomicInteger pings = new AtomicInteger(); private final AtomicInteger pings = new AtomicInteger();
@ -63,6 +63,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private final boolean server; private final boolean server;
private final boolean flowControl; private final boolean flowControl;
private final boolean sessionFlowControl;
/** /**
* Creates a new session handler. * Creates a new session handler.
@ -73,13 +74,27 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
* {@code false} if and only if this session handler should * {@code false} if and only if this session handler should
* handle the client endpoint of the connection. * handle the client endpoint of the connection.
*/ */
@Deprecated
public SpdySessionHandler(int version, boolean server) { public SpdySessionHandler(int version, boolean server) {
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { this(SpdyVersion.valueOf(version), server);
throw new IllegalArgumentException( }
"unsupported version: " + version);
/**
* Creates a new session handler.
*
* @param spdyVersion the protocol version
* @param server {@code true} if and only if this session handler should
* handle the server endpoint of the connection.
* {@code false} if and only if this session handler should
* handle the client endpoint of the connection.
*/
public SpdySessionHandler(SpdyVersion spdyVersion, boolean server) {
if (spdyVersion == null) {
throw new NullPointerException("spdyVersion");
} }
this.server = server; this.server = server;
flowControl = version >= 3; flowControl = spdyVersion.useFlowControl();
sessionFlowControl = spdyVersion.useSessionFlowControl();
} }
@Override @Override
@ -115,6 +130,28 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
int streamId = spdyDataFrame.getStreamId(); int streamId = spdyDataFrame.getStreamId();
if (sessionFlowControl) {
int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
int newSessionWindowSize =
spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
// Check if session window size is reduced beyond allowable lower bound
if (newSessionWindowSize < 0) {
issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
return;
}
// Send a WINDOW_UPDATE frame if less than half the session window size remains
if (newSessionWindowSize <= initialReceiveWindowSize / 2) {
deltaWindowSize = initialReceiveWindowSize - newSessionWindowSize;
spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
SpdyWindowUpdateFrame spdyWindowUpdateFrame =
new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, deltaWindowSize);
Channels.write(
ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
}
}
// Check if we received a data frame for a Stream-ID which is not open // Check if we received a data frame for a Stream-ID which is not open
if (!spdySession.isActiveStream(streamId)) { if (!spdySession.isActiveStream(streamId)) {
if (streamId <= lastGoodStreamId) { if (streamId <= lastGoodStreamId) {
@ -168,7 +205,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} }
} }
// Send a WINDOW_UPDATE frame if less than half the window size remains // Send a WINDOW_UPDATE frame if less than half the stream window size remains
if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) { if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
deltaWindowSize = initialReceiveWindowSize - newWindowSize; deltaWindowSize = initialReceiveWindowSize - newWindowSize;
spdySession.updateReceiveWindowSize(streamId, deltaWindowSize); spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
@ -366,13 +403,17 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize(); int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
// Ignore frames for half-closed streams // Ignore frames for half-closed streams
if (spdySession.isLocalSideClosed(streamId)) { if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
return; return;
} }
// Check for numerical overflow // Check for numerical overflow
if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) { if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
if (streamId == SPDY_SESSION_STREAM_ID) {
issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
} else {
issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR); issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
}
return; return;
} }
@ -454,6 +495,11 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int dataLength = spdyDataFrame.getData().readableBytes(); int dataLength = spdyDataFrame.getData().readableBytes();
int sendWindowSize = spdySession.getSendWindowSize(streamId); int sendWindowSize = spdySession.getSendWindowSize(streamId);
if (sessionFlowControl) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
}
if (sendWindowSize <= 0) { if (sendWindowSize <= 0) {
// Stream is stalled -- enqueue Data frame and return // Stream is stalled -- enqueue Data frame and return
spdySession.putPendingWrite(streamId, e); spdySession.putPendingWrite(streamId, e);
@ -461,6 +507,9 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} else if (sendWindowSize < dataLength) { } else if (sendWindowSize < dataLength) {
// Stream is not stalled but we cannot send the entire frame // Stream is not stalled but we cannot send the entire frame
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize); spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
}
// Create a partial data frame whose length is the current window size // Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId); SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId);
@ -472,14 +521,15 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
ChannelFuture writeFuture = Channels.future(e.getChannel()); ChannelFuture writeFuture = Channels.future(e.getChannel());
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state. // Close the session on write failures that leaves the transfer window in a corrupt state.
final SocketAddress remoteAddress = e.getRemoteAddress(); final SocketAddress remoteAddress = e.getRemoteAddress();
final ChannelHandlerContext context = ctx; final ChannelHandlerContext context = ctx;
e.getFuture().addListener(new ChannelFutureListener() { e.getFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError( Channel channel = future.getChannel();
context, remoteAddress, streamId, SpdyStreamStatus.INTERNAL_ERROR); issueSessionError(
context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });
@ -489,16 +539,20 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} else { } else {
// Window size is large enough to send entire data frame // Window size is large enough to send entire data frame
spdySession.updateSendWindowSize(streamId, -1 * dataLength); spdySession.updateSendWindowSize(streamId, -1 * dataLength);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
}
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state. // Close the session on write failures that leaves the transfer window in a corrupt state.
final SocketAddress remoteAddress = e.getRemoteAddress(); final SocketAddress remoteAddress = e.getRemoteAddress();
final ChannelHandlerContext context = ctx; final ChannelHandlerContext context = ctx;
e.getFuture().addListener(new ChannelFutureListener() { e.getFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError( Channel channel = future.getChannel();
context, remoteAddress, streamId, SpdyStreamStatus.INTERNAL_ERROR); issueSessionError(
context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });
@ -673,9 +727,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) { private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
int deltaWindowSize = newInitialWindowSize - initialSendWindowSize; int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
initialSendWindowSize = newInitialWindowSize; initialSendWindowSize = newInitialWindowSize;
for (Integer StreamId: spdySession.getActiveStreams()) { spdySession.updateAllSendWindowSizes(deltaWindowSize);
spdySession.updateSendWindowSize(StreamId.intValue(), deltaWindowSize);
}
} }
// need to synchronize to prevent new streams from being created while updating active streams // need to synchronize to prevent new streams from being created while updating active streams
@ -728,6 +780,10 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) { private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) {
synchronized (flowControlLock) { synchronized (flowControlLock) {
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize); int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
if (sessionFlowControl && streamId != SPDY_SESSION_STREAM_ID) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}
while (newWindowSize > 0) { while (newWindowSize > 0) {
// Check if we have unblocked a stalled stream // Check if we have unblocked a stalled stream
@ -738,33 +794,46 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage(); SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
int dataFrameSize = spdyDataFrame.getData().readableBytes(); int dataFrameSize = spdyDataFrame.getData().readableBytes();
final int writeStreamId = spdyDataFrame.getStreamId();
if (sessionFlowControl && streamId == SPDY_SESSION_STREAM_ID) {
newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
}
if (newWindowSize >= dataFrameSize) { if (newWindowSize >= dataFrameSize) {
// Window size is large enough to send entire data frame // Window size is large enough to send entire data frame
spdySession.removePendingWrite(streamId); spdySession.removePendingWrite(writeStreamId);
newWindowSize = spdySession.updateSendWindowSize(streamId, -1 * dataFrameSize); newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
if (sessionFlowControl) {
int sessionSendWindowSize =
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state. // Close the session on write failures that leaves the transfer window in a corrupt state.
final SocketAddress remoteAddress = e.getRemoteAddress(); final SocketAddress remoteAddress = e.getRemoteAddress();
final ChannelHandlerContext context = ctx; final ChannelHandlerContext context = ctx;
e.getFuture().addListener(new ChannelFutureListener() { e.getFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError(context, remoteAddress, streamId, SpdyStreamStatus.INTERNAL_ERROR); Channel channel = future.getChannel();
issueSessionError(context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });
// Close the local side of the stream if this is the last frame // Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) { if (spdyDataFrame.isLast()) {
halfCloseStream(streamId, false, e.getFuture()); halfCloseStream(writeStreamId, false, e.getFuture());
} }
Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress()); Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
} else { } else {
// We can send a partial frame // We can send a partial frame
spdySession.updateSendWindowSize(streamId, -1 * newWindowSize); spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
if (sessionFlowControl) {
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);
}
// Create a partial data frame whose length is the current window size // Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId); SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId);
@ -773,13 +842,14 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
ChannelFuture writeFuture = Channels.future(e.getChannel()); ChannelFuture writeFuture = Channels.future(e.getChannel());
// The transfer window size is pre-decremented when sending a data frame downstream. // The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state. // Close the session on write failures that leaves the transfer window in a corrupt state.
final SocketAddress remoteAddress = e.getRemoteAddress(); final SocketAddress remoteAddress = e.getRemoteAddress();
final ChannelHandlerContext context = ctx; final ChannelHandlerContext context = ctx;
e.getFuture().addListener(new ChannelFutureListener() { e.getFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
issueStreamError(context, remoteAddress, streamId, SpdyStreamStatus.INTERNAL_ERROR); Channel channel = future.getChannel();
issueSessionError(context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
} }
} }
}); });

View File

@ -0,0 +1,55 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.spdy;
public enum SpdyVersion {
SPDY_2 (2, false, false),
SPDY_3 (3, true, false),
SPDY_3_1 (3, true, true);
static SpdyVersion valueOf(int version) {
if (version == 2) {
return SPDY_2;
}
if (version == 3) {
return SPDY_3;
}
throw new IllegalArgumentException(
"unsupported version: " + version);
}
private final int version;
private final boolean flowControl;
private final boolean sessionFlowControl;
private SpdyVersion(int version, boolean flowControl, boolean sessionFlowControl) {
this.version = version;
this.flowControl = flowControl;
this.sessionFlowControl = sessionFlowControl;
}
int getVersion() {
return version;
}
boolean useFlowControl() {
return flowControl;
}
boolean useSessionFlowControl() {
return sessionFlowControl;
}
}

View File

@ -15,7 +15,6 @@
*/ */
package org.jboss.netty.handler.codec.spdy; package org.jboss.netty.handler.codec.spdy;
import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
@ -174,16 +173,16 @@ public abstract class AbstractSocketSpdyEchoTest {
@Test @Test
public void testSpdyEcho() throws Throwable { public void testSpdyEcho() throws Throwable {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdyEcho(SpdyVersion.SPDY_2);
testSpdyEcho(version); testSpdyEcho(SpdyVersion.SPDY_3);
} testSpdyEcho(SpdyVersion.SPDY_3_1);
} }
private void testSpdyEcho(int version) throws Throwable { private void testSpdyEcho(SpdyVersion version) throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool())); ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
ChannelBuffer frames = createFrames(version); ChannelBuffer frames = createFrames(version.getVersion());
EchoHandler sh = new EchoHandler(frames, true); EchoHandler sh = new EchoHandler(frames, true);
EchoHandler ch = new EchoHandler(frames, false); EchoHandler ch = new EchoHandler(frames, false);

View File

@ -35,14 +35,18 @@ import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class SpdyFrameDecoderTest { public class SpdyFrameDecoderTest {
@Test @Test
public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception { public void testTooLargeHeaderNameOnSynStreamRequest() throws Exception {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version++) { testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_2);
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3);
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1);
}
private void testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion spdyVersion) throws Exception {
List<Integer> headerSizes = Arrays.asList(90, 900); List<Integer> headerSizes = Arrays.asList(90, 900);
for (int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value for (int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value
SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0); SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0);
@ -53,11 +57,11 @@ public class SpdyFrameDecoderTest {
ClientBootstrap cb = new ClientBootstrap( ClientBootstrap cb = new ClientBootstrap(
newClientSocketChannelFactory(Executors.newCachedThreadPool())); newClientSocketChannelFactory(Executors.newCachedThreadPool()));
sb.getPipeline().addLast("decoder", new SpdyFrameDecoder(version, 10000, maxHeaderSize)); sb.getPipeline().addLast("decoder", new SpdyFrameDecoder(spdyVersion, 10000, maxHeaderSize));
sb.getPipeline().addLast("sessionHandler", new SpdySessionHandler(version, true)); sb.getPipeline().addLast("sessionHandler", new SpdySessionHandler(spdyVersion, true));
sb.getPipeline().addLast("handler", captureHandler); sb.getPipeline().addLast("handler", captureHandler);
cb.getPipeline().addLast("encoder", new SpdyFrameEncoder(version)); cb.getPipeline().addLast("encoder", new SpdyFrameEncoder(spdyVersion));
Channel sc = sb.bind(new InetSocketAddress(0)); Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort(); int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();
@ -68,9 +72,9 @@ public class SpdyFrameDecoderTest {
sendAndWaitForFrame(cc, frame, captureHandler); sendAndWaitForFrame(cc, frame, captureHandler);
assertNotNull("version " + version + ", not null message", assertNotNull("version " + spdyVersion.getVersion() + ", not null message",
captureHandler.message); captureHandler.message);
String message = "version " + version + ", should be SpdyHeadersFrame, was " + String message = "version " + spdyVersion.getVersion() + ", should be SpdyHeadersFrame, was " +
captureHandler.message.getClass(); captureHandler.message.getClass();
assertTrue( assertTrue(
message, message,
@ -87,7 +91,6 @@ public class SpdyFrameDecoderTest {
sb.releaseExternalResources(); sb.releaseExternalResources();
} }
} }
}
private static void sendAndWaitForFrame(Channel cc, SpdyFrame frame, CaptureHandler handler) { private static void sendAndWaitForFrame(Channel cc, SpdyFrame frame, CaptureHandler handler) {
cc.write(frame); cc.write(frame);

View File

@ -91,10 +91,10 @@ public class SpdySessionHandlerTest {
assertTrue(spdyHeadersFrame.getHeaders().isEmpty()); assertTrue(spdyHeadersFrame.getHeaders().isEmpty());
} }
private static void testSpdySessionHandler(int version, boolean server) { private static void testSpdySessionHandler(SpdyVersion spdyVersion, boolean server) {
DecoderEmbedder<Object> sessionHandler = DecoderEmbedder<Object> sessionHandler =
new DecoderEmbedder<Object>( new DecoderEmbedder<Object>(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); new SpdySessionHandler(spdyVersion, server), new EchoHandler(closeSignal, server));
sessionHandler.pollAll(); sessionHandler.pollAll();
int localStreamId = server ? 1 : 2; int localStreamId = server ? 1 : 2;
@ -200,50 +200,50 @@ public class SpdySessionHandlerTest {
@Test @Test
public void testSpdyClientSessionHandler() { public void testSpdyClientSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdySessionHandler(SpdyVersion.SPDY_2, false);
testSpdySessionHandler(version, false); testSpdySessionHandler(SpdyVersion.SPDY_3, false);
} testSpdySessionHandler(SpdyVersion.SPDY_3_1, false);
} }
@Test @Test
public void testSpdyClientSessionHandlerPing() { public void testSpdyClientSessionHandlerPing() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdySessionHandlerPing(SpdyVersion.SPDY_2, false);
testSpdySessionHandlerPing(version, false); testSpdySessionHandlerPing(SpdyVersion.SPDY_3, false);
} testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, false);
} }
@Test @Test
public void testSpdyClientSessionHandlerGoAway() { public void testSpdyClientSessionHandlerGoAway() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, false);
testSpdySessionHandlerGoAway(version, false); testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, false);
} testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, false);
} }
@Test @Test
public void testSpdyServerSessionHandler() { public void testSpdyServerSessionHandler() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdySessionHandler(SpdyVersion.SPDY_2, true);
testSpdySessionHandler(version, true); testSpdySessionHandler(SpdyVersion.SPDY_3, true);
} testSpdySessionHandler(SpdyVersion.SPDY_3_1, true);
} }
@Test @Test
public void testSpdyServerSessionHandlerPing() { public void testSpdyServerSessionHandlerPing() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdySessionHandlerPing(SpdyVersion.SPDY_2, true);
testSpdySessionHandlerPing(version, true); testSpdySessionHandlerPing(SpdyVersion.SPDY_3, true);
} testSpdySessionHandlerPing(SpdyVersion.SPDY_3_1, true);
} }
@Test @Test
public void testSpdyServerSessionHandlerGoAway() { public void testSpdyServerSessionHandlerGoAway() {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { testSpdySessionHandlerGoAway(SpdyVersion.SPDY_2, true);
testSpdySessionHandlerGoAway(version, true); testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3, true);
} testSpdySessionHandlerGoAway(SpdyVersion.SPDY_3_1, true);
} }
private static void testSpdySessionHandlerPing(int version, boolean server) { private static void testSpdySessionHandlerPing(SpdyVersion spdyVersion, boolean server) {
DecoderEmbedder<Object> sessionHandler = DecoderEmbedder<Object> sessionHandler =
new DecoderEmbedder<Object>( new DecoderEmbedder<Object>(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); new SpdySessionHandler(spdyVersion, server), new EchoHandler(closeSignal, server));
sessionHandler.pollAll(); sessionHandler.pollAll();
int localStreamId = server ? 1 : 2; int localStreamId = server ? 1 : 2;
@ -264,10 +264,10 @@ public class SpdySessionHandlerTest {
sessionHandler.finish(); sessionHandler.finish();
} }
private static void testSpdySessionHandlerGoAway(int version, boolean server) { private static void testSpdySessionHandlerGoAway(SpdyVersion spdyVersion, boolean server) {
DecoderEmbedder<Object> sessionHandler = DecoderEmbedder<Object> sessionHandler =
new DecoderEmbedder<Object>( new DecoderEmbedder<Object>(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); new SpdySessionHandler(spdyVersion, server), new EchoHandler(closeSignal, server));
sessionHandler.pollAll(); sessionHandler.pollAll();
int localStreamId = server ? 1 : 2; int localStreamId = server ? 1 : 2;