Port HttpContentEncoder/Decoder to use EmbeddedStreamChannel / Cleanup

- Removed unused constructor parameter in AbstractChannel
- Re-enabled GZIP encoding in HTTP snoop example
This commit is contained in:
Trustin Lee 2012-06-07 21:06:56 +09:00
parent 3442ff90e8
commit 994038975a
27 changed files with 223 additions and 237 deletions

View File

@ -15,10 +15,9 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.compression.ZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.embedder.EncoderEmbedder;
/**
* Compresses an {@link HttpMessage} and an {@link HttpChunk} in {@code gzip} or
@ -119,7 +118,7 @@ public class HttpContentCompressor extends HttpContentEncoder {
return new Result(
targetContentEncoding,
new EncoderEmbedder<ChannelBuffer>(
new EmbeddedStreamChannel(
new ZlibEncoder(wrapper, compressionLevel, windowBits, memLevel)));
}

View File

@ -18,8 +18,8 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.embedder.DecoderEmbedder;
/**
* Decodes the content of the received {@link HttpRequest} and {@link HttpChunk}.
@ -42,7 +42,7 @@ import io.netty.handler.codec.embedder.DecoderEmbedder;
*/
public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object, Object> {
private DecoderEmbedder<ChannelBuffer> decoder;
private EmbeddedStreamChannel decoder;
/**
* Creates a new instance.
@ -84,15 +84,16 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
if (!m.isChunked()) {
ChannelBuffer content = m.getContent();
// Decode the content
content = ChannelBuffers.wrappedBuffer(
decode(content), finishDecode());
ChannelBuffer newContent = ChannelBuffers.dynamicBuffer();
decode(content, newContent);
finishDecode(newContent);
// Replace the content.
m.setContent(content);
m.setContent(newContent);
if (m.containsHeader(HttpHeaders.Names.CONTENT_LENGTH)) {
m.setHeader(
HttpHeaders.Names.CONTENT_LENGTH,
Integer.toString(content.readableBytes()));
Integer.toString(newContent.readableBytes()));
}
}
}
@ -103,12 +104,16 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
// Decode the chunk if necessary.
if (decoder != null) {
if (!c.isLast()) {
content = decode(content);
if (content.readable()) {
c.setContent(content);
ChannelBuffer newContent = ChannelBuffers.dynamicBuffer();
decode(content, newContent);
if (newContent.readable()) {
c.setContent(newContent);
} else {
return null;
}
} else {
ChannelBuffer lastProduct = finishDecode();
ChannelBuffer lastProduct = ChannelBuffers.dynamicBuffer();
finishDecode(lastProduct);
// Generate an additional chunk if the decoder produced
// the last product on closure,
@ -132,7 +137,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
* {@code null} otherwise (alternatively, you can throw an exception
* to block unknown encoding).
*/
protected abstract DecoderEmbedder<ChannelBuffer> newContentDecoder(String contentEncoding) throws Exception;
protected abstract EmbeddedStreamChannel newContentDecoder(String contentEncoding) throws Exception;
/**
* Returns the expected content encoding of the decoded content.
@ -146,19 +151,25 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
return HttpHeaders.Values.IDENTITY;
}
private ChannelBuffer decode(ChannelBuffer buf) {
decoder.offer(buf);
return ChannelBuffers.wrappedBuffer(decoder.pollAll(new ChannelBuffer[decoder.size()]));
private void decode(ChannelBuffer in, ChannelBuffer out) {
decoder.writeInbound(in);
fetchDecoderOutput(out);
}
private ChannelBuffer finishDecode() {
ChannelBuffer result;
private void finishDecode(ChannelBuffer out) {
if (decoder.finish()) {
result = ChannelBuffers.wrappedBuffer(decoder.pollAll(new ChannelBuffer[decoder.size()]));
} else {
result = ChannelBuffers.EMPTY_BUFFER;
fetchDecoderOutput(out);
}
decoder = null;
return result;
}
private void fetchDecoderOutput(ChannelBuffer out) {
for (;;) {
ChannelBuffer buf = (ChannelBuffer) decoder.readInbound();
if (buf == null) {
break;
}
out.writeBytes(buf);
}
}
}

View File

@ -15,10 +15,9 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.compression.ZlibDecoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.embedder.DecoderEmbedder;
/**
* Decompresses an {@link HttpMessage} and an {@link HttpChunk} compressed in
@ -27,12 +26,12 @@ import io.netty.handler.codec.embedder.DecoderEmbedder;
*/
public class HttpContentDecompressor extends HttpContentDecoder {
@Override
protected DecoderEmbedder<ChannelBuffer> newContentDecoder(String contentEncoding) throws Exception {
protected EmbeddedStreamChannel newContentDecoder(String contentEncoding) throws Exception {
if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) {
return new DecoderEmbedder<ChannelBuffer>(new ZlibDecoder(ZlibWrapper.GZIP));
return new EmbeddedStreamChannel(new ZlibDecoder(ZlibWrapper.GZIP));
} else if ("deflate".equalsIgnoreCase(contentEncoding) || "x-deflate".equalsIgnoreCase(contentEncoding)) {
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
return new DecoderEmbedder<ChannelBuffer>(new ZlibDecoder(ZlibWrapper.ZLIB_OR_NONE));
return new EmbeddedStreamChannel(new ZlibDecoder(ZlibWrapper.ZLIB_OR_NONE));
}
// 'identity' or unsupported

View File

@ -18,8 +18,8 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import io.netty.util.internal.QueueFactory;
import java.util.Queue;
@ -49,7 +49,7 @@ import java.util.Queue;
public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessage, HttpMessage, Object, Object> {
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue();
private volatile EncoderEmbedder<ChannelBuffer> encoder;
private EmbeddedStreamChannel encoder;
/**
* Creates a new instance.
@ -117,15 +117,16 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
if (!m.isChunked()) {
ChannelBuffer content = m.getContent();
// Encode the content.
content = ChannelBuffers.wrappedBuffer(
encode(content), finishEncode());
ChannelBuffer newContent = ChannelBuffers.dynamicBuffer();
encode(content, newContent);
finishEncode(newContent);
// Replace the content.
m.setContent(content);
m.setContent(newContent);
if (m.containsHeader(HttpHeaders.Names.CONTENT_LENGTH)) {
m.setHeader(
HttpHeaders.Names.CONTENT_LENGTH,
Integer.toString(content.readableBytes()));
Integer.toString(newContent.readableBytes()));
}
}
} else if (msg instanceof HttpChunk) {
@ -135,12 +136,16 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
// Encode the chunk if necessary.
if (encoder != null) {
if (!c.isLast()) {
content = encode(content);
ChannelBuffer newContent = ChannelBuffers.dynamicBuffer();
encode(content, newContent);
if (content.readable()) {
c.setContent(content);
c.setContent(newContent);
} else {
return null;
}
} else {
ChannelBuffer lastProduct = finishEncode();
ChannelBuffer lastProduct = ChannelBuffers.dynamicBuffer();
finishEncode(lastProduct);
// Generate an additional chunk if the decoder produced
// the last product on closure,
@ -171,27 +176,33 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
*/
protected abstract Result beginEncode(HttpMessage msg, String acceptEncoding) throws Exception;
private ChannelBuffer encode(ChannelBuffer buf) {
encoder.offer(buf);
return ChannelBuffers.wrappedBuffer(encoder.pollAll(new ChannelBuffer[encoder.size()]));
private void encode(ChannelBuffer in, ChannelBuffer out) {
encoder.writeOutbound(in);
fetchEncoderOutput(out);
}
private ChannelBuffer finishEncode() {
ChannelBuffer result;
private void finishEncode(ChannelBuffer out) {
if (encoder.finish()) {
result = ChannelBuffers.wrappedBuffer(encoder.pollAll(new ChannelBuffer[encoder.size()]));
} else {
result = ChannelBuffers.EMPTY_BUFFER;
fetchEncoderOutput(out);
}
encoder = null;
return result;
}
private void fetchEncoderOutput(ChannelBuffer out) {
for (;;) {
ChannelBuffer buf = encoder.readOutbound();
if (buf == null) {
break;
}
out.writeBytes(buf);
}
}
public static final class Result {
private final String targetContentEncoding;
private final EncoderEmbedder<ChannelBuffer> contentEncoder;
private final EmbeddedStreamChannel contentEncoder;
public Result(String targetContentEncoding, EncoderEmbedder<ChannelBuffer> contentEncoder) {
public Result(String targetContentEncoding, EmbeddedStreamChannel contentEncoder) {
if (targetContentEncoding == null) {
throw new NullPointerException("targetContentEncoding");
}
@ -207,7 +218,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
return targetContentEncoding;
}
public EncoderEmbedder<ChannelBuffer> getContentEncoder() {
public EmbeddedStreamChannel getContentEncoder() {
return contentEncoder;
}
}

View File

@ -17,8 +17,8 @@ package io.netty.handler.codec.rtsp;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedMessageChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMessageDecoder;
@ -54,7 +54,7 @@ import io.netty.handler.codec.http.HttpMessageDecoder;
*/
public abstract class RtspMessageDecoder extends HttpMessageDecoder {
private final DecoderEmbedder<HttpMessage> aggregator;
private final EmbeddedMessageChannel aggregator;
/**
* Creates a new instance with the default
@ -70,15 +70,15 @@ public abstract class RtspMessageDecoder extends HttpMessageDecoder {
*/
protected RtspMessageDecoder(int maxInitialLineLength, int maxHeaderSize, int maxContentLength) {
super(maxInitialLineLength, maxHeaderSize, maxContentLength * 2);
aggregator = new DecoderEmbedder<HttpMessage>(new HttpChunkAggregator(maxContentLength));
aggregator = new EmbeddedMessageChannel(new HttpChunkAggregator(maxContentLength));
}
@Override
public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer) throws Exception {
Object o = super.decode(ctx, buffer);
if (o != null && aggregator.offer(o)) {
return aggregator.poll();
if (o != null && aggregator.writeInbound(o)) {
return aggregator.readInbound();
} else {
return null;
}

View File

@ -16,12 +16,10 @@
package io.netty.handler.codec.http;
import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import io.netty.util.CharsetUtil;
import org.junit.Test;
@ -38,42 +36,33 @@ public class HttpClientCodecTest {
@Test
public void testFailsNotOnRequestResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
DecoderEmbedder<ChannelBuffer> decoder = new DecoderEmbedder<ChannelBuffer>(codec);
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(codec);
encoder.offer(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
decoder.offer(ChannelBuffers.copiedBuffer(RESPONSE, CharsetUtil.ISO_8859_1));
encoder.finish();
decoder.finish();
EmbeddedStreamChannel ch = new EmbeddedStreamChannel(codec);
ch.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ch.writeInbound(ChannelBuffers.copiedBuffer(RESPONSE, CharsetUtil.ISO_8859_1));
ch.finish();
}
@Test
public void testFailsNotOnRequestResponseChunked() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
DecoderEmbedder<ChannelBuffer> decoder = new DecoderEmbedder<ChannelBuffer>(codec);
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(codec);
encoder.offer(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
decoder.offer(ChannelBuffers.copiedBuffer(CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
encoder.finish();
decoder.finish();
EmbeddedStreamChannel ch = new EmbeddedStreamChannel(codec);
ch.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ch.writeInbound(ChannelBuffers.copiedBuffer(CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
ch.finish();
}
@Test
public void testFailsOnMissingResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(codec);
EmbeddedStreamChannel ch = new EmbeddedStreamChannel(codec);
assertTrue(encoder.offer(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/")));
assertNotNull(encoder.poll());
assertTrue(encoder.finish());
assertTrue(ch.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/")));
assertNotNull(ch.readOutbound());
try {
encoder.poll();
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e instanceof PrematureChannelClosureException);
@ -84,19 +73,16 @@ public class HttpClientCodecTest {
@Test
public void testFailsOnIncompleteChunkedResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
DecoderEmbedder<ChannelBuffer> decoder = new DecoderEmbedder<ChannelBuffer>(codec);
EmbeddedStreamChannel ch = new EmbeddedStreamChannel(codec);
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(codec);
encoder.offer(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
decoder.offer(ChannelBuffers.copiedBuffer(INCOMPLETE_CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
ch.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ch.writeInbound(ChannelBuffers.copiedBuffer(INCOMPLETE_CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
try {
encoder.finish();
decoder.finish();
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e.getCause() instanceof PrematureChannelClosureException);
assertTrue(e instanceof PrematureChannelClosureException);
}
}

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.channel.embedded.EmbeddedMessageChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -99,10 +99,12 @@ public class SpdySessionHandlerTest {
}
private void testSpdySessionHandler(int version, boolean server) {
DecoderEmbedder<Object> sessionHandler =
new DecoderEmbedder<Object>(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
sessionHandler.pollAll();
EmbeddedMessageChannel sessionHandler = new EmbeddedMessageChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
while (sessionHandler.readInbound() != null) {
continue;
}
int localStreamID = server ? 1 : 2;
int remoteStreamID = server ? 2 : 1;
@ -119,36 +121,36 @@ public class SpdySessionHandlerTest {
// Check if session handler returns INVALID_STREAM if it receives
// a data frame for a Stream-ID that is not open
sessionHandler.offer(new DefaultSpdyDataFrame(localStreamID));
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(new DefaultSpdyDataFrame(localStreamID));
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler returns PROTOCOL_ERROR if it receives
// a data frame for a Stream-ID before receiving a SYN_REPLY frame
sessionHandler.offer(new DefaultSpdyDataFrame(remoteStreamID));
assertRstStream(sessionHandler.poll(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(new DefaultSpdyDataFrame(remoteStreamID));
assertRstStream(sessionHandler.readInbound(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
remoteStreamID += 2;
// Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_REPLY frames for the same active Stream-ID
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamID));
Assert.assertNull(sessionHandler.peek());
sessionHandler.offer(new DefaultSpdySynReplyFrame(remoteStreamID));
assertRstStream(sessionHandler.poll(), remoteStreamID, SpdyStreamStatus.STREAM_IN_USE);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamID));
Assert.assertNull(sessionHandler.readInbound());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamID));
assertRstStream(sessionHandler.readInbound(), remoteStreamID, SpdyStreamStatus.STREAM_IN_USE);
Assert.assertNull(sessionHandler.readInbound());
remoteStreamID += 2;
// Check if frame codec correctly compresses/uncompresses headers
sessionHandler.offer(spdySynStreamFrame);
assertSynReply(sessionHandler.poll(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readInbound(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.readInbound());
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(localStreamID);
spdyHeadersFrame.addHeader("HEADER","test1");
spdyHeadersFrame.addHeader("HEADER","test2");
sessionHandler.offer(spdyHeadersFrame);
assertHeaders(sessionHandler.poll(), localStreamID, spdyHeadersFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdyHeadersFrame);
assertHeaders(sessionHandler.readInbound(), localStreamID, spdyHeadersFrame);
Assert.assertNull(sessionHandler.readInbound());
localStreamID += 2;
// Check if session handler closed the streams using the number
@ -157,98 +159,98 @@ public class SpdySessionHandlerTest {
spdySynStreamFrame.setStreamID(localStreamID);
spdySynStreamFrame.setLast(true);
spdySynStreamFrame.setUnidirectional(true);
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler drops active streams if it receives
// a RST_STREAM frame for that Stream-ID
sessionHandler.offer(new DefaultSpdyRstStreamFrame(remoteStreamID, 3));
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamID, 3));
Assert.assertNull(sessionHandler.readInbound());
remoteStreamID += 2;
// Check if session handler honors UNIDIRECTIONAL streams
spdySynStreamFrame.setLast(false);
sessionHandler.offer(spdySynStreamFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySynStreamFrame);
Assert.assertNull(sessionHandler.readInbound());
spdySynStreamFrame.setUnidirectional(false);
// Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_STREAM frames for the same active Stream-ID
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
localStreamID += 2;
// Check if session handler returns PROTOCOL_ERROR if it receives
// a SYN_STREAM frame with an invalid Stream-ID
spdySynStreamFrame.setStreamID(localStreamID - 1);
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
spdySynStreamFrame.setStreamID(localStreamID);
// Check if session handler correctly limits the number of
// concurrent streams in the SETTINGS frame
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2);
sessionHandler.offer(spdySettingsFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySettingsFrame);
Assert.assertNull(sessionHandler.readInbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readInbound());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4);
sessionHandler.offer(spdySettingsFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.offer(spdySynStreamFrame);
assertSynReply(sessionHandler.poll(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySettingsFrame);
Assert.assertNull(sessionHandler.readInbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readInbound(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler rejects HEADERS for closed streams
int testStreamID = spdyDataFrame.getStreamID();
sessionHandler.offer(spdyDataFrame);
assertDataFrame(sessionHandler.poll(), testStreamID, spdyDataFrame.isLast());
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readInbound(), testStreamID, spdyDataFrame.isLast());
Assert.assertNull(sessionHandler.readInbound());
spdyHeadersFrame.setStreamID(testStreamID);
sessionHandler.offer(spdyHeadersFrame);
assertRstStream(sessionHandler.poll(), testStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readInbound(), testStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler returns PROTOCOL_ERROR if it receives
// an invalid HEADERS frame
spdyHeadersFrame.setStreamID(localStreamID);
spdyHeadersFrame.setInvalid();
sessionHandler.offer(spdyHeadersFrame);
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler returns identical local PINGs
sessionHandler.offer(localPingFrame);
assertPing(sessionHandler.poll(), localPingFrame.getID());
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(localPingFrame);
assertPing(sessionHandler.readInbound(), localPingFrame.getID());
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler ignores un-initiated remote PINGs
sessionHandler.offer(remotePingFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(remotePingFrame);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler sends a GOAWAY frame when closing
sessionHandler.offer(closeMessage);
assertGoAway(sessionHandler.poll(), localStreamID);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(closeMessage);
assertGoAway(sessionHandler.readInbound(), localStreamID);
Assert.assertNull(sessionHandler.readInbound());
localStreamID += 2;
// Check if session handler returns REFUSED_STREAM if it receives
// SYN_STREAM frames after sending a GOAWAY frame
spdySynStreamFrame.setStreamID(localStreamID);
sessionHandler.offer(spdySynStreamFrame);
assertRstStream(sessionHandler.poll(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readInbound());
// Check if session handler ignores Data frames after sending
// a GOAWAY frame
spdyDataFrame.setStreamID(localStreamID);
sessionHandler.offer(spdyDataFrame);
Assert.assertNull(sessionHandler.peek());
sessionHandler.writeInbound(spdyDataFrame);
Assert.assertNull(sessionHandler.readInbound());
sessionHandler.finish();
}

View File

@ -20,6 +20,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
@ -52,8 +53,7 @@ public class HttpSnoopClientInitializer extends ChannelInitializer<SocketChannel
p.addLast("codec", new HttpClientCodec());
// Remove the following line if you don't want automatic content decompression.
// FIXME: Port HttpContentDecompressor to the new API
//p.addLast("inflater", new HttpContentDecompressor());
p.addLast("inflater", new HttpContentDecompressor());
// Uncomment the following line if you don't want to handle HttpChunks.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));

View File

@ -18,6 +18,7 @@ package io.netty.example.http.snoop;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
@ -37,8 +38,7 @@ public class HttpSnoopServerInitializer extends ChannelInitializer<SocketChannel
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
p.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression.
// FIXME: Port HttpContentCompressor to the new API
//p.addLast("deflater", new HttpContentCompressor());
p.addLast("deflater", new HttpContentCompressor());
p.addLast("handler", new HttpSnoopServerHandler());
}
}

View File

@ -24,7 +24,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import io.netty.channel.embedded.EmbeddedMessageChannel;
import io.netty.util.CharsetUtil;
import java.io.ByteArrayInputStream;
@ -98,39 +98,43 @@ public class ChunkedWriteHandlerTest {
check(new ChunkedNioFile(TMP), new ChunkedNioFile(TMP), new ChunkedNioFile(TMP));
}
// Test case which shows that there is not a bug like stated here:
// http://stackoverflow.com/questions/10409241/why-is-close-channelfuturelistener-not-notified/10426305#comment14126161_10426305
@Test
public void testListenerNotifiedWhenIsEnd() {
ChannelBuffer buffer = ChannelBuffers.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
ChunkedInput input = new ChunkedInput() {
private boolean done;
private ChannelBuffer buffer = ChannelBuffers.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
private final ChannelBuffer buffer = ChannelBuffers.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
@Override
public Object nextChunk() throws Exception {
done = true;
return buffer.duplicate();
}
@Override
public boolean isEndOfInput() throws Exception {
return done;
}
@Override
public void close() throws Exception {
// NOOP
}
};
final AtomicBoolean listenerNotified = new AtomicBoolean(false);
final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
listenerNotified.set(true);
}
};
ChannelOutboundHandlerAdapter<ChannelBuffer> testHandler = new ChannelOutboundHandlerAdapter<ChannelBuffer>() {
@Override
@ -141,48 +145,39 @@ public class ChunkedWriteHandlerTest {
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
super.flush(ctx, future);
future.setSuccess();
}
};
EncoderEmbedder<ChannelBuffer> handler = new EncoderEmbedder<ChannelBuffer>(new ChunkedWriteHandler(), testHandler) {
@Override
public boolean offer(Object input) {
ChannelFuture future = channel().write(input);
future.addListener(listener);
future.awaitUninterruptibly();
return !isEmpty();
}
};
assertTrue(handler.offer(input));
assertTrue(handler.finish());
EmbeddedMessageChannel ch = new EmbeddedMessageChannel(new ChunkedWriteHandler(), testHandler);
ch.outboundMessageBuffer().add(input);
ch.flush().addListener(listener).syncUninterruptibly();
ch.checkException();
ch.finish();
// the listener should have been notified
assertTrue(listenerNotified.get());
assertEquals(buffer, handler.poll());
assertNull(handler.poll());
assertEquals(buffer, ch.readOutbound());
assertNull(ch.readOutbound());
}
private static void check(ChunkedInput... inputs) {
EncoderEmbedder<ChannelBuffer> embedder =
new EncoderEmbedder<ChannelBuffer>(new ChunkedWriteHandler());
EmbeddedMessageChannel ch = new EmbeddedMessageChannel(new ChunkedWriteHandler());
for (ChunkedInput input: inputs) {
embedder.offer(input);
ch.writeOutbound(input);
}
Assert.assertTrue(embedder.finish());
Assert.assertTrue(ch.finish());
int i = 0;
int read = 0;
for (;;) {
ChannelBuffer buffer = embedder.poll();
ChannelBuffer buffer = (ChannelBuffer) ch.readOutbound();
if (buffer == null) {
break;
}

View File

@ -102,11 +102,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer) {
if (outboundBuffer == null) {
throw new NullPointerException("outboundBuffer");
}
protected AbstractChannel(Channel parent, Integer id) {
if (id == null) {
id = allocateId(this);
} else {

View File

@ -36,7 +36,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
* Creates a new instance.
*/
protected AbstractServerChannel(Integer id) {
super(null, id, ChannelBufferHolders.discardMessageBuffer());
super(null, id);
}
@Override

View File

@ -1628,7 +1628,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
case STREAM:
return ChannelBufferHolders.byteBuffer();
case MESSAGE:
return ChannelBufferHolders.messageBuffer();
if (channel instanceof ServerChannel) {
return ChannelBufferHolders.discardMessageBuffer();
} else {
return ChannelBufferHolders.messageBuffer();
}
default:
throw new Error();
}

View File

@ -51,8 +51,8 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
private Throwable lastException;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
AbstractEmbeddedChannel(ChannelBufferHolder<?> outboundBuffer, ChannelHandler... handlers) {
super(null, null, outboundBuffer);
AbstractEmbeddedChannel(ChannelHandler... handlers) {
super(null, null);
if (handlers == null) {
throw new NullPointerException("handlers");

View File

@ -1,6 +1,5 @@
package io.netty.channel.embedded;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelType;
@ -12,7 +11,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
private final Queue<Object> lastOutboundBuffer = new ArrayDeque<Object>();
public EmbeddedMessageChannel(ChannelHandler... handlers) {
super(ChannelBufferHolders.messageBuffer(), handlers);
super(handlers);
}
@Override

View File

@ -2,7 +2,6 @@ package io.netty.channel.embedded;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelType;
@ -11,7 +10,7 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
private final ChannelBuffer lastOutboundBuffer = ChannelBuffers.dynamicBuffer();
public EmbeddedStreamChannel(ChannelHandler... handlers) {
super(ChannelBufferHolders.messageBuffer(), handlers);
super(handlers);
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.local;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
@ -57,11 +56,11 @@ public class LocalChannel extends AbstractChannel {
}
public LocalChannel(Integer id) {
super(null, id, ChannelBufferHolders.messageBuffer());
super(null, id);
}
LocalChannel(LocalServerChannel parent, LocalChannel peer) {
super(parent, null, ChannelBufferHolders.messageBuffer());
super(parent, null);
this.peer = peer;
localAddress = parent.localAddress();
remoteAddress = peer.localAddress();

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
@ -51,9 +50,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
private ConnectException connectTimeoutException;
protected AbstractNioChannel(
Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer,
SelectableChannel ch, int defaultInterestOps) {
super(parent, id, outboundBuffer);
Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id);
this.ch = ch;
this.defaultInterestOps = defaultInterestOps;
try {

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
@ -27,9 +26,8 @@ import java.util.Queue;
abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(
Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer,
SelectableChannel ch, int defaultInterestOps) {
super(parent, id, outboundBuffer, ch, defaultInterestOps);
Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id, ch, defaultInterestOps);
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
@ -29,7 +28,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
protected AbstractNioStreamChannel(
Channel parent, Integer id, SelectableChannel ch) {
super(parent, id, ChannelBufferHolders.byteBuffer(), ch, SelectionKey.OP_READ);
super(parent, id, ch, SelectionKey.OP_READ);
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DatagramChannelConfig;
@ -88,7 +87,7 @@ public final class NioDatagramChannel extends AbstractNioMessageChannel implemen
}
public NioDatagramChannel(Integer id, DatagramChannel socket) {
super(null, id, ChannelBufferHolders.messageBuffer(), socket, SelectionKey.OP_READ);
super(null, id, socket, SelectionKey.OP_READ);
config = new NioDatagramChannelConfig(socket);
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
@ -42,8 +41,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
super(null, null, ChannelBufferHolders.discardMessageBuffer(),
newSocket(), SelectionKey.OP_ACCEPT);
super(null, null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(javaChannel().socket());
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.oio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
@ -26,8 +25,8 @@ import java.net.SocketAddress;
abstract class AbstractOioChannel extends AbstractChannel {
protected AbstractOioChannel(Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer) {
super(parent, id, outboundBuffer);
protected AbstractOioChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
@ -25,9 +24,8 @@ import java.util.Queue;
abstract class AbstractOioMessageChannel extends AbstractOioChannel {
protected AbstractOioMessageChannel(
Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer) {
super(parent, id, outboundBuffer);
protected AbstractOioMessageChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelType;
@ -26,7 +25,7 @@ import java.io.IOException;
abstract class AbstractOioStreamChannel extends AbstractOioChannel {
protected AbstractOioStreamChannel(Channel parent, Integer id) {
super(parent, id, ChannelBufferHolders.byteBuffer());
super(parent, id);
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DatagramChannel;
@ -66,7 +65,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
}
public OioDatagramChannel(Integer id, MulticastSocket socket) {
super(null, id, ChannelBufferHolders.messageBuffer());
super(null, id);
boolean success = false;
try {

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.socket.oio;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
@ -60,7 +59,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
}
public OioServerSocketChannel(Integer id, ServerSocket socket) {
super(null, id, ChannelBufferHolders.discardMessageBuffer());
super(null, id);
if (socket == null) {
throw new NullPointerException("socket");
}