SPDY Protocol Framing Layer Tests
This commit is contained in:
parent
cf8a4d627d
commit
9fcb4643b1
@ -250,6 +250,10 @@ public class SpdyFrameDecoder extends FrameDecoder {
|
||||
}
|
||||
|
||||
private ChannelBuffer decompress(ChannelBuffer compressed) throws Exception {
|
||||
if ((compressed.readableBytes() == 2) &&
|
||||
(compressed.getShort(compressed.readerIndex()) == 0)) {
|
||||
return compressed;
|
||||
}
|
||||
headerBlockDecompressor.offer(compressed);
|
||||
return headerBlockDecompressor.poll();
|
||||
}
|
||||
|
@ -236,8 +236,16 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
|
||||
} else if (msg instanceof SpdyHeadersFrame) {
|
||||
|
||||
SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
|
||||
int streamID = spdyHeadersFrame.getStreamID();
|
||||
|
||||
// Check if we received a valid HEADERS frame
|
||||
if (spdyHeadersFrame.isInvalid()) {
|
||||
issueStreamError(ctx, e, spdyHeadersFrame.getStreamID(), SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
if (spdySession.isRemoteSideClosed(streamID)) {
|
||||
issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -331,6 +339,15 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
|
||||
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
|
||||
return;
|
||||
|
||||
} else if (msg instanceof SpdyHeadersFrame) {
|
||||
|
||||
SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
|
||||
int streamID = spdyHeadersFrame.getStreamID();
|
||||
|
||||
if (spdySession.isLocalSideClosed(streamID)) {
|
||||
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ctx.sendDownstream(evt);
|
||||
@ -372,18 +389,27 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
|
||||
|
||||
private synchronized void updateConcurrentStreams(SpdySettingsFrame settings, boolean remote) {
|
||||
int newConcurrentStreams = settings.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
|
||||
if (newConcurrentStreams > 0) {
|
||||
if (remote) {
|
||||
remoteConcurrentStreams = newConcurrentStreams;
|
||||
if ((localConcurrentStreams == 0) || localConcurrentStreams > remoteConcurrentStreams) {
|
||||
maxConcurrentStreams = remoteConcurrentStreams;
|
||||
}
|
||||
} else {
|
||||
localConcurrentStreams = newConcurrentStreams;
|
||||
if ((remoteConcurrentStreams == 0) || remoteConcurrentStreams > localConcurrentStreams) {
|
||||
maxConcurrentStreams = localConcurrentStreams;
|
||||
}
|
||||
}
|
||||
if (remote) {
|
||||
remoteConcurrentStreams = newConcurrentStreams;
|
||||
} else {
|
||||
localConcurrentStreams = newConcurrentStreams;
|
||||
}
|
||||
if (localConcurrentStreams == remoteConcurrentStreams) {
|
||||
maxConcurrentStreams = localConcurrentStreams;
|
||||
return;
|
||||
}
|
||||
if (localConcurrentStreams == 0) {
|
||||
maxConcurrentStreams = remoteConcurrentStreams;
|
||||
return;
|
||||
}
|
||||
if (remoteConcurrentStreams == 0) {
|
||||
maxConcurrentStreams = localConcurrentStreams;
|
||||
return;
|
||||
}
|
||||
if (localConcurrentStreams > remoteConcurrentStreams) {
|
||||
maxConcurrentStreams = remoteConcurrentStreams;
|
||||
} else {
|
||||
maxConcurrentStreams = localConcurrentStreams;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,252 @@
|
||||
/*
|
||||
* Copyright 2012 Twitter, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.spdy;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap;
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.util.TestUtil;
|
||||
import org.jboss.netty.util.internal.ExecutorUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public abstract class AbstractSocketSpdyEchoTest {
|
||||
|
||||
private static final Random random = new Random();
|
||||
static final ChannelBuffer frames = ChannelBuffers.buffer(1160);
|
||||
static final int ignoredBytes = 20;
|
||||
|
||||
private static ExecutorService executor;
|
||||
|
||||
static {
|
||||
// SPDY UNKNOWN Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(0xFFFF);
|
||||
frames.writeByte(0xFF);
|
||||
frames.writeMedium(4);
|
||||
frames.writeInt(random.nextInt());
|
||||
|
||||
// SPDY NOOP Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(5);
|
||||
frames.writeInt(0);
|
||||
|
||||
// SPDY Data Frame
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
frames.writeByte(0x01);
|
||||
frames.writeMedium(1024);
|
||||
for (int i = 0; i < 256; i ++) {
|
||||
frames.writeInt(random.nextInt());
|
||||
}
|
||||
|
||||
// SPDY SYN_STREAM Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(1);
|
||||
frames.writeByte(0x03);
|
||||
frames.writeMedium(12);
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
frames.writeShort(0x8000);
|
||||
frames.writeShort(0);
|
||||
|
||||
// SPDY SYN_REPLY Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(2);
|
||||
frames.writeByte(0x01);
|
||||
frames.writeMedium(8);
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
frames.writeInt(0);
|
||||
|
||||
// SPDY RST_STREAM Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(3);
|
||||
frames.writeInt(8);
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
frames.writeInt(random.nextInt() | 0x01);
|
||||
|
||||
// SPDY SETTINGS Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(4);
|
||||
frames.writeByte(0x01);
|
||||
frames.writeMedium(12);
|
||||
frames.writeInt(1);
|
||||
frames.writeMedium(random.nextInt());
|
||||
frames.writeByte(0x03);
|
||||
frames.writeInt(random.nextInt());
|
||||
|
||||
// SPDY PING Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(6);
|
||||
frames.writeInt(4);
|
||||
frames.writeInt(random.nextInt());
|
||||
|
||||
// SPDY GOAWAY Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(7);
|
||||
frames.writeInt(4);
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
|
||||
// SPDY HEADERS Frame
|
||||
frames.writeByte(0x80);
|
||||
frames.writeByte(2);
|
||||
frames.writeShort(8);
|
||||
frames.writeInt(4);
|
||||
frames.writeInt(random.nextInt() & 0x7FFFFFFF);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
executor = Executors.newCachedThreadPool();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroy() {
|
||||
ExecutorUtil.terminate(executor);
|
||||
}
|
||||
|
||||
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
|
||||
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
|
||||
|
||||
@Test
|
||||
public void testSpdyEcho() throws Throwable {
|
||||
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
|
||||
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
|
||||
|
||||
EchoHandler sh = new EchoHandler(true);
|
||||
EchoHandler ch = new EchoHandler(false);
|
||||
|
||||
sb.getPipeline().addLast("decoder", new SpdyFrameDecoder());
|
||||
sb.getPipeline().addLast("encoder", new SpdyFrameEncoder());
|
||||
sb.getPipeline().addLast("handler", sh);
|
||||
|
||||
cb.getPipeline().addLast("handler", ch);
|
||||
|
||||
Channel sc = sb.bind(new InetSocketAddress(0));
|
||||
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();
|
||||
|
||||
ChannelFuture ccf = cb.connect(new InetSocketAddress(TestUtil.getLocalHost(), port));
|
||||
assertTrue(ccf.awaitUninterruptibly().isSuccess());
|
||||
|
||||
Channel cc = ccf.getChannel();
|
||||
cc.write(frames);
|
||||
|
||||
while (ch.counter < frames.writerIndex() - ignoredBytes) {
|
||||
if (sh.exception.get() != null) {
|
||||
break;
|
||||
}
|
||||
if (ch.exception.get() != null) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
|
||||
sh.channel.close().awaitUninterruptibly();
|
||||
ch.channel.close().awaitUninterruptibly();
|
||||
sc.close().awaitUninterruptibly();
|
||||
|
||||
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
||||
throw sh.exception.get();
|
||||
}
|
||||
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
if (sh.exception.get() != null) {
|
||||
throw sh.exception.get();
|
||||
}
|
||||
if (ch.exception.get() != null) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
}
|
||||
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
final boolean server;
|
||||
|
||||
EchoHandler(boolean server) {
|
||||
super();
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||
throws Exception {
|
||||
channel = e.getChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||
throws Exception {
|
||||
if (server) {
|
||||
Channels.write(channel, e.getMessage(), e.getRemoteAddress());
|
||||
} else {
|
||||
ChannelBuffer m = (ChannelBuffer) e.getMessage();
|
||||
byte[] actual = new byte[m.readableBytes()];
|
||||
m.getBytes(0, actual);
|
||||
|
||||
int lastIdx = counter;
|
||||
for (int i = 0; i < actual.length; i ++) {
|
||||
assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]);
|
||||
}
|
||||
|
||||
counter += actual.length;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||
throws Exception {
|
||||
if (exception.compareAndSet(null, e.getCause())) {
|
||||
e.getChannel().close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2012 Twitter, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.spdy;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||
|
||||
public class NioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
|
||||
return new NioClientSocketChannelFactory(executor, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
|
||||
return new NioServerSocketChannelFactory(executor, executor);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2012 Twitter, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.spdy;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
|
||||
|
||||
public class NioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
|
||||
return new NioClientSocketChannelFactory(executor, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
|
||||
return new OioServerSocketChannelFactory(executor, executor);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2012 Twitter, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.spdy;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
|
||||
|
||||
public class OioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
|
||||
return new OioClientSocketChannelFactory(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
|
||||
return new NioServerSocketChannelFactory(executor, executor);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2012 Twitter, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.spdy;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
|
||||
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
|
||||
|
||||
public class OioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
|
||||
return new OioClientSocketChannelFactory(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
|
||||
return new OioServerSocketChannelFactory(executor, executor);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,334 @@
|
||||
/*
|
||||
* Copyright 2012 Twitter, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.spdy;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SpdySessionHandlerTest {
|
||||
|
||||
private static final int closeSignal = SpdyCodecUtil.SPDY_SETTINGS_MAX_ID;
|
||||
private static final SpdySettingsFrame closeMessage = new DefaultSpdySettingsFrame();
|
||||
|
||||
static {
|
||||
closeMessage.setValue(closeSignal, 0);
|
||||
}
|
||||
|
||||
private void assertHeaderBlock(SpdyHeaderBlock received, SpdyHeaderBlock expected) {
|
||||
for (String name: expected.getHeaderNames()) {
|
||||
List<String> expectedValues = expected.getHeaders(name);
|
||||
List<String> receivedValues = received.getHeaders(name);
|
||||
Assert.assertTrue(receivedValues.containsAll(expectedValues));
|
||||
receivedValues.removeAll(expectedValues);
|
||||
Assert.assertTrue(receivedValues.isEmpty());
|
||||
received.removeHeader(name);
|
||||
}
|
||||
Assert.assertTrue(received.getHeaders().isEmpty());
|
||||
}
|
||||
|
||||
private void assertDataFrame(Object msg, int streamID, boolean last) {
|
||||
Assert.assertNotNull(msg);
|
||||
Assert.assertTrue(msg instanceof SpdyDataFrame);
|
||||
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
|
||||
Assert.assertTrue(spdyDataFrame.getStreamID() == streamID);
|
||||
Assert.assertTrue(spdyDataFrame.isLast() == last);
|
||||
}
|
||||
|
||||
private void assertSynReply(Object msg, int streamID, boolean last, SpdyHeaderBlock headers) {
|
||||
Assert.assertNotNull(msg);
|
||||
Assert.assertTrue(msg instanceof SpdySynReplyFrame);
|
||||
SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
|
||||
Assert.assertTrue(spdySynReplyFrame.getStreamID() == streamID);
|
||||
Assert.assertTrue(spdySynReplyFrame.isLast() == last);
|
||||
assertHeaderBlock(spdySynReplyFrame, headers);
|
||||
}
|
||||
|
||||
private void assertRstStream(Object msg, int streamID, SpdyStreamStatus status) {
|
||||
Assert.assertNotNull(msg);
|
||||
Assert.assertTrue(msg instanceof SpdyRstStreamFrame);
|
||||
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
|
||||
Assert.assertTrue(spdyRstStreamFrame.getStreamID() == streamID);
|
||||
Assert.assertTrue(spdyRstStreamFrame.getStatus().equals(status));
|
||||
}
|
||||
|
||||
private void assertPing(Object msg, int ID) {
|
||||
Assert.assertNotNull(msg);
|
||||
Assert.assertTrue(msg instanceof SpdyPingFrame);
|
||||
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
|
||||
Assert.assertTrue(spdyPingFrame.getID() == ID);
|
||||
}
|
||||
|
||||
private void assertGoAway(Object msg, int lastGoodStreamID) {
|
||||
Assert.assertNotNull(msg);
|
||||
Assert.assertTrue(msg instanceof SpdyGoAwayFrame);
|
||||
SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg;
|
||||
Assert.assertTrue(spdyGoAwayFrame.getLastGoodStreamID() == lastGoodStreamID);
|
||||
}
|
||||
|
||||
private void assertHeaders(Object msg, int streamID, SpdyHeaderBlock headers) {
|
||||
Assert.assertNotNull(msg);
|
||||
Assert.assertTrue(msg instanceof SpdyHeadersFrame);
|
||||
SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
|
||||
Assert.assertTrue(spdyHeadersFrame.getStreamID() == streamID);
|
||||
assertHeaderBlock(spdyHeadersFrame, headers);
|
||||
}
|
||||
|
||||
private void testSpdySessionHandler(boolean server) {
|
||||
DecoderEmbedder<Object> sessionHandler =
|
||||
new DecoderEmbedder<Object>(
|
||||
new SpdySessionHandler(server), new EchoHandler(closeSignal, server));
|
||||
sessionHandler.pollAll();
|
||||
|
||||
int localStreamID = server ? 1 : 2;
|
||||
int remoteStreamID = server ? 2 : 1;
|
||||
|
||||
SpdyPingFrame localPingFrame = new DefaultSpdyPingFrame(localStreamID);
|
||||
SpdyPingFrame remotePingFrame = new DefaultSpdyPingFrame(remoteStreamID);
|
||||
|
||||
SpdySynStreamFrame spdySynStreamFrame =
|
||||
new DefaultSpdySynStreamFrame(localStreamID, 0, (byte) 0);
|
||||
spdySynStreamFrame.setHeader("Compression", "test");
|
||||
|
||||
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(localStreamID);
|
||||
spdyDataFrame.setLast(true);
|
||||
|
||||
// Check if session handler returns INVALID_STREAM if it receives
|
||||
// a data frame for a Stream-ID that is not open
|
||||
sessionHandler.offer(new DefaultSpdyDataFrame(localStreamID));
|
||||
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.INVALID_STREAM);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler returns PROTOCOL_ERROR if it receives
|
||||
// a data frame for a Stream-ID before receiving a SYN_REPLY frame
|
||||
sessionHandler.offer(new DefaultSpdyDataFrame(remoteStreamID));
|
||||
assertRstStream(sessionHandler.poll(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
remoteStreamID += 2;
|
||||
|
||||
// Check if session handler returns PROTOCOL_ERROR if it receives
|
||||
// multiple SYN_REPLY frames for the same active Stream-ID
|
||||
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamID));
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamID));
|
||||
assertRstStream(sessionHandler.poll(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
remoteStreamID += 2;
|
||||
|
||||
// Check if frame codec correctly compresses/uncompresses headers
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertSynReply(sessionHandler.poll(), localStreamID, false, spdySynStreamFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(localStreamID);
|
||||
spdyHeadersFrame.addHeader("HEADER","test1");
|
||||
spdyHeadersFrame.addHeader("HEADER","test2");
|
||||
sessionHandler.offer(spdyHeadersFrame);
|
||||
assertHeaders(sessionHandler.poll(), localStreamID, spdyHeadersFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
localStreamID += 2;
|
||||
|
||||
// Check if session handler closed the streams using the number
|
||||
// of concurrent streams and that it returns REFUSED_STREAM
|
||||
// if it receives a SYN_STREAM frame it does not wish to accept
|
||||
spdySynStreamFrame.setStreamID(localStreamID);
|
||||
spdySynStreamFrame.setLast(true);
|
||||
spdySynStreamFrame.setUnidirectional(true);
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler drops active streams if it receives
|
||||
// a RST_STREAM frame for that Stream-ID
|
||||
sessionHandler.offer(new DefaultSpdyRstStreamFrame(remoteStreamID, 3));
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
remoteStreamID += 2;
|
||||
|
||||
// Check if session handler honors UNIDIRECTIONAL streams
|
||||
spdySynStreamFrame.setLast(false);
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
spdySynStreamFrame.setUnidirectional(false);
|
||||
|
||||
// Check if session handler returns PROTOCOL_ERROR if it receives
|
||||
// multiple SYN_STREAM frames for the same active Stream-ID
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
localStreamID += 2;
|
||||
|
||||
// Check if session handler returns PROTOCOL_ERROR if it receives
|
||||
// a SYN_STREAM frame with an invalid Stream-ID
|
||||
spdySynStreamFrame.setStreamID(localStreamID - 1);
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertRstStream(sessionHandler.poll(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
spdySynStreamFrame.setStreamID(localStreamID);
|
||||
|
||||
// Check if session handler correctly limits the number of
|
||||
// concurrent streams in the SETTINGS frame
|
||||
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
|
||||
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2);
|
||||
sessionHandler.offer(spdySettingsFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4);
|
||||
sessionHandler.offer(spdySettingsFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertSynReply(sessionHandler.poll(), localStreamID, false, spdySynStreamFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler rejects HEADERS for closed streams
|
||||
int testStreamID = spdyDataFrame.getStreamID();
|
||||
sessionHandler.offer(spdyDataFrame);
|
||||
assertDataFrame(sessionHandler.poll(), testStreamID, spdyDataFrame.isLast());
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
spdyHeadersFrame.setStreamID(testStreamID);
|
||||
sessionHandler.offer(spdyHeadersFrame);
|
||||
assertRstStream(sessionHandler.poll(), testStreamID, SpdyStreamStatus.INVALID_STREAM);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler returns PROTOCOL_ERROR if it receives
|
||||
// an invalid HEADERS frame
|
||||
spdyHeadersFrame.setStreamID(localStreamID);
|
||||
spdyHeadersFrame.setInvalid();
|
||||
sessionHandler.offer(spdyHeadersFrame);
|
||||
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler returns identical local PINGs
|
||||
sessionHandler.offer(localPingFrame);
|
||||
assertPing(sessionHandler.poll(), localPingFrame.getID());
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler ignores un-initiated remote PINGs
|
||||
sessionHandler.offer(remotePingFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler sends a GOAWAY frame when closing
|
||||
sessionHandler.offer(closeMessage);
|
||||
assertGoAway(sessionHandler.poll(), localStreamID);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
localStreamID += 2;
|
||||
|
||||
// Check if session handler returns REFUSED_STREAM if it receives
|
||||
// SYN_STREAM frames after sending a GOAWAY frame
|
||||
spdySynStreamFrame.setStreamID(localStreamID);
|
||||
sessionHandler.offer(spdySynStreamFrame);
|
||||
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
// Check if session handler ignores Data frames after sending
|
||||
// a GOAWAY frame
|
||||
spdyDataFrame.setStreamID(localStreamID);
|
||||
sessionHandler.offer(spdyDataFrame);
|
||||
Assert.assertNull(sessionHandler.peek());
|
||||
|
||||
sessionHandler.finish();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpdyClientSessionHandler() {
|
||||
testSpdySessionHandler(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpdyServerSessionHandler() {
|
||||
testSpdySessionHandler(true);
|
||||
}
|
||||
|
||||
// Echo Handler opens 4 half-closed streams on session connection
|
||||
// and then sets the number of concurrent streams to 3
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
private int closeSignal;
|
||||
private boolean server;
|
||||
|
||||
EchoHandler(int closeSignal, boolean server) {
|
||||
super();
|
||||
this.closeSignal = closeSignal;
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||
throws Exception {
|
||||
|
||||
// Initiate 4 new streams
|
||||
int streamID = server ? 2 : 1;
|
||||
SpdySynStreamFrame spdySynStreamFrame =
|
||||
new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0);
|
||||
spdySynStreamFrame.setLast(true);
|
||||
Channels.write(e.getChannel(), spdySynStreamFrame);
|
||||
spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2);
|
||||
Channels.write(e.getChannel(), spdySynStreamFrame);
|
||||
spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2);
|
||||
Channels.write(e.getChannel(), spdySynStreamFrame);
|
||||
spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2);
|
||||
Channels.write(e.getChannel(), spdySynStreamFrame);
|
||||
|
||||
// Limit the number of concurrent streams to 3
|
||||
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
|
||||
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3);
|
||||
Channels.write(e.getChannel(), spdySettingsFrame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||
throws Exception {
|
||||
Object msg = e.getMessage();
|
||||
if ((msg instanceof SpdyDataFrame) ||
|
||||
(msg instanceof SpdyPingFrame) ||
|
||||
(msg instanceof SpdyHeadersFrame)) {
|
||||
|
||||
Channels.write(e.getChannel(), msg, e.getRemoteAddress());
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg instanceof SpdySynStreamFrame) {
|
||||
|
||||
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
|
||||
|
||||
int streamID = spdySynStreamFrame.getStreamID();
|
||||
SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamID);
|
||||
spdySynReplyFrame.setLast(spdySynStreamFrame.isLast());
|
||||
for (Map.Entry<String, String> entry: spdySynStreamFrame.getHeaders()) {
|
||||
spdySynReplyFrame.addHeader(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
Channels.write(e.getChannel(), spdySynReplyFrame, e.getRemoteAddress());
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg instanceof SpdySettingsFrame) {
|
||||
|
||||
SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
|
||||
if (spdySettingsFrame.isSet(closeSignal)) {
|
||||
Channels.close(e.getChannel());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user