diff --git a/all/pom.xml b/all/pom.xml index a85be0c8d4..1f553f0372 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty @@ -117,7 +117,6 @@ maven-clean-plugin - 2.5 clean-first @@ -130,7 +129,6 @@ maven-dependency-plugin - 2.4 unpack-sources @@ -162,7 +160,6 @@ org.codehaus.mojo build-helper-maven-plugin - 1.7 add-source @@ -203,7 +200,6 @@ maven-resources-plugin - 2.5 default-resources @@ -239,7 +235,6 @@ maven-jxr-plugin - 2.2 generate-xref @@ -259,6 +254,13 @@ Netty Source Xref (${project.version}) Netty Source Xref (${project.version}) + + + ${project.groupId} + netty-build + 10 + + maven-javadoc-plugin diff --git a/buffer/pom.xml b/buffer/pom.xml index 17574e422b..f661965a66 100644 --- a/buffer/pom.xml +++ b/buffer/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-buffer diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index 96c8036c55..860925f506 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -578,14 +578,19 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit @Override public void getBytes(int index, byte[] dst, int dstIndex, int length) { - int componentId = toComponentIndex(index); if (index > capacity() - length || dstIndex > dst.length - length) { throw new IndexOutOfBoundsException("Too many bytes to read - Needs " + (index + length) + ", maximum is " + capacity() + " or " + dst.length); } + if (index < 0) { + throw new IndexOutOfBoundsException("index must be >= 0"); + } + if (length == 0) { + return; + } + int i = toComponentIndex(index); - int i = componentId; while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; @@ -601,15 +606,20 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit @Override public void getBytes(int index, ByteBuffer dst) { - int componentId = toComponentIndex(index); int limit = dst.limit(); int length = dst.remaining(); + if (index > capacity() - length) { throw new IndexOutOfBoundsException("Too many bytes to be read - Needs " + (index + length) + ", maximum is " + capacity()); } - - int i = componentId; + if (index < 0) { + throw new IndexOutOfBoundsException("index must be >= 0"); + } + if (length == 0) { + return; + } + int i = toComponentIndex(index); try { while (length > 0) { Component c = components.get(i); @@ -629,14 +639,18 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit @Override public void getBytes(int index, ByteBuf dst, int dstIndex, int length) { - int componentId = toComponentIndex(index); if (index > capacity() - length || dstIndex > dst.capacity() - length) { throw new IndexOutOfBoundsException("Too many bytes to be read - Needs " + (index + length) + " or " + (dstIndex + length) + ", maximum is " + capacity() + " or " + dst.capacity()); } - - int i = componentId; + if (index < 0) { + throw new IndexOutOfBoundsException("index must be >= 0"); + } + if (length == 0) { + return; + } + int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; @@ -670,13 +684,18 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit @Override public void getBytes(int index, OutputStream out, int length) throws IOException { - int componentId = toComponentIndex(index); if (index > capacity() - length) { throw new IndexOutOfBoundsException("Too many bytes to be read - needs " + (index + length) + ", maximum of " + capacity()); } + if (index < 0) { + throw new IndexOutOfBoundsException("index must be >= 0"); + } + if (length == 0) { + return; + } - int i = componentId; + int i = toComponentIndex(index); while (length > 0) { Component c = components.get(i); ByteBuf s = c.buf; @@ -1031,11 +1050,17 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit @Override public ByteBuffer[] nioBuffers(int index, int length) { - int componentId = toComponentIndex(index); if (index + length > capacity()) { throw new IndexOutOfBoundsException("Too many bytes to convert - Needs" + (index + length) + ", maximum is " + capacity()); } + if (index < 0) { + throw new IndexOutOfBoundsException("index must be >= 0"); + } + if (length == 0) { + return new ByteBuffer[0]; + } + int componentId = toComponentIndex(index); List buffers = new ArrayList(components.size()); diff --git a/buffer/src/test/java/io/netty/buffer/AbstractCompositeChannelBufferTest.java b/buffer/src/test/java/io/netty/buffer/AbstractCompositeChannelBufferTest.java index 2900336c25..b73513e358 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractCompositeChannelBufferTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractCompositeChannelBufferTest.java @@ -410,4 +410,11 @@ public abstract class AbstractCompositeChannelBufferTest extends wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 6, 7, 8, 5, 9, 10, 11 }, 6, 5).order(order)); assertFalse(ByteBufUtil.equals(a, b)); } + + @Test + public void testEmptyBuffer() { + ByteBuf b = wrappedBuffer(new byte[] {1, 2}, new byte[] {3, 4}); + b.readBytes(new byte[4]); + b.readBytes(new byte[0]); + } } diff --git a/codec-http/pom.xml b/codec-http/pom.xml index 7c369d4d0d..8e9d245872 100644 --- a/codec-http/pom.xml +++ b/codec-http/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-codec-http diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunk.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunk.java index 96705bf4ac..329c203211 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunk.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunk.java @@ -20,7 +20,7 @@ import io.netty.buffer.ByteBuf; /** * The default {@link HttpChunk} implementation. */ -public class DefaultHttpChunk implements HttpChunk { +public class DefaultHttpChunk extends DefaultHttpObject implements HttpChunk { private ByteBuf content; private boolean last; @@ -51,4 +51,24 @@ public class DefaultHttpChunk implements HttpChunk { public boolean isLast() { return last; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(getClass().getSimpleName()); + + final boolean last = isLast(); + buf.append("(last: "); + buf.append(last); + if (!last) { + buf.append(", size: "); + buf.append(getContent().readableBytes()); + } + + buf.append(", decodeResult: "); + buf.append(getDecoderResult()); + buf.append(')'); + + return buf.toString(); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunkTrailer.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunkTrailer.java index 6cb31aa0bb..29b4cdfced 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunkTrailer.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpChunkTrailer.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.internal.StringUtil; import java.util.List; import java.util.Map; @@ -25,7 +26,7 @@ import java.util.Set; /** * The default {@link HttpChunkTrailer} implementation. */ -public class DefaultHttpChunkTrailer implements HttpChunkTrailer { +public class DefaultHttpChunkTrailer extends DefaultHttpObject implements HttpChunkTrailer { private final HttpHeaders headers = new HttpHeaders() { @Override @@ -104,4 +105,37 @@ public class DefaultHttpChunkTrailer implements HttpChunkTrailer { public void setContent(ByteBuf content) { throw new IllegalStateException("read-only"); } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(getClass().getSimpleName()); + + final boolean last = isLast(); + buf.append("(last: "); + buf.append(last); + if (!last) { + buf.append(", size: "); + buf.append(getContent().readableBytes()); + } + + buf.append(", decodeResult: "); + buf.append(getDecoderResult()); + buf.append(')'); + buf.append(StringUtil.NEWLINE); + appendHeaders(buf); + + // Remove the last newline. + buf.setLength(buf.length() - StringUtil.NEWLINE.length()); + return buf.toString(); + } + + private void appendHeaders(StringBuilder buf) { + for (Map.Entry e: getHeaders()) { + buf.append(e.getKey()); + buf.append(": "); + buf.append(e.getValue()); + buf.append(StringUtil.NEWLINE); + } + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java index a0697aa12f..09d79e0a3a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpMessage.java @@ -26,7 +26,7 @@ import java.util.Set; /** * The default {@link HttpMessage} implementation. */ -public class DefaultHttpMessage implements HttpMessage { +public class DefaultHttpMessage extends DefaultHttpObject implements HttpMessage { private final HttpHeaders headers = new HttpHeaders(); private HttpVersion version; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java new file mode 100644 index 0000000000..232eeecea1 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java @@ -0,0 +1,40 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http; + +import io.netty.handler.codec.DecoderResult; + +public class DefaultHttpObject implements HttpObject { + + private DecoderResult decodeResult = DecoderResult.SUCCESS; + + protected DefaultHttpObject() { + // Disallow direct instantiation + } + + @Override + public DecoderResult getDecoderResult() { + return decodeResult; + } + + @Override + public void setDecoderResult(DecoderResult result) { + if (result == null) { + throw new NullPointerException("result"); + } + decodeResult = result; + } +} diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java index afadec21bd..134be89901 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java @@ -70,6 +70,8 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques buf.append(getClass().getSimpleName()); buf.append("(transferEncoding: "); buf.append(getTransferEncoding()); + buf.append(", decodeResult: "); + buf.append(getDecoderResult()); buf.append(')'); buf.append(StringUtil.NEWLINE); buf.append(getMethod().toString()); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java index 440b34fed0..9872868ae1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java @@ -54,6 +54,8 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo buf.append(getClass().getSimpleName()); buf.append("(transferEncoding: "); buf.append(getTransferEncoding()); + buf.append(", decodeResult: "); + buf.append(getDecoderResult()); buf.append(')'); buf.append(StringUtil.NEWLINE); buf.append(getProtocolVersion().getText()); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunk.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunk.java index 4157fc91ec..dbf5e3917b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunk.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunk.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.DecoderResult; import java.util.Collections; import java.util.List; @@ -33,7 +34,7 @@ import java.util.Set; * {@link ChannelPipeline}. * @apiviz.landmark */ -public interface HttpChunk { +public interface HttpChunk extends HttpObject { /** * The 'end of content' marker in chunked encoding. @@ -103,6 +104,16 @@ public interface HttpChunk { public void setHeader(String name, Iterable values) { throw new IllegalStateException("read-only"); } + + @Override + public DecoderResult getDecoderResult() { + return DecoderResult.SUCCESS; + } + + @Override + public void setDecoderResult(DecoderResult result) { + throw new IllegalStateException("read-only"); + } }; /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java index e175ffcdb9..f32963f308 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java @@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; import io.netty.util.CharsetUtil; @@ -46,7 +47,7 @@ import java.util.Map.Entry; * @apiviz.landmark * @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - filters out */ -public class HttpChunkAggregator extends MessageToMessageDecoder { +public class HttpChunkAggregator extends MessageToMessageDecoder { public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; private static final ByteBuf CONTINUE = Unpooled.copiedBuffer( "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII); @@ -66,6 +67,8 @@ public class HttpChunkAggregator extends MessageToMessageDecoder[] { HttpMessage.class }, + new Class[] { HttpObject.class }); } @Override @@ -74,11 +72,6 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec { * Creates a new instance. */ protected HttpMessageEncoder() { - } - - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof HttpMessage || msg instanceof HttpChunk; + super(HttpObject.class); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java new file mode 100644 index 0000000000..5261825564 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java @@ -0,0 +1,23 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http; + +import io.netty.handler.codec.DecoderResult; + +public interface HttpObject { + DecoderResult getDecoderResult(); + void setDecoderResult(DecoderResult result); +} diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java index 650e34dfcc..8f8c5962ab 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java @@ -76,6 +76,11 @@ public class HttpRequestDecoder extends HttpMessageDecoder { HttpVersion.valueOf(initialLine[2]), HttpMethod.valueOf(initialLine[0]), initialLine[1]); } + @Override + protected HttpMessage createInvalidMessage() { + return new DefaultHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "/bad-request"); + } + @Override protected boolean isDecodingRequest() { return true; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java index 9393217119..bf78b0e1bb 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java @@ -83,6 +83,8 @@ import io.netty.handler.codec.TooLongFrameException; */ public class HttpResponseDecoder extends HttpMessageDecoder { + private static final HttpResponseStatus UNKNOWN_STATUS = new HttpResponseStatus(999, "Unknown"); + /** * Creates a new instance with the default * {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and @@ -106,6 +108,11 @@ public class HttpResponseDecoder extends HttpMessageDecoder { new HttpResponseStatus(Integer.valueOf(initialLine[1]), initialLine[2])); } + @Override + protected HttpMessage createInvalidMessage() { + return new DefaultHttpResponse(HttpVersion.HTTP_1_0, UNKNOWN_STATUS); + } + @Override protected boolean isDecodingRequest() { return false; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java index 81d5cd1686..431d40c372 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java @@ -32,9 +32,8 @@ import io.netty.handler.codec.MessageToByteEncoder; @Sharable public class WebSocket00FrameEncoder extends MessageToByteEncoder { - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof WebSocketFrame; + public WebSocket00FrameEncoder() { + super(WebSocketFrame.class); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java index 1d77385229..937035b447 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameEncoder.java @@ -90,17 +90,14 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder { * Creates a new instance with the specified parameters. */ public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { + super(SpdyDataFrame.class, SpdyControlFrame.class); + if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unknown version: " + version); @@ -74,11 +76,6 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { }); } - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame; - } - @Override public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (msg instanceof SpdyDataFrame) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java index be61fc7ee1..d72aaf6cce 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java @@ -52,6 +52,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder SpdyConstants.SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unsupported version: " + version); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java index c413c09cbf..afa4cf1c1c 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http.HttpChunk; import io.netty.handler.codec.http.HttpChunkTrailer; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; @@ -129,6 +130,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { * @param version the protocol version */ public SpdyHttpEncoder(int version) { + super(HttpObject.class); + if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unsupported version: " + version); @@ -136,13 +139,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { spdyVersion = version; } - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof HttpRequest || - msg instanceof HttpResponse || - msg instanceof HttpChunk; - } - @Override public Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 145c987601..57d8d969f2 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -103,19 +103,7 @@ public class SpdySessionHandler break; } - if (msg instanceof SpdyDataFrame || - msg instanceof SpdySynStreamFrame || - msg instanceof SpdySynReplyFrame || - msg instanceof SpdyRstStreamFrame || - msg instanceof SpdySettingsFrame || - msg instanceof SpdyPingFrame || - msg instanceof SpdyGoAwayFrame || - msg instanceof SpdyHeadersFrame || - msg instanceof SpdyWindowUpdateFrame) { - handleInboundMessage(ctx, msg); - } else { - ctx.nextInboundMessageBuffer().add(msg); - } + handleInboundMessage(ctx, msg); } ctx.fireInboundBufferUpdated(); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java new file mode 100644 index 0000000000..3d67783227 --- /dev/null +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedByteChannel; +import io.netty.handler.codec.DecoderResult; +import io.netty.util.CharsetUtil; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +public class HttpInvalidMessageTest { + + private final Random rnd = new Random(); + + @Test + public void testRequestWithBadInitialLine() throws Exception { + EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpRequestDecoder()); + ch.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0 with extra\r\n", CharsetUtil.UTF_8)); + HttpRequest req = (HttpRequest) ch.readInbound(); + DecoderResult dr = req.getDecoderResult(); + Assert.assertFalse(dr.isSuccess()); + Assert.assertFalse(dr.isPartialFailure()); + ensureInboundTrafficDiscarded(ch); + } + + @Test + public void testRequestWithBadHeader() throws Exception { + EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpRequestDecoder()); + ch.writeInbound(Unpooled.copiedBuffer("GET /maybe-something HTTP/1.0\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("Good_Name: Good Value\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8)); + HttpRequest req = (HttpRequest) ch.readInbound(); + DecoderResult dr = req.getDecoderResult(); + Assert.assertFalse(dr.isSuccess()); + Assert.assertTrue(dr.isPartialFailure()); + Assert.assertEquals("Good Value", req.getHeader("Good_Name")); + Assert.assertEquals("/maybe-something", req.getUri()); + ensureInboundTrafficDiscarded(ch); + } + + @Test + public void testResponseWithBadInitialLine() throws Exception { + EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpResponseDecoder()); + ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 BAD_CODE Bad Server\r\n", CharsetUtil.UTF_8)); + HttpResponse res = (HttpResponse) ch.readInbound(); + DecoderResult dr = res.getDecoderResult(); + Assert.assertFalse(dr.isSuccess()); + Assert.assertFalse(dr.isPartialFailure()); + ensureInboundTrafficDiscarded(ch); + } + + @Test + public void testResponseWithBadHeader() throws Exception { + EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpResponseDecoder()); + ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 200 Maybe OK\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("Good_Name: Good Value\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8)); + HttpResponse res = (HttpResponse) ch.readInbound(); + DecoderResult dr = res.getDecoderResult(); + Assert.assertFalse(dr.isSuccess()); + Assert.assertTrue(dr.isPartialFailure()); + Assert.assertEquals("Maybe OK", res.getStatus().getReasonPhrase()); + Assert.assertEquals("Good Value", res.getHeader("Good_Name")); + ensureInboundTrafficDiscarded(ch); + } + + @Test + public void testBadChunk() throws Exception { + EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpRequestDecoder()); + ch.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("Transfer-Encoding: chunked\r\n\r\n", CharsetUtil.UTF_8)); + ch.writeInbound(Unpooled.copiedBuffer("BAD_LENGTH\r\n", CharsetUtil.UTF_8)); + + HttpRequest req = (HttpRequest) ch.readInbound(); + Assert.assertTrue(req.getDecoderResult().isSuccess()); + + HttpChunk chunk = (HttpChunk) ch.readInbound(); + DecoderResult dr = chunk.getDecoderResult(); + Assert.assertFalse(dr.isSuccess()); + Assert.assertFalse(dr.isPartialFailure()); + ensureInboundTrafficDiscarded(ch); + } + + private void ensureInboundTrafficDiscarded(EmbeddedByteChannel ch) { + // Generate a lot of random traffic to ensure that it's discarded silently. + byte[] data = new byte[1048576]; + rnd.nextBytes(data); + + ByteBuf buf = Unpooled.wrappedBuffer(data); + for (int i = 0; i < 4096; i ++) { + buf.setIndex(0, data.length); + ch.writeInbound(buf); + ch.checkException(); + Assert.assertNull(ch.readInbound()); + } + } +} diff --git a/codec/pom.xml b/codec/pom.xml index e795254a0a..a9e9074fea 100644 --- a/codec/pom.xml +++ b/codec/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-codec diff --git a/codec/src/main/java/io/netty/handler/codec/CodecUtil.java b/codec/src/main/java/io/netty/handler/codec/CodecUtil.java index 43d8c76723..30966c1643 100644 --- a/codec/src/main/java/io/netty/handler/codec/CodecUtil.java +++ b/codec/src/main/java/io/netty/handler/codec/CodecUtil.java @@ -82,6 +82,41 @@ final class CodecUtil { msg.getClass().getSimpleName())); } + private static final Class[] EMPTY_TYPES = new Class[0]; + + static Class[] acceptedMessageTypes(Class[] acceptedMsgTypes) { + if (acceptedMsgTypes == null) { + return EMPTY_TYPES; + } + + int numElem = 0; + for (Class c: acceptedMsgTypes) { + if (c == null) { + break; + } + numElem ++; + } + + Class[] newAllowedMsgTypes = new Class[numElem]; + System.arraycopy(acceptedMsgTypes, 0, newAllowedMsgTypes, 0, numElem); + + return newAllowedMsgTypes; + } + + static boolean acceptMessage(Class[] acceptedMsgTypes, Object msg) { + if (acceptedMsgTypes.length == 0) { + return true; + } + + for (Class c: acceptedMsgTypes) { + if (c.isInstance(msg)) { + return true; + } + } + + return false; + } + private CodecUtil() { // Unused } diff --git a/codec/src/main/java/io/netty/handler/codec/DecoderResult.java b/codec/src/main/java/io/netty/handler/codec/DecoderResult.java new file mode 100644 index 0000000000..e621d8baf1 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/DecoderResult.java @@ -0,0 +1,85 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +public class DecoderResult { + + public static final DecoderResult SUCCESS = new DecoderResult(false, null); + + public static DecoderResult failure(Throwable cause) { + if (cause == null) { + throw new NullPointerException("cause"); + } + return new DecoderResult(false, cause); + } + + public static DecoderResult partialFailure(Throwable cause) { + if (cause == null) { + throw new NullPointerException("cause"); + } + return new DecoderResult(true, cause); + } + + private final boolean partial; + private final Throwable cause; + + protected DecoderResult(boolean partial, Throwable cause) { + if (partial && cause == null) { + throw new IllegalArgumentException("successful result cannot be partial."); + } + + this.partial = partial; + this.cause = cause; + } + + public boolean isSuccess() { + return cause == null; + } + + public boolean isFailure() { + return cause != null; + } + + public boolean isCompleteFailure() { + return cause != null && !partial; + } + + public boolean isPartialFailure() { + return partial; + } + + public Throwable cause() { + return cause; + } + + @Override + public String toString() { + if (isSuccess()) { + return "success"; + } + + String cause = cause().toString(); + StringBuilder buf = new StringBuilder(cause.length() + 17); + if (isPartialFailure()) { + buf.append("partial_"); + } + buf.append("failure("); + buf.append(cause); + buf.append(')'); + + return buf.toString(); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java b/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java index 46763ea5f4..83cceab485 100644 --- a/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java +++ b/codec/src/main/java/io/netty/handler/codec/LengthFieldPrepender.java @@ -83,6 +83,8 @@ public class LengthFieldPrepender extends MessageToByteEncoder { */ public LengthFieldPrepender( int lengthFieldLength, boolean lengthIncludesLengthFieldLength) { + super(ByteBuf.class); + if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) { @@ -95,11 +97,6 @@ public class LengthFieldPrepender extends MessageToByteEncoder { this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength; } - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof ByteBuf; - } - @Override public void encode( ChannelHandlerContext ctx, diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java index 567ac9f0b7..92b0536485 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java @@ -23,6 +23,12 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter; public abstract class MessageToByteEncoder extends ChannelOutboundMessageHandlerAdapter { + private final Class[] acceptedMsgTypes; + + protected MessageToByteEncoder(Class... acceptedMsgTypes) { + this.acceptedMsgTypes = CodecUtil.acceptedMessageTypes(acceptedMsgTypes); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { MessageBuf in = ctx.outboundMessageBuffer(); @@ -61,7 +67,7 @@ public abstract class MessageToByteEncoder extends ChannelOutboundMessageHand * @param msg the message */ public boolean isEncodable(Object msg) throws Exception { - return true; + return CodecUtil.acceptMessage(acceptedMsgTypes, msg); } public abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index 8ff57491bb..d86e6c5cfe 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -53,6 +53,19 @@ public abstract class MessageToMessageCodec[] acceptedInboundMsgTypes; + private final Class[] acceptedOutboundMsgTypes; + + protected MessageToMessageCodec() { + this(null, null); + } + + protected MessageToMessageCodec( + Class[] acceptedInboundMsgTypes, Class[] acceptedOutboundMsgTypes) { + this.acceptedInboundMsgTypes = CodecUtil.acceptedMessageTypes(acceptedInboundMsgTypes); + this.acceptedOutboundMsgTypes = CodecUtil.acceptedMessageTypes(acceptedOutboundMsgTypes); + } + @Override public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { return decoder.newInboundBuffer(ctx); @@ -80,7 +93,7 @@ public abstract class MessageToMessageCodec extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler { + private final Class[] acceptedMsgTypes; + + protected MessageToMessageDecoder(Class... acceptedMsgTypes) { + this.acceptedMsgTypes = CodecUtil.acceptedMessageTypes(acceptedMsgTypes); + } + @Override public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { return Unpooled.messageBuffer(); @@ -77,7 +83,7 @@ public abstract class MessageToMessageDecoder * @param msg the message */ public boolean isDecodable(Object msg) throws Exception { - return true; + return CodecUtil.acceptMessage(acceptedMsgTypes, msg); } public abstract O decode(ChannelHandlerContext ctx, I msg) throws Exception; diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 3b22fc6c3d..cbc0be41ee 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -22,6 +22,12 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter; public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { + private final Class[] acceptedMsgTypes; + + protected MessageToMessageEncoder(Class... acceptedMsgTypes) { + this.acceptedMsgTypes = CodecUtil.acceptedMessageTypes(acceptedMsgTypes); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { MessageBuf in = ctx.outboundMessageBuffer(); @@ -65,7 +71,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessa * @param msg the message */ public boolean isEncodable(Object msg) throws Exception { - return true; + return CodecUtil.acceptMessage(acceptedMsgTypes, msg); } public abstract O encode(ChannelHandlerContext ctx, I msg) throws Exception; diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java index 985bcfdc12..e820483a4d 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java @@ -53,17 +53,14 @@ public class Base64Decoder extends MessageToMessageDecoder { } public Base64Decoder(Base64Dialect dialect) { + super(ByteBuf.class); + if (dialect == null) { throw new NullPointerException("dialect"); } this.dialect = dialect; } - @Override - public boolean isDecodable(Object msg) throws Exception { - return msg instanceof ByteBuf; - } - @Override public ByteBuf decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect); diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java index c90790f745..8d95fc9fba 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java @@ -54,6 +54,8 @@ public class Base64Encoder extends MessageToMessageEncoder { } public Base64Encoder(boolean breakLines, Base64Dialect dialect) { + super(ByteBuf.class); + if (dialect == null) { throw new NullPointerException("dialect"); } @@ -62,11 +64,6 @@ public class Base64Encoder extends MessageToMessageEncoder { this.dialect = dialect; } - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof ByteBuf; - } - @Override public ByteBuf encode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java index 50f00a99b2..300dabe263 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java @@ -49,9 +49,8 @@ import io.netty.handler.codec.MessageToMessageDecoder; */ public class ByteArrayDecoder extends MessageToMessageDecoder { - @Override - public boolean isDecodable(Object msg) throws Exception { - return msg instanceof ByteBuf; + public ByteArrayDecoder() { + super(ByteBuf.class); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java index 93e91567ec..87eb79112a 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java @@ -51,14 +51,13 @@ import io.netty.handler.codec.MessageToMessageEncoder; */ public class ByteArrayEncoder extends MessageToMessageEncoder { - @Override - public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { - return Unpooled.messageBuffer(); + public ByteArrayEncoder() { + super(byte[].class); } @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof byte[]; + public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return Unpooled.messageBuffer(); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java index 77b940e992..3320fc432e 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java @@ -74,6 +74,8 @@ public class ProtobufDecoder extends MessageToMessageDecoder { - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof MessageLite || msg instanceof MessageLite.Builder; + public ProtobufEncoder() { + super(MessageLite.class, MessageLite.Builder.class); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java index 98ecb47113..0a5c89f120 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java @@ -44,11 +44,7 @@ public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder { * the long term. */ public CompatibleObjectEncoder(int resetInterval) { + super(Serializable.class); + if (resetInterval < 0) { throw new IllegalArgumentException( "resetInterval: " + resetInterval); @@ -75,11 +77,6 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder { return new ObjectOutputStream(out); } - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof Serializable; - } - @Override public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { Attribute oosAttr = ctx.attr(OOS); diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java index 9205635990..96e620d622 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java @@ -39,9 +39,8 @@ import java.io.Serializable; public class ObjectEncoder extends MessageToByteEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; - @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof Serializable; + public ObjectEncoder() { + super(Serializable.class); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java index f8528cdac5..6e955f3e12 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java @@ -68,17 +68,14 @@ public class StringDecoder extends MessageToMessageDecoder { * Creates a new instance with the specified character set. */ public StringDecoder(Charset charset) { + super(ByteBuf.class); + if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; } - @Override - public boolean isDecodable(Object msg) throws Exception { - return msg instanceof ByteBuf; - } - @Override public String decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { return msg.toString(charset); diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java index 795c6b46b4..91faa5c029 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java @@ -50,7 +50,7 @@ import java.nio.charset.Charset; * @apiviz.landmark */ @Sharable -public class StringEncoder extends MessageToMessageEncoder { +public class StringEncoder extends MessageToMessageEncoder { // TODO Use CharsetEncoder instead. private final Charset charset; @@ -66,6 +66,8 @@ public class StringEncoder extends MessageToMessageEncoder { * Creates a new instance with the specified character set. */ public StringEncoder(Charset charset) { + super(CharSequence.class); + if (charset == null) { throw new NullPointerException("charset"); } @@ -73,12 +75,7 @@ public class StringEncoder extends MessageToMessageEncoder { } @Override - public boolean isEncodable(Object msg) throws Exception { - return msg instanceof String; - } - - @Override - public ByteBuf encode(ChannelHandlerContext ctx, String msg) throws Exception { + public ByteBuf encode(ChannelHandlerContext ctx, CharSequence msg) throws Exception { return Unpooled.copiedBuffer(msg, charset); } } diff --git a/common/pom.xml b/common/pom.xml index 6e641f43ef..09e8aef32f 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-common diff --git a/example/pom.xml b/example/pom.xml index aeae757e55..db66842354 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-example diff --git a/example/src/main/java/io/netty/example/factorial/NumberEncoder.java b/example/src/main/java/io/netty/example/factorial/NumberEncoder.java index 4fcd80adbc..ebe79f3e9e 100644 --- a/example/src/main/java/io/netty/example/factorial/NumberEncoder.java +++ b/example/src/main/java/io/netty/example/factorial/NumberEncoder.java @@ -28,6 +28,10 @@ import java.math.BigInteger; */ public class NumberEncoder extends MessageToByteEncoder { + public NumberEncoder() { + super(Number.class); + } + @Override public void encode( ChannelHandlerContext ctx, Number msg, ByteBuf out) throws Exception { diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java index 22ab5a6756..75365fd74f 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; -import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpRequest; @@ -104,12 +103,18 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda public void messageReceived( ChannelHandlerContext ctx, HttpRequest request) throws Exception { + if (!request.getDecoderResult().isSuccess()) { + sendError(ctx, BAD_REQUEST); + return; + } + if (request.getMethod() != GET) { sendError(ctx, METHOD_NOT_ALLOWED); return; } - final String path = sanitizeUri(request.getUri()); + final String uri = request.getUri(); + final String path = sanitizeUri(uri); if (path == null) { sendError(ctx, FORBIDDEN); return; @@ -120,6 +125,16 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda sendError(ctx, NOT_FOUND); return; } + + if (file.isDirectory()) { + if (uri.endsWith("/")) { + sendListing(ctx, file); + } else { + sendRedirect(ctx, uri + '/'); + } + return; + } + if (!file.isFile()) { sendError(ctx, FORBIDDEN); return; @@ -172,13 +187,7 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda } @Override - public void exceptionCaught( - ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); - return; - } - + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (ctx.channel().isActive()) { sendError(ctx, INTERNAL_SERVER_ERROR); @@ -197,6 +206,10 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda } } + if (!uri.startsWith("/")) { + return null; + } + // Convert file separators. uri = uri.replace('/', File.separatorChar); @@ -212,6 +225,55 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda return System.getProperty("user.dir") + File.separator + uri; } + private static void sendListing(ChannelHandlerContext ctx, File dir) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK); + response.setHeader(CONTENT_TYPE, "text/html; charset=UTF-8"); + + StringBuilder buf = new StringBuilder(); + + buf.append("\r\n"); + buf.append(""); + buf.append("Listing of: "); + buf.append(dir.getPath()); + buf.append("\r\n"); + + buf.append("

Listing of: "); + buf.append(dir.getPath()); + buf.append("

\r\n"); + + buf.append("
    "); + buf.append("
  • ..
  • \r\n"); + + for (File f: dir.listFiles()) { + if (f.isHidden() || !f.canRead()) { + continue; + } + + String name = f.getName(); + + buf.append("
  • "); + buf.append(name); + buf.append("
  • \r\n"); + } + + buf.append("
\r\n"); + + response.setContent(Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8)); + + // Close the connection as soon as the error message is sent. + ctx.write(response).addListener(ChannelFutureListener.CLOSE); + } + + private static void sendRedirect(ChannelHandlerContext ctx, String newUri) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, FOUND); + response.setHeader(LOCATION, newUri); + + // Close the connection as soon as the error message is sent. + ctx.write(response).addListener(ChannelFutureListener.CLOSE); + } + private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java index 1adad5709d..5ba3073733 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java @@ -25,12 +25,14 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpChunk; import io.netty.handler.codec.http.HttpChunkTrailer; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.QueryStringDecoder; @@ -62,14 +64,19 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter< buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n"); buf.append("===================================\r\n"); - buf.append("VERSION: " + request.getProtocolVersion() + "\r\n"); - buf.append("HOSTNAME: " + getHost(request, "unknown") + "\r\n"); - buf.append("REQUEST_URI: " + request.getUri() + "\r\n\r\n"); + buf.append("VERSION: ").append(request.getProtocolVersion()).append("\r\n"); + buf.append("HOSTNAME: ").append(getHost(request, "unknown")).append("\r\n"); + buf.append("REQUEST_URI: ").append(request.getUri()).append("\r\n\r\n"); - for (Map.Entry h: request.getHeaders()) { - buf.append("HEADER: " + h.getKey() + " = " + h.getValue() + "\r\n"); + List> headers = request.getHeaders(); + if (!headers.isEmpty()) { + for (Map.Entry h: request.getHeaders()) { + String key = h.getKey(); + String value = h.getValue(); + buf.append("HEADER: ").append(key).append(" = ").append(value).append("\r\n"); + } + buf.append("\r\n"); } - buf.append("\r\n"); QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri()); Map> params = queryStringDecoder.getParameters(); @@ -78,7 +85,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter< String key = p.getKey(); List vals = p.getValue(); for (String val : vals) { - buf.append("PARAM: " + key + " = " + val + "\r\n"); + buf.append("PARAM: ").append(key).append(" = ").append(val).append("\r\n"); } } buf.append("\r\n"); @@ -89,9 +96,12 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter< } else { ByteBuf content = request.getContent(); if (content.readable()) { - buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n"); + buf.append("CONTENT: "); + buf.append(content.toString(CharsetUtil.UTF_8)); + buf.append("\r\n"); } - writeResponse(ctx); + appendDecoderResult(buf, request); + writeResponse(ctx, request); } } else { HttpChunk chunk = (HttpChunk) msg; @@ -104,25 +114,46 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter< buf.append("\r\n"); for (String name: trailer.getHeaderNames()) { for (String value: trailer.getHeaders(name)) { - buf.append("TRAILING HEADER: " + name + " = " + value + "\r\n"); + buf.append("TRAILING HEADER: "); + buf.append(name).append(" = ").append(value).append("\r\n"); } } buf.append("\r\n"); } - writeResponse(ctx); + appendDecoderResult(buf, chunk); + writeResponse(ctx, chunk); } else { - buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n"); + buf.append("CHUNK: "); + buf.append(chunk.getContent().toString(CharsetUtil.UTF_8)).append("\r\n"); + appendDecoderResult(buf, chunk); } } } - private void writeResponse(ChannelHandlerContext ctx) { + private static void appendDecoderResult(StringBuilder buf, HttpObject o) { + DecoderResult result = o.getDecoderResult(); + if (result.isSuccess()) { + return; + } + + buf.append(".. WITH A "); + if (result.isPartialFailure()) { + buf.append("PARTIAL "); + } + buf.append("DECODER FAILURE: "); + buf.append(result.cause()); + buf.append("\r\n"); + } + + private void writeResponse(ChannelHandlerContext ctx, HttpObject currentObj) { // Decide whether to close the connection or not. boolean keepAlive = isKeepAlive(request); // Build the response object. - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + HttpResponse response = new DefaultHttpResponse( + HTTP_1_1, currentObj.getDecoderResult().isSuccess()? OK : BAD_REQUEST); + response.setContent(Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8)); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); diff --git a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java index da6eb9e2a0..ea7e1768e2 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java @@ -59,6 +59,12 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-handler diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 67aa3b1675..80277ebb85 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -170,6 +170,9 @@ public class SslHandler private final Queue handshakeFutures = new ArrayDeque(); private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture(); + private volatile long handshakeTimeoutMillis = 10000; + private volatile long closeNotifyTimeoutMillis = 3000; + /** * Creates a new instance. * @@ -227,6 +230,46 @@ public class SslHandler this.startTls = startTls; } + public long getHandshakeTimeoutMillis() { + return handshakeTimeoutMillis; + } + + public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) { + if (unit == null) { + throw new NullPointerException("unit"); + } + + setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout)); + } + + public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) { + if (handshakeTimeoutMillis < 0) { + throw new IllegalArgumentException( + "handshakeTimeoutMillis: " + handshakeTimeoutMillis + " (expected: >= 0)"); + } + this.handshakeTimeoutMillis = handshakeTimeoutMillis; + } + + public long getCloseNotifyTimeoutMillis() { + return handshakeTimeoutMillis; + } + + public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) { + if (unit == null) { + throw new NullPointerException("unit"); + } + + setCloseNotifyTimeoutMillis(unit.toMillis(closeNotifyTimeout)); + } + + public void setCloseNotifyTimeoutMillis(long closeNotifyTimeoutMillis) { + if (closeNotifyTimeoutMillis < 0) { + throw new IllegalArgumentException( + "closeNotifyTimeoutMillis: " + closeNotifyTimeoutMillis + " (expected: >= 0)"); + } + this.closeNotifyTimeoutMillis = closeNotifyTimeoutMillis; + } + /** * Returns the {@link SSLEngine} which is used by this handler. */ @@ -247,23 +290,32 @@ public class SslHandler public ChannelFuture handshake(final ChannelFuture future) { final ChannelHandlerContext ctx = this.ctx; - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - if (future.isDone()) { - return; - } + final ScheduledFuture timeoutFuture; + if (handshakeTimeoutMillis > 0) { + timeoutFuture = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + if (future.isDone()) { + return; + } + + SSLException e = new SSLException("handshake timed out"); + future.setFailure(e); + ctx.fireExceptionCaught(e); + ctx.close(); + } + }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); + } else { + timeoutFuture = null; + } - SSLException e = new SSLException("handshake timed out"); - future.setFailure(e); - ctx.fireExceptionCaught(e); - ctx.close(); - } - }, 10, TimeUnit.SECONDS); // FIXME: Magic value ctx.executor().execute(new Runnable() { @Override public void run() { try { + if (timeoutFuture != null) { + timeoutFuture.cancel(false); + } engine.beginHandshake(); handshakeFutures.add(future); flush(ctx, ctx.newFuture()); @@ -861,7 +913,7 @@ public class SslHandler } } - private static void safeClose( + private void safeClose( final ChannelHandlerContext ctx, ChannelFuture flushFuture, final ChannelFuture closeFuture) { if (!ctx.channel().isActive()) { @@ -869,23 +921,31 @@ public class SslHandler return; } - // Force-close the connection if close_notify is not fully sent in time. - final ScheduledFuture timeoutFuture = ctx.executor().schedule(new Runnable() { - @Override - public void run() { - logger.warn( - ctx.channel() + " last lssssswrite attempt timed out." + - " Force-closing the connection."); - ctx.close(closeFuture); - } - }, 3, TimeUnit.SECONDS); // FIXME: Magic value + final ScheduledFuture timeoutFuture; + if (closeNotifyTimeoutMillis > 0) { + // Force-close the connection if close_notify is not fully sent in time. + timeoutFuture = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + logger.warn( + ctx.channel() + " last lssssswrite attempt timed out." + + " Force-closing the connection."); + ctx.close(closeFuture); + } + }, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS); + } else { + timeoutFuture = null; + } + // Close the connection if close_notify is sent in time. flushFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { - timeoutFuture.cancel(false); + if (timeoutFuture != null) { + timeoutFuture.cancel(false); + } if (ctx.channel().isActive()) { ctx.close(closeFuture); } diff --git a/pom.xml b/pom.xml index 20cbf96dc6..fc90c06f27 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ io.netty netty-parent pom - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT Netty http://netty.io/ @@ -318,7 +318,7 @@ ${project.groupId} netty-build - 9 + 10 @@ -378,10 +378,40 @@ - + + maven-clean-plugin + 2.5 + + + maven-resources-plugin + 2.5 + + + maven-jar-plugin + 2.4 + + + maven-dependency-plugin + 2.4 + + + maven-assembly-plugin + 2.3 + + + maven-jxr-plugin + 2.2 + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + org.eclipse.m2e lifecycle-mapping diff --git a/tarball/pom.xml b/tarball/pom.xml index 880371f9aa..a6bf2c7ee3 100644 --- a/tarball/pom.xml +++ b/tarball/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-tarball @@ -51,7 +51,6 @@ maven-clean-plugin - 2.5 clean-first @@ -87,7 +86,6 @@ maven-deploy-plugin - 2.7 true @@ -102,7 +100,6 @@ maven-dependency-plugin - 2.4 copy-jars @@ -158,7 +155,6 @@ maven-assembly-plugin - 2.3 build-tarball @@ -177,11 +173,9 @@ - - diff --git a/testsuite/pom.xml b/testsuite/pom.xml index bd7b5e5c23..76a92ed472 100644 --- a/testsuite/pom.xml +++ b/testsuite/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-testsuite @@ -45,7 +45,6 @@ maven-deploy-plugin - 2.7 true diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java index cfcd8db9d6..41124a5e1c 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java @@ -72,6 +72,8 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest { } } + Thread.sleep(TIMEOUT / 1000000); + try { long startTime = System.nanoTime(); for (int i = 0; i < NUM_CHANNELS; i ++) { diff --git a/testsuite/src/test/java/io/netty/testsuite/util/TestUtils.java b/testsuite/src/test/java/io/netty/testsuite/util/TestUtils.java index 33c8242c01..38e2f8ac78 100644 --- a/testsuite/src/test/java/io/netty/testsuite/util/TestUtils.java +++ b/testsuite/src/test/java/io/netty/testsuite/util/TestUtils.java @@ -15,13 +15,38 @@ */ package io.netty.testsuite.util; +import io.netty.util.NetworkConstants; + import java.io.IOException; +import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; public class TestUtils { - private static int START_PORT = 20000; - private static int END_PORT = 30000; + private static final int START_PORT = 32768; + private static final int END_PORT = 65536; + private static final int NUM_CANDIDATES = END_PORT - START_PORT; + + private static final List PORTS = new ArrayList(); + private static Iterator PORTS_ITERATOR; + + static { + for (int i = START_PORT; i < END_PORT; i ++) { + PORTS.add(i); + } + Collections.shuffle(PORTS); + } + + private static int nextCandidatePort() { + if (PORTS_ITERATOR == null || !PORTS_ITERATOR.hasNext()) { + PORTS_ITERATOR = PORTS.iterator(); + } + return PORTS_ITERATOR.next(); + } /** * Return a free port which can be used to bind to @@ -29,19 +54,28 @@ public class TestUtils { * @return port */ public static int getFreePort() { - for(int start = START_PORT; start <= END_PORT; start++) { + for (int i = 0; i < NUM_CANDIDATES; i ++) { + int port = nextCandidatePort(); try { - ServerSocket socket = new ServerSocket(start); - socket.setReuseAddress(true); - socket.close(); - START_PORT = start + 1; - return start; + // Ensure it is possible to bind on both wildcard and loopback. + ServerSocket ss; + ss = new ServerSocket(); + ss.setReuseAddress(false); + ss.bind(new InetSocketAddress(port)); + ss.close(); + + ss = new ServerSocket(); + ss.setReuseAddress(false); + ss.bind(new InetSocketAddress(NetworkConstants.LOCALHOST, port)); + ss.close(); + + return port; } catch (IOException e) { // ignore } - } - throw new RuntimeException("Unable to find a free port...."); + + throw new RuntimeException("unable to find a free port"); } private TestUtils() { } diff --git a/transport/pom.xml b/transport/pom.xml index e04bb88f74..e5733faf00 100644 --- a/transport/pom.xml +++ b/transport/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.0.0.Alpha5-SNAPSHOT + 4.0.0.Alpha6-SNAPSHOT netty-transport diff --git a/transport/src/main/java/com/sun/nio/sctp/SctpServerChannel.java b/transport/src/main/java/com/sun/nio/sctp/SctpServerChannel.java index eaf617e5f7..2b47d1daf3 100644 --- a/transport/src/main/java/com/sun/nio/sctp/SctpServerChannel.java +++ b/transport/src/main/java/com/sun/nio/sctp/SctpServerChannel.java @@ -30,13 +30,13 @@ public abstract class SctpServerChannel extends AbstractSelectableChannel { public static SctpServerChannel open() throws IOException { return null; } - + protected SctpServerChannel(SelectorProvider provider) { super(provider); } public abstract T getOption(SctpSocketOption name) throws IOException; - public abstract SctpChannel setOption(SctpSocketOption name, T value) throws IOException; + public abstract SctpServerChannel setOption(SctpSocketOption name, T value) throws IOException; public abstract Set getAllLocalAddresses() throws IOException; diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index c57b4b2afc..c7fce485bc 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -17,17 +17,16 @@ package io.netty.bootstrap; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; - import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.SocketChannel; @@ -232,6 +231,7 @@ public class ServerBootstrap extends AbstractBootstrap { try { childGroup.register(child); } catch (Throwable t) { + child.unsafe().closeForcibly(); logger.warn("Failed to register an accepted channel: " + child, t); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 46096f882d..49d854f82c 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -547,6 +547,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + @Override + public final void closeForcibly() { + try { + doClose(); + } catch (Exception e) { + logger.warn("Failed to close a channel.", e); + } + } + @Override public final void deregister(final ChannelFuture future) { if (eventLoop().inEventLoop()) { diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 788daedce8..ea0d3e6969 100755 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -248,6 +248,12 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu */ void close(ChannelFuture future); + /** + * Closes the {@link Channel} immediately without firing any events. Probably only useful + * when registration attempt failed. + */ + void closeForcibly(); + /** * Deregister the {@link Channel} of the {@link ChannelFuture} from {@link EventLoop} and notify the * {@link ChannelFuture} once the operation was complete. diff --git a/transport/src/main/java/io/netty/channel/ChannelInitializer.java b/transport/src/main/java/io/netty/channel/ChannelInitializer.java index 0ec4da0a17..a347e75eda 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInitializer.java +++ b/transport/src/main/java/io/netty/channel/ChannelInitializer.java @@ -26,6 +26,7 @@ public abstract class ChannelInitializer extends ChannelState public abstract void initChannel(C ch) throws Exception; + @SuppressWarnings("unchecked") @Override public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 9bb9b6f690..b7fd02ed60 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -48,6 +48,7 @@ public class DefaultChannelConfig implements ChannelConfig { return result; } + @SuppressWarnings("unchecked") @Override public boolean setOptions(Map, ?> options) { if (options == null) { @@ -64,6 +65,7 @@ public class DefaultChannelConfig implements ChannelConfig { return setAllOptions; } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == null) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index bf6d4aea66..6231ac7796 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -388,6 +388,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return remove(getContextOrDie(name)).handler(); } + @SuppressWarnings("unchecked") @Override public T remove(Class handlerType) { return (T) remove(getContextOrDie(handlerType)).handler(); @@ -784,6 +785,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } + @SuppressWarnings("unchecked") @Override public T get(Class handlerType) { ChannelHandlerContext ctx = context(handlerType); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index c36db6b588..0b02e1290f 100755 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventExecutor; @@ -33,6 +34,7 @@ import java.nio.channels.AlreadyConnectedException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; +import java.util.Collections; /** * A {@link Channel} for the local transport. @@ -215,16 +217,24 @@ public class LocalChannel extends AbstractChannel { } final LocalChannel peer = this.peer; - assert peer != null; + final ChannelPipeline peerPipeline = peer.pipeline(); + final EventLoop peerLoop = peer.eventLoop(); - buf.drainTo(peer.pipeline().inboundMessageBuffer()); - - peer.eventLoop().execute(new Runnable() { - @Override - public void run() { - peer.pipeline().fireInboundBufferUpdated(); - } - }); + if (peerLoop == eventLoop()) { + buf.drainTo(peerPipeline.inboundMessageBuffer()); + peerPipeline.fireInboundBufferUpdated(); + } else { + final Object[] msgs = buf.toArray(); + buf.clear(); + peerLoop.execute(new Runnable() { + @Override + public void run() { + MessageBuf buf = peerPipeline.inboundMessageBuffer(); + Collections.addAll(buf, msgs); + peerPipeline.fireInboundBufferUpdated(); + } + }); + } } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java index 665196a1b4..f1f9a1c2b5 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java @@ -61,6 +61,7 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement IP_MULTICAST_ADDR, IP_MULTICAST_IF, IP_MULTICAST_TTL, IP_TOS, UDP_RECEIVE_PACKET_SIZE); } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == SO_BROADCAST) { @@ -82,12 +83,10 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement return (T) Boolean.valueOf(isLoopbackModeDisabled()); } if (option == IP_MULTICAST_ADDR) { - @SuppressWarnings("unchecked") T i = (T) getInterface(); return i; } if (option == IP_MULTICAST_IF) { - @SuppressWarnings("unchecked") T i = (T) getNetworkInterface(); return i; } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java index 03ab6d9ad0..fa0dbae987 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java @@ -48,6 +48,7 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == SO_RCVBUF) { diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java index c5d2c44bb5..fb7a87568a 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java @@ -28,7 +28,6 @@ import java.util.Map; import static com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS; import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF; import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF; -import static io.netty.channel.ChannelOption.SCTP_NODELAY; /** * The default {@link SctpServerChannelConfig} implementation for SCTP. @@ -56,6 +55,7 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == ChannelOption.SO_RCVBUF) { diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java index 71ada68669..2d75c4fcf0 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java @@ -49,6 +49,7 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == SO_RCVBUF) { diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java index e900b68fee..9c1521e4a2 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java @@ -51,6 +51,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig ALLOW_HALF_CLOSURE); } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == SO_RCVBUF) { diff --git a/transport/src/main/java/io/netty/channel/socket/SctpNotificationEvent.java b/transport/src/main/java/io/netty/channel/socket/SctpNotificationEvent.java index 3fefa73162..b9a97416f3 100644 --- a/transport/src/main/java/io/netty/channel/socket/SctpNotificationEvent.java +++ b/transport/src/main/java/io/netty/channel/socket/SctpNotificationEvent.java @@ -27,10 +27,17 @@ public final class SctpNotificationEvent { this.attachment = attachment; } + /** + * Return the {@link Notification} + */ public Notification notification() { return notification; } + /** + * Return the attachment of this {@link SctpNotificationEvent}, or + * null if no attachment was provided + */ public Object attachment() { return attachment; } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index ac31001d50..7b7dc8e136 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -68,7 +68,7 @@ abstract class AbstractAioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - if (((AioEventLoop) eventLoop()).parent() != group) { + if (eventLoop().parent() != group) { throw new ChannelException( getClass().getSimpleName() + " must be registered to the " + AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor."); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java index 990e8172b5..d23dae7241 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java @@ -45,6 +45,7 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == SO_RCVBUF) { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index 3be37ea7a2..a03acb25d4 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -57,6 +57,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT, ALLOW_HALF_CLOSURE); } + @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { if (option == SO_RCVBUF) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java index 459fa06a8b..e47802ac62 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; final class SelectorUtil { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SelectorUtil.class); - static final long DEFAULT_SELECT_TIMEOUT = 10; + static final long DEFAULT_SELECT_TIMEOUT = 500; static final long SELECT_TIMEOUT = SystemPropertyUtil.getLong("io.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT); static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java index d2232cadfe..75e65d8056 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java @@ -19,9 +19,9 @@ package io.netty.channel.socket.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelTaskScheduler; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.channel.ChannelTaskScheduler; import java.util.Collections; import java.util.Queue; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java index 586e885cc2..feb1e904f6 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java @@ -52,7 +52,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel private final SctpChannel ch; private final SctpChannelConfig config; - private final NotificationHandler notificationHandler; + private final NotificationHandler notificationHandler; private static SctpChannel openChannel() { try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java index 5f31c9a507..11d0ebe30d 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java @@ -21,9 +21,7 @@ import io.netty.buffer.ChannelBufType; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; -import io.netty.channel.socket.DefaultSctpChannelConfig; import io.netty.channel.socket.DefaultSctpServerChannelConfig; -import io.netty.channel.socket.SctpNotificationHandler; import io.netty.channel.socket.SctpServerChannelConfig; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java new file mode 100644 index 0000000000..87fdfe41ac --- /dev/null +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -0,0 +1,153 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.local; + +import static org.junit.Assert.*; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.MessageBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +public class LocalTransportThreadModelTest2 { + + private static final String LOCAL_CHANNEL = LocalTransportThreadModelTest2.class.getName(); + + static final int messageCountPerRun = 4; + + @Test(timeout = 15000) + public void testSocketReuse() throws InterruptedException { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + LocalHander serverHandler = new LocalHander("SERVER"); + serverBootstrap + .group(new LocalEventLoopGroup(), new LocalEventLoopGroup()) + .channel(LocalServerChannel.class) + .localAddress(new LocalAddress(LOCAL_CHANNEL)) + .childHandler(serverHandler); + + Bootstrap clientBootstrap = new Bootstrap(); + LocalHander clientHandler = new LocalHander("CLIENT"); + clientBootstrap + .group(new LocalEventLoopGroup()) + .channel(LocalChannel.class) + .remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler); + + serverBootstrap.bind().sync(); + + int count = 100; + for (int i = 1; i < count + 1; i ++) { + Channel ch = clientBootstrap.connect().sync().channel(); + + // SPIN until we get what we are looking for. + int target = i * messageCountPerRun; + while (serverHandler.count.get() != target || clientHandler.count.get() != target) { + Thread.sleep(50); + } + close(ch, clientHandler); + } + + assertEquals(count * 2 * messageCountPerRun, serverHandler.count.get() + + clientHandler.count.get()); + } + + public void close(final Channel localChannel, final LocalHander localRegistrationHandler) { + // we want to make sure we actually shutdown IN the event loop + if (localChannel.eventLoop().inEventLoop()) { + MessageBuf outboundMessageBuffer = + localChannel.pipeline().outboundMessageBuffer(); + if (!outboundMessageBuffer.isEmpty()) { + System.err.println("NOT EMPTY TO SEND!"); + } + + // Wait until all messages are flushed before closing the channel. + if (localRegistrationHandler.lastWriteFuture != null) { + localRegistrationHandler.lastWriteFuture.awaitUninterruptibly(); + } + + MessageBuf inboundMessageBuffer = + localChannel.pipeline().inboundMessageBuffer(); + if (!inboundMessageBuffer.isEmpty()) { + // sometimes we close the pipeline before everything on it has been notified/received. + // we want these messages, since they are in our queue. + Iterator iterator = inboundMessageBuffer.iterator(); + while (iterator.hasNext()) { + Object next = iterator.next(); + System.err.println("DEFERRED on close: " + next); + iterator.remove(); + } + } + + localChannel.close(); + return; + } else { + localChannel.eventLoop().execute(new Runnable() { + @Override + public void run() { + close(localChannel, localRegistrationHandler); + } + }); + } + + // Wait until the connection is closed or the connection attempt fails. + localChannel.closeFuture().awaitUninterruptibly(); + + MessageBuf inboundMessageBuffer = localChannel.pipeline().inboundMessageBuffer(); + if (!inboundMessageBuffer.isEmpty()) { + // sometimes we close the pipeline before everything on it has been notified/received. + // we want these messages, since they are in our queue. + Iterator iterator = inboundMessageBuffer.iterator(); + while (iterator.hasNext()) { + Object next = iterator.next(); + System.err.println("DEFERRED on close: " + next); + iterator.remove(); + } + } + } + + @Sharable + class LocalHander extends ChannelInboundMessageHandlerAdapter { + private final String name; + + public volatile ChannelFuture lastWriteFuture = null; + + public AtomicInteger count = new AtomicInteger(0); + + public LocalHander(String name) { + this.name = name; + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + for (int i = 0; i < messageCountPerRun; i ++) { + lastWriteFuture = ctx.channel().write(name + " " + i); + } + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { + count.incrementAndGet(); + } + } +} \ No newline at end of file