Resolved conflicts in SctpData/SctpMessage refactoring

This commit is contained in:
Jestan Nirojan 2012-09-29 02:01:00 +08:00
commit 6904b62c2f
88 changed files with 1151 additions and 254 deletions

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty</artifactId>
@ -117,7 +117,6 @@
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<id>clean-first</id>
@ -130,7 +129,6 @@
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>unpack-sources</id>
@ -162,7 +160,6 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>add-source</id>
@ -203,7 +200,6 @@
<!-- Disable all plugin executions configured by jar packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<id>default-resources</id>
@ -239,7 +235,6 @@
</plugin>
<plugin>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>generate-xref</id>
@ -259,6 +254,13 @@
<docTitle>Netty Source Xref (${project.version})</docTitle>
<windowTitle>Netty Source Xref (${project.version})</windowTitle>
</configuration>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-build</artifactId>
<version>10</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-buffer</artifactId>

View File

@ -578,14 +578,19 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
int componentId = toComponentIndex(index);
if (index > capacity() - length || dstIndex > dst.length - length) {
throw new IndexOutOfBoundsException("Too many bytes to read - Needs "
+ (index + length) + ", maximum is " + capacity() + " or "
+ dst.length);
}
if (index < 0) {
throw new IndexOutOfBoundsException("index must be >= 0");
}
if (length == 0) {
return;
}
int i = toComponentIndex(index);
int i = componentId;
while (length > 0) {
Component c = components.get(i);
ByteBuf s = c.buf;
@ -601,15 +606,20 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public void getBytes(int index, ByteBuffer dst) {
int componentId = toComponentIndex(index);
int limit = dst.limit();
int length = dst.remaining();
if (index > capacity() - length) {
throw new IndexOutOfBoundsException("Too many bytes to be read - Needs "
+ (index + length) + ", maximum is " + capacity());
}
int i = componentId;
if (index < 0) {
throw new IndexOutOfBoundsException("index must be >= 0");
}
if (length == 0) {
return;
}
int i = toComponentIndex(index);
try {
while (length > 0) {
Component c = components.get(i);
@ -629,14 +639,18 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public void getBytes(int index, ByteBuf dst, int dstIndex, int length) {
int componentId = toComponentIndex(index);
if (index > capacity() - length || dstIndex > dst.capacity() - length) {
throw new IndexOutOfBoundsException("Too many bytes to be read - Needs "
+ (index + length) + " or " + (dstIndex + length) + ", maximum is "
+ capacity() + " or " + dst.capacity());
}
int i = componentId;
if (index < 0) {
throw new IndexOutOfBoundsException("index must be >= 0");
}
if (length == 0) {
return;
}
int i = toComponentIndex(index);
while (length > 0) {
Component c = components.get(i);
ByteBuf s = c.buf;
@ -670,13 +684,18 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public void getBytes(int index, OutputStream out, int length)
throws IOException {
int componentId = toComponentIndex(index);
if (index > capacity() - length) {
throw new IndexOutOfBoundsException("Too many bytes to be read - needs "
+ (index + length) + ", maximum of " + capacity());
}
if (index < 0) {
throw new IndexOutOfBoundsException("index must be >= 0");
}
if (length == 0) {
return;
}
int i = componentId;
int i = toComponentIndex(index);
while (length > 0) {
Component c = components.get(i);
ByteBuf s = c.buf;
@ -1031,11 +1050,17 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
int componentId = toComponentIndex(index);
if (index + length > capacity()) {
throw new IndexOutOfBoundsException("Too many bytes to convert - Needs"
+ (index + length) + ", maximum is " + capacity());
}
if (index < 0) {
throw new IndexOutOfBoundsException("index must be >= 0");
}
if (length == 0) {
return new ByteBuffer[0];
}
int componentId = toComponentIndex(index);
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(components.size());

View File

@ -410,4 +410,11 @@ public abstract class AbstractCompositeChannelBufferTest extends
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 6, 7, 8, 5, 9, 10, 11 }, 6, 5).order(order));
assertFalse(ByteBufUtil.equals(a, b));
}
@Test
public void testEmptyBuffer() {
ByteBuf b = wrappedBuffer(new byte[] {1, 2}, new byte[] {3, 4});
b.readBytes(new byte[4]);
b.readBytes(new byte[0]);
}
}

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-http</artifactId>

View File

@ -20,7 +20,7 @@ import io.netty.buffer.ByteBuf;
/**
* The default {@link HttpChunk} implementation.
*/
public class DefaultHttpChunk implements HttpChunk {
public class DefaultHttpChunk extends DefaultHttpObject implements HttpChunk {
private ByteBuf content;
private boolean last;
@ -51,4 +51,24 @@ public class DefaultHttpChunk implements HttpChunk {
public boolean isLast() {
return last;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
final boolean last = isLast();
buf.append("(last: ");
buf.append(last);
if (!last) {
buf.append(", size: ");
buf.append(getContent().readableBytes());
}
buf.append(", decodeResult: ");
buf.append(getDecoderResult());
buf.append(')');
return buf.toString();
}
}

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.StringUtil;
import java.util.List;
import java.util.Map;
@ -25,7 +26,7 @@ import java.util.Set;
/**
* The default {@link HttpChunkTrailer} implementation.
*/
public class DefaultHttpChunkTrailer implements HttpChunkTrailer {
public class DefaultHttpChunkTrailer extends DefaultHttpObject implements HttpChunkTrailer {
private final HttpHeaders headers = new HttpHeaders() {
@Override
@ -104,4 +105,37 @@ public class DefaultHttpChunkTrailer implements HttpChunkTrailer {
public void setContent(ByteBuf content) {
throw new IllegalStateException("read-only");
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
final boolean last = isLast();
buf.append("(last: ");
buf.append(last);
if (!last) {
buf.append(", size: ");
buf.append(getContent().readableBytes());
}
buf.append(", decodeResult: ");
buf.append(getDecoderResult());
buf.append(')');
buf.append(StringUtil.NEWLINE);
appendHeaders(buf);
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
private void appendHeaders(StringBuilder buf) {
for (Map.Entry<String, String> e: getHeaders()) {
buf.append(e.getKey());
buf.append(": ");
buf.append(e.getValue());
buf.append(StringUtil.NEWLINE);
}
}
}

View File

@ -26,7 +26,7 @@ import java.util.Set;
/**
* The default {@link HttpMessage} implementation.
*/
public class DefaultHttpMessage implements HttpMessage {
public class DefaultHttpMessage extends DefaultHttpObject implements HttpMessage {
private final HttpHeaders headers = new HttpHeaders();
private HttpVersion version;

View File

@ -0,0 +1,40 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
import io.netty.handler.codec.DecoderResult;
public class DefaultHttpObject implements HttpObject {
private DecoderResult decodeResult = DecoderResult.SUCCESS;
protected DefaultHttpObject() {
// Disallow direct instantiation
}
@Override
public DecoderResult getDecoderResult() {
return decodeResult;
}
@Override
public void setDecoderResult(DecoderResult result) {
if (result == null) {
throw new NullPointerException("result");
}
decodeResult = result;
}
}

View File

@ -70,6 +70,8 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques
buf.append(getClass().getSimpleName());
buf.append("(transferEncoding: ");
buf.append(getTransferEncoding());
buf.append(", decodeResult: ");
buf.append(getDecoderResult());
buf.append(')');
buf.append(StringUtil.NEWLINE);
buf.append(getMethod().toString());

View File

@ -54,6 +54,8 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
buf.append(getClass().getSimpleName());
buf.append("(transferEncoding: ");
buf.append(getTransferEncoding());
buf.append(", decodeResult: ");
buf.append(getDecoderResult());
buf.append(')');
buf.append(StringUtil.NEWLINE);
buf.append(getProtocolVersion().getText());

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import java.util.Collections;
import java.util.List;
@ -33,7 +34,7 @@ import java.util.Set;
* {@link ChannelPipeline}.
* @apiviz.landmark
*/
public interface HttpChunk {
public interface HttpChunk extends HttpObject {
/**
* The 'end of content' marker in chunked encoding.
@ -103,6 +104,16 @@ public interface HttpChunk {
public void setHeader(String name, Iterable<?> values) {
throw new IllegalStateException("read-only");
}
@Override
public DecoderResult getDecoderResult() {
return DecoderResult.SUCCESS;
}
@Override
public void setDecoderResult(DecoderResult result) {
throw new IllegalStateException("read-only");
}
};
/**

View File

@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil;
@ -46,7 +47,7 @@ import java.util.Map.Entry;
* @apiviz.landmark
* @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - filters out
*/
public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMessage> {
public class HttpChunkAggregator extends MessageToMessageDecoder<HttpObject, HttpMessage> {
public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
private static final ByteBuf CONTINUE = Unpooled.copiedBuffer(
"HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
@ -66,6 +67,8 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
* a {@link TooLongFrameException} will be raised.
*/
public HttpChunkAggregator(int maxContentLength) {
super(HttpObject.class);
if (maxContentLength <= 0) {
throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " +
@ -107,12 +110,7 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override
public HttpMessage decode(ChannelHandlerContext ctx, Object msg) throws Exception {
public HttpMessage decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
HttpMessage currentMessage = this.currentMessage;
if (msg instanceof HttpMessage) {
@ -127,6 +125,12 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
ctx.write(CONTINUE.duplicate());
}
if (!m.getDecoderResult().isSuccess()) {
m.setTransferEncoding(HttpTransferEncoding.SINGLE);
this.currentMessage = null;
return m;
}
switch (m.getTransferEncoding()) {
case SINGLE:
this.currentMessage = null;
@ -167,7 +171,16 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
// Append the content of the chunk
appendToCumulation(chunk.getContent());
if (chunk.isLast()) {
final boolean last;
if (!chunk.getDecoderResult().isSuccess()) {
currentMessage.setDecoderResult(
DecoderResult.partialFailure(chunk.getDecoderResult().cause()));
last = true;
} else {
last = chunk.isLast();
}
if (last) {
this.currentMessage = null;
// Merge trailing headers into the message.

View File

@ -48,11 +48,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
* Creates a new instance.
*/
protected HttpContentDecoder() {
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
super(HttpObject.class);
}
@Override

View File

@ -55,11 +55,9 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
* Creates a new instance.
*/
protected HttpContentEncoder() {
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof HttpMessage;
super(
new Class<?>[] { HttpMessage.class },
new Class<?>[] { HttpObject.class });
}
@Override
@ -74,11 +72,6 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
return msg;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
}
@Override
public Object encode(ChannelHandlerContext ctx, Object msg)
throws Exception {

View File

@ -62,6 +62,34 @@ public class HttpHeaders {
* {@code "Accept-Patch"}
*/
public static final String ACCEPT_PATCH = "Accept-Patch";
/**
* {@code "Access-Control-Allow-Credentials"}
*/
public static final String ACCESS_CONTROL_ALLOW_CREDENTIALS = "Access-Control-Allow-Credentials";
/**
* {@code "Access-Control-Allow-Headers"}
*/
public static final String ACCESS_CONTROL_ALLOW_HEADERS = "Access-Control-Allow-Headers";
/**
* {@code "Access-Control-Allow-Methods"}
*/
public static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
/**
* {@code "Access-Control-Allow-Origin"}
*/
public static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
/**
* {@code "Access-Control-Max-Age"}
*/
public static final String ACCESS_CONTROL_MAX_AGE = "Access-Control-Max-Age";
/**
* {@code "Access-Control-Request-Headers"}
*/
public static final String ACCESS_CONTROL_REQUEST_HEADERS = "Access-Control-Request-Headers";
/**
* {@code "Access-Control-Request-Method"}
*/
public static final String ACCESS_CONTROL_REQUEST_METHOD = "Access-Control-Request-Method";
/**
* {@code "Age"}
*/

View File

@ -34,7 +34,7 @@ import java.util.Set;
* @apiviz.landmark
* @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - is followed by
*/
public interface HttpMessage {
public interface HttpMessage extends HttpObject {
/**
* Returns the value of a header with the specified name. If there are

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -125,7 +126,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
READ_CHUNKED_CONTENT,
READ_CHUNKED_CONTENT_AS_CHUNKS,
READ_CHUNK_DELIMITER,
READ_CHUNK_FOOTER
READ_CHUNK_FOOTER,
BAD_MESSAGE
}
/**
@ -176,7 +178,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
checkpoint();
}
}
case READ_INITIAL: {
case READ_INITIAL: try {
String[] initialLine = splitInitialLine(readLine(buffer, maxInitialLineLength));
if (initialLine.length < 3) {
// Invalid initial line - ignore.
@ -186,8 +188,10 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
message = createMessage(initialLine);
checkpoint(State.READ_HEADER);
} catch (Exception e) {
return invalidMessage(e);
}
case READ_HEADER: {
case READ_HEADER: try {
State nextState = readHeaders(buffer);
checkpoint(nextState);
if (nextState == State.READ_CHUNK_SIZE) {
@ -195,7 +199,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
return message;
} else if (nextState == State.SKIP_CONTROL_CHARS) {
// No content is expected.
return message;
return reset();
} else {
long contentLength = HttpHeaders.getContentLength(message, -1);
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
@ -229,6 +233,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
}
// We return null here, this forces decode to be called again where we will decode the content
return null;
} catch (Exception e) {
return invalidMessage(e);
}
case READ_VARIABLE_LENGTH_CONTENT: {
int toRead = actualReadableBytes();
@ -238,7 +244,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
if (message.getTransferEncoding() != HttpTransferEncoding.STREAMED) {
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
return new Object[] {message, new DefaultHttpChunk(buffer.readBytes(toRead))};
return new Object[] { message, new DefaultHttpChunk(buffer.readBytes(toRead))};
} else {
return new DefaultHttpChunk(buffer.readBytes(toRead));
}
@ -308,7 +314,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
* everything else after this point takes care of reading chunked content. basically, read chunk size,
* read chunk, read and ignore the CRLF and repeat until 0
*/
case READ_CHUNK_SIZE: {
case READ_CHUNK_SIZE: try {
String line = readLine(buffer, maxInitialLineLength);
int chunkSize = getChunkSize(line);
this.chunkSize = chunkSize;
@ -321,6 +327,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
} else {
checkpoint(State.READ_CHUNKED_CONTENT);
}
} catch (Exception e) {
return invalidChunk(e);
}
case READ_CHUNKED_CONTENT: {
assert chunkSize <= Integer.MAX_VALUE;
@ -378,10 +386,12 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
} else if (next == HttpConstants.LF) {
checkpoint(State.READ_CHUNK_SIZE);
return null;
} else {
checkpoint();
}
}
}
case READ_CHUNK_FOOTER: {
case READ_CHUNK_FOOTER: try {
HttpChunkTrailer trailer = readTrailingHeaders(buffer);
if (maxChunkSize == 0) {
// Chunked encoding disabled.
@ -391,6 +401,13 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
// The last chunk, which is empty
return trailer;
}
} catch (Exception e) {
return invalidChunk(e);
}
case BAD_MESSAGE: {
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
return null;
}
default: {
throw new Error("Shouldn't reach here.");
@ -439,6 +456,24 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
return message;
}
private HttpMessage invalidMessage(Exception cause) {
checkpoint(State.BAD_MESSAGE);
if (message != null) {
message.setDecoderResult(DecoderResult.partialFailure(cause));
} else {
message = createInvalidMessage();
message.setDecoderResult(DecoderResult.failure(cause));
}
return message;
}
private HttpChunk invalidChunk(Exception cause) {
checkpoint(State.BAD_MESSAGE);
HttpChunk chunk = new DefaultHttpChunk(Unpooled.EMPTY_BUFFER);
chunk.setDecoderResult(DecoderResult.failure(cause));
return chunk;
}
private static void skipControlCharacters(ByteBuf buffer) {
for (;;) {
char c = (char) buffer.readUnsignedByte();
@ -621,6 +656,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
protected abstract boolean isDecodingRequest();
protected abstract HttpMessage createMessage(String[] initialLine) throws Exception;
protected abstract HttpMessage createInvalidMessage();
private static int getChunkSize(String hex) {
hex = hex.trim();

View File

@ -50,11 +50,7 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
* Creates a new instance.
*/
protected HttpMessageEncoder() {
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof HttpMessage || msg instanceof HttpChunk;
super(HttpObject.class);
}
@Override

View File

@ -0,0 +1,23 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
import io.netty.handler.codec.DecoderResult;
public interface HttpObject {
DecoderResult getDecoderResult();
void setDecoderResult(DecoderResult result);
}

View File

@ -76,6 +76,11 @@ public class HttpRequestDecoder extends HttpMessageDecoder {
HttpVersion.valueOf(initialLine[2]), HttpMethod.valueOf(initialLine[0]), initialLine[1]);
}
@Override
protected HttpMessage createInvalidMessage() {
return new DefaultHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "/bad-request");
}
@Override
protected boolean isDecodingRequest() {
return true;

View File

@ -83,6 +83,8 @@ import io.netty.handler.codec.TooLongFrameException;
*/
public class HttpResponseDecoder extends HttpMessageDecoder {
private static final HttpResponseStatus UNKNOWN_STATUS = new HttpResponseStatus(999, "Unknown");
/**
* Creates a new instance with the default
* {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
@ -106,6 +108,11 @@ public class HttpResponseDecoder extends HttpMessageDecoder {
new HttpResponseStatus(Integer.valueOf(initialLine[1]), initialLine[2]));
}
@Override
protected HttpMessage createInvalidMessage() {
return new DefaultHttpResponse(HttpVersion.HTTP_1_0, UNKNOWN_STATUS);
}
@Override
protected boolean isDecodingRequest() {
return false;

View File

@ -32,9 +32,8 @@ import io.netty.handler.codec.MessageToByteEncoder;
@Sharable
public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame> {
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof WebSocketFrame;
public WebSocket00FrameEncoder() {
super(WebSocketFrame.class);
}
@Override

View File

@ -90,17 +90,14 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame
* false.
*/
public WebSocket08FrameEncoder(boolean maskPayload) {
super(WebSocketFrame.class);
this.maskPayload = maskPayload;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof WebSocketFrame;
}
public void encode(
ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
@Override
public void encode(ChannelHandlerContext ctx,
WebSocketFrame msg, ByteBuf out) throws Exception {
byte[] mask;
ByteBuf data = msg.getBinaryData();

View File

@ -71,6 +71,11 @@ public class RtspRequestDecoder extends RtspMessageDecoder {
RtspMethods.valueOf(initialLine[0]), initialLine[1]);
}
@Override
protected HttpMessage createInvalidMessage() {
return new DefaultHttpRequest(RtspVersions.RTSP_1_0, RtspMethods.OPTIONS, "/bad-request");
}
@Override
protected boolean isDecodingRequest() {
return true;

View File

@ -51,6 +51,8 @@ import io.netty.handler.codec.http.HttpResponseStatus;
*/
public class RtspResponseDecoder extends RtspMessageDecoder {
private static final HttpResponseStatus UNKNOWN_STATUS = new HttpResponseStatus(999, "Unknown");
/**
* Creates a new instance with the default
* {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
@ -74,6 +76,11 @@ public class RtspResponseDecoder extends RtspMessageDecoder {
new HttpResponseStatus(Integer.valueOf(initialLine[1]), initialLine[2]));
}
@Override
protected HttpMessage createInvalidMessage() {
return new DefaultHttpResponse(RtspVersions.RTSP_1_0, UNKNOWN_STATUS);
}
@Override
protected boolean isDecodingRequest() {
return false;

View File

@ -49,6 +49,8 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
* Creates a new instance with the specified parameters.
*/
public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) {
super(SpdyDataFrame.class, SpdyControlFrame.class);
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unknown version: " + version);
@ -74,11 +76,6 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
});
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame;
}
@Override
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (msg instanceof SpdyDataFrame) {

View File

@ -52,6 +52,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<Object, HttpMessage
* a {@link TooLongFrameException} will be raised.
*/
public SpdyHttpDecoder(int version, int maxContentLength) {
super(SpdyDataFrame.class, SpdyControlFrame.class);
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);

View File

@ -22,6 +22,7 @@ import io.netty.handler.codec.http.HttpChunk;
import io.netty.handler.codec.http.HttpChunkTrailer;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
@ -129,6 +130,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object, Object> {
* @param version the protocol version
*/
public SpdyHttpEncoder(int version) {
super(HttpObject.class);
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException(
"unsupported version: " + version);
@ -136,13 +139,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object, Object> {
spdyVersion = version;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof HttpRequest ||
msg instanceof HttpResponse ||
msg instanceof HttpChunk;
}
@Override
public Object encode(ChannelHandlerContext ctx, Object msg) throws Exception {

View File

@ -103,19 +103,7 @@ public class SpdySessionHandler
break;
}
if (msg instanceof SpdyDataFrame ||
msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
msg instanceof SpdyRstStreamFrame ||
msg instanceof SpdySettingsFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyGoAwayFrame ||
msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyWindowUpdateFrame) {
handleInboundMessage(ctx, msg);
} else {
ctx.nextInboundMessageBuffer().add(msg);
}
handleInboundMessage(ctx, msg);
}
ctx.fireInboundBufferUpdated();
}

View File

@ -0,0 +1,117 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.handler.codec.DecoderResult;
import io.netty.util.CharsetUtil;
import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
public class HttpInvalidMessageTest {
private final Random rnd = new Random();
@Test
public void testRequestWithBadInitialLine() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpRequestDecoder());
ch.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0 with extra\r\n", CharsetUtil.UTF_8));
HttpRequest req = (HttpRequest) ch.readInbound();
DecoderResult dr = req.getDecoderResult();
Assert.assertFalse(dr.isSuccess());
Assert.assertFalse(dr.isPartialFailure());
ensureInboundTrafficDiscarded(ch);
}
@Test
public void testRequestWithBadHeader() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpRequestDecoder());
ch.writeInbound(Unpooled.copiedBuffer("GET /maybe-something HTTP/1.0\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("Good_Name: Good Value\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8));
HttpRequest req = (HttpRequest) ch.readInbound();
DecoderResult dr = req.getDecoderResult();
Assert.assertFalse(dr.isSuccess());
Assert.assertTrue(dr.isPartialFailure());
Assert.assertEquals("Good Value", req.getHeader("Good_Name"));
Assert.assertEquals("/maybe-something", req.getUri());
ensureInboundTrafficDiscarded(ch);
}
@Test
public void testResponseWithBadInitialLine() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpResponseDecoder());
ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 BAD_CODE Bad Server\r\n", CharsetUtil.UTF_8));
HttpResponse res = (HttpResponse) ch.readInbound();
DecoderResult dr = res.getDecoderResult();
Assert.assertFalse(dr.isSuccess());
Assert.assertFalse(dr.isPartialFailure());
ensureInboundTrafficDiscarded(ch);
}
@Test
public void testResponseWithBadHeader() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpResponseDecoder());
ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 200 Maybe OK\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("Good_Name: Good Value\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8));
HttpResponse res = (HttpResponse) ch.readInbound();
DecoderResult dr = res.getDecoderResult();
Assert.assertFalse(dr.isSuccess());
Assert.assertTrue(dr.isPartialFailure());
Assert.assertEquals("Maybe OK", res.getStatus().getReasonPhrase());
Assert.assertEquals("Good Value", res.getHeader("Good_Name"));
ensureInboundTrafficDiscarded(ch);
}
@Test
public void testBadChunk() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new HttpRequestDecoder());
ch.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("Transfer-Encoding: chunked\r\n\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("BAD_LENGTH\r\n", CharsetUtil.UTF_8));
HttpRequest req = (HttpRequest) ch.readInbound();
Assert.assertTrue(req.getDecoderResult().isSuccess());
HttpChunk chunk = (HttpChunk) ch.readInbound();
DecoderResult dr = chunk.getDecoderResult();
Assert.assertFalse(dr.isSuccess());
Assert.assertFalse(dr.isPartialFailure());
ensureInboundTrafficDiscarded(ch);
}
private void ensureInboundTrafficDiscarded(EmbeddedByteChannel ch) {
// Generate a lot of random traffic to ensure that it's discarded silently.
byte[] data = new byte[1048576];
rnd.nextBytes(data);
ByteBuf buf = Unpooled.wrappedBuffer(data);
for (int i = 0; i < 4096; i ++) {
buf.setIndex(0, data.length);
ch.writeInbound(buf);
ch.checkException();
Assert.assertNull(ch.readInbound());
}
}
}

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-codec</artifactId>

View File

@ -82,6 +82,41 @@ final class CodecUtil {
msg.getClass().getSimpleName()));
}
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
static Class<?>[] acceptedMessageTypes(Class<?>[] acceptedMsgTypes) {
if (acceptedMsgTypes == null) {
return EMPTY_TYPES;
}
int numElem = 0;
for (Class<?> c: acceptedMsgTypes) {
if (c == null) {
break;
}
numElem ++;
}
Class<?>[] newAllowedMsgTypes = new Class[numElem];
System.arraycopy(acceptedMsgTypes, 0, newAllowedMsgTypes, 0, numElem);
return newAllowedMsgTypes;
}
static boolean acceptMessage(Class<?>[] acceptedMsgTypes, Object msg) {
if (acceptedMsgTypes.length == 0) {
return true;
}
for (Class<?> c: acceptedMsgTypes) {
if (c.isInstance(msg)) {
return true;
}
}
return false;
}
private CodecUtil() {
// Unused
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
public class DecoderResult {
public static final DecoderResult SUCCESS = new DecoderResult(false, null);
public static DecoderResult failure(Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
return new DecoderResult(false, cause);
}
public static DecoderResult partialFailure(Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
return new DecoderResult(true, cause);
}
private final boolean partial;
private final Throwable cause;
protected DecoderResult(boolean partial, Throwable cause) {
if (partial && cause == null) {
throw new IllegalArgumentException("successful result cannot be partial.");
}
this.partial = partial;
this.cause = cause;
}
public boolean isSuccess() {
return cause == null;
}
public boolean isFailure() {
return cause != null;
}
public boolean isCompleteFailure() {
return cause != null && !partial;
}
public boolean isPartialFailure() {
return partial;
}
public Throwable cause() {
return cause;
}
@Override
public String toString() {
if (isSuccess()) {
return "success";
}
String cause = cause().toString();
StringBuilder buf = new StringBuilder(cause.length() + 17);
if (isPartialFailure()) {
buf.append("partial_");
}
buf.append("failure(");
buf.append(cause);
buf.append(')');
return buf.toString();
}
}

View File

@ -83,6 +83,8 @@ public class LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
*/
public LengthFieldPrepender(
int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
super(ByteBuf.class);
if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
lengthFieldLength != 3 && lengthFieldLength != 4 &&
lengthFieldLength != 8) {
@ -95,11 +97,6 @@ public class LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
}
@Override
public void encode(
ChannelHandlerContext ctx,

View File

@ -23,6 +23,12 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private final Class<?>[] acceptedMsgTypes;
protected MessageToByteEncoder(Class<?>... acceptedMsgTypes) {
this.acceptedMsgTypes = CodecUtil.acceptedMessageTypes(acceptedMsgTypes);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
MessageBuf<I> in = ctx.outboundMessageBuffer();
@ -61,7 +67,7 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHand
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
return CodecUtil.acceptMessage(acceptedMsgTypes, msg);
}
public abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

View File

@ -53,6 +53,19 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
}
};
private final Class<?>[] acceptedInboundMsgTypes;
private final Class<?>[] acceptedOutboundMsgTypes;
protected MessageToMessageCodec() {
this(null, null);
}
protected MessageToMessageCodec(
Class<?>[] acceptedInboundMsgTypes, Class<?>[] acceptedOutboundMsgTypes) {
this.acceptedInboundMsgTypes = CodecUtil.acceptedMessageTypes(acceptedInboundMsgTypes);
this.acceptedOutboundMsgTypes = CodecUtil.acceptedMessageTypes(acceptedOutboundMsgTypes);
}
@Override
public MessageBuf<INBOUND_IN> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder.newInboundBuffer(ctx);
@ -80,7 +93,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
* @param msg the message
*/
public boolean isDecodable(Object msg) throws Exception {
return true;
return CodecUtil.acceptMessage(acceptedInboundMsgTypes, msg);
}
/**
@ -89,7 +102,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
return CodecUtil.acceptMessage(acceptedOutboundMsgTypes, msg);
}
public abstract OUTBOUND_OUT encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception;

View File

@ -24,6 +24,12 @@ import io.netty.channel.ChannelInboundMessageHandler;
public abstract class MessageToMessageDecoder<I, O>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {
private final Class<?>[] acceptedMsgTypes;
protected MessageToMessageDecoder(Class<?>... acceptedMsgTypes) {
this.acceptedMsgTypes = CodecUtil.acceptedMessageTypes(acceptedMsgTypes);
}
@Override
public MessageBuf<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
@ -77,7 +83,7 @@ public abstract class MessageToMessageDecoder<I, O>
* @param msg the message
*/
public boolean isDecodable(Object msg) throws Exception {
return true;
return CodecUtil.acceptMessage(acceptedMsgTypes, msg);
}
public abstract O decode(ChannelHandlerContext ctx, I msg) throws Exception;

View File

@ -22,6 +22,12 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessageHandlerAdapter<I> {
private final Class<?>[] acceptedMsgTypes;
protected MessageToMessageEncoder(Class<?>... acceptedMsgTypes) {
this.acceptedMsgTypes = CodecUtil.acceptedMessageTypes(acceptedMsgTypes);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
MessageBuf<I> in = ctx.outboundMessageBuffer();
@ -65,7 +71,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessa
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
return CodecUtil.acceptMessage(acceptedMsgTypes, msg);
}
public abstract O encode(ChannelHandlerContext ctx, I msg) throws Exception;

View File

@ -53,17 +53,14 @@ public class Base64Decoder extends MessageToMessageDecoder<ByteBuf, ByteBuf> {
}
public Base64Decoder(Base64Dialect dialect) {
super(ByteBuf.class);
if (dialect == null) {
throw new NullPointerException("dialect");
}
this.dialect = dialect;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
}
@Override
public ByteBuf decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect);

View File

@ -54,6 +54,8 @@ public class Base64Encoder extends MessageToMessageEncoder<ByteBuf, ByteBuf> {
}
public Base64Encoder(boolean breakLines, Base64Dialect dialect) {
super(ByteBuf.class);
if (dialect == null) {
throw new NullPointerException("dialect");
}
@ -62,11 +64,6 @@ public class Base64Encoder extends MessageToMessageEncoder<ByteBuf, ByteBuf> {
this.dialect = dialect;
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
}
@Override
public ByteBuf encode(ChannelHandlerContext ctx,
ByteBuf msg) throws Exception {

View File

@ -49,9 +49,8 @@ import io.netty.handler.codec.MessageToMessageDecoder;
*/
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf, byte[]> {
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
public ByteArrayDecoder() {
super(ByteBuf.class);
}
@Override

View File

@ -51,14 +51,13 @@ import io.netty.handler.codec.MessageToMessageEncoder;
*/
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[], ByteBuf> {
@Override
public MessageBuf<byte[]> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
public ByteArrayEncoder() {
super(byte[].class);
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof byte[];
public MessageBuf<byte[]> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override

View File

@ -74,6 +74,8 @@ public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf, MessageLit
}
public ProtobufDecoder(MessageLite prototype, ExtensionRegistry extensionRegistry) {
super(ByteBuf.class);
if (prototype == null) {
throw new NullPointerException("prototype");
}
@ -81,11 +83,6 @@ public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf, MessageLit
this.extensionRegistry = extensionRegistry;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
}
@Override
public MessageLite decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
if (msg.hasArray()) {

View File

@ -59,9 +59,8 @@ import com.google.protobuf.MessageLite;
@Sharable
public class ProtobufEncoder extends MessageToMessageEncoder<Object, ByteBuf> {
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof MessageLite || msg instanceof MessageLite.Builder;
public ProtobufEncoder() {
super(MessageLite.class, MessageLite.Builder.class);
}
@Override

View File

@ -44,11 +44,7 @@ public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<B
* Creates a new instance.
*/
public ProtobufVarint32LengthFieldPrepender() {
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
super(ByteBuf.class);
}
@Override

View File

@ -59,6 +59,8 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
* the long term.
*/
public CompatibleObjectEncoder(int resetInterval) {
super(Serializable.class);
if (resetInterval < 0) {
throw new IllegalArgumentException(
"resetInterval: " + resetInterval);
@ -75,11 +77,6 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
return new ObjectOutputStream(out);
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof Serializable;
}
@Override
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS);

View File

@ -39,9 +39,8 @@ import java.io.Serializable;
public class ObjectEncoder extends MessageToByteEncoder<Object> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof Serializable;
public ObjectEncoder() {
super(Serializable.class);
}
@Override

View File

@ -68,17 +68,14 @@ public class StringDecoder extends MessageToMessageDecoder<ByteBuf, String> {
* Creates a new instance with the specified character set.
*/
public StringDecoder(Charset charset) {
super(ByteBuf.class);
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
public boolean isDecodable(Object msg) throws Exception {
return msg instanceof ByteBuf;
}
@Override
public String decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
return msg.toString(charset);

View File

@ -50,7 +50,7 @@ import java.nio.charset.Charset;
* @apiviz.landmark
*/
@Sharable
public class StringEncoder extends MessageToMessageEncoder<String, ByteBuf> {
public class StringEncoder extends MessageToMessageEncoder<CharSequence, ByteBuf> {
// TODO Use CharsetEncoder instead.
private final Charset charset;
@ -66,6 +66,8 @@ public class StringEncoder extends MessageToMessageEncoder<String, ByteBuf> {
* Creates a new instance with the specified character set.
*/
public StringEncoder(Charset charset) {
super(CharSequence.class);
if (charset == null) {
throw new NullPointerException("charset");
}
@ -73,12 +75,7 @@ public class StringEncoder extends MessageToMessageEncoder<String, ByteBuf> {
}
@Override
public boolean isEncodable(Object msg) throws Exception {
return msg instanceof String;
}
@Override
public ByteBuf encode(ChannelHandlerContext ctx, String msg) throws Exception {
public ByteBuf encode(ChannelHandlerContext ctx, CharSequence msg) throws Exception {
return Unpooled.copiedBuffer(msg, charset);
}
}

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-common</artifactId>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-example</artifactId>

View File

@ -28,6 +28,10 @@ import java.math.BigInteger;
*/
public class NumberEncoder extends MessageToByteEncoder<Number> {
public NumberEncoder() {
super(Number.class);
}
@Override
public void encode(
ChannelHandlerContext ctx, Number msg, ByteBuf out) throws Exception {

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
@ -104,12 +103,18 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda
public void messageReceived(
ChannelHandlerContext ctx, HttpRequest request) throws Exception {
if (!request.getDecoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
final String path = sanitizeUri(request.getUri());
final String uri = request.getUri();
final String path = sanitizeUri(uri);
if (path == null) {
sendError(ctx, FORBIDDEN);
return;
@ -120,6 +125,16 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda
sendError(ctx, NOT_FOUND);
return;
}
if (file.isDirectory()) {
if (uri.endsWith("/")) {
sendListing(ctx, file);
} else {
sendRedirect(ctx, uri + '/');
}
return;
}
if (!file.isFile()) {
sendError(ctx, FORBIDDEN);
return;
@ -172,13 +187,7 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
@ -197,6 +206,10 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda
}
}
if (!uri.startsWith("/")) {
return null;
}
// Convert file separators.
uri = uri.replace('/', File.separatorChar);
@ -212,6 +225,55 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda
return System.getProperty("user.dir") + File.separator + uri;
}
private static void sendListing(ChannelHandlerContext ctx, File dir) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
response.setHeader(CONTENT_TYPE, "text/html; charset=UTF-8");
StringBuilder buf = new StringBuilder();
buf.append("<!DOCTYPE html>\r\n");
buf.append("<html><head><title>");
buf.append("Listing of: ");
buf.append(dir.getPath());
buf.append("</title></head><body>\r\n");
buf.append("<h3>Listing of: ");
buf.append(dir.getPath());
buf.append("</h3>\r\n");
buf.append("<ul>");
buf.append("<li><a href=\"../\">..</a></li>\r\n");
for (File f: dir.listFiles()) {
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
buf.append("<li><a href=\"");
buf.append(name);
buf.append("\">");
buf.append(name);
buf.append("</a></li>\r\n");
}
buf.append("</ul></body></html>\r\n");
response.setContent(Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, FOUND);
response.setHeader(LOCATION, newUri);
// Close the connection as soon as the error message is sent.
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");

View File

@ -25,12 +25,14 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.CookieDecoder;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunk;
import io.netty.handler.codec.http.HttpChunkTrailer;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.QueryStringDecoder;
@ -62,14 +64,19 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n");
buf.append("===================================\r\n");
buf.append("VERSION: " + request.getProtocolVersion() + "\r\n");
buf.append("HOSTNAME: " + getHost(request, "unknown") + "\r\n");
buf.append("REQUEST_URI: " + request.getUri() + "\r\n\r\n");
buf.append("VERSION: ").append(request.getProtocolVersion()).append("\r\n");
buf.append("HOSTNAME: ").append(getHost(request, "unknown")).append("\r\n");
buf.append("REQUEST_URI: ").append(request.getUri()).append("\r\n\r\n");
for (Map.Entry<String, String> h: request.getHeaders()) {
buf.append("HEADER: " + h.getKey() + " = " + h.getValue() + "\r\n");
List<Map.Entry<String, String>> headers = request.getHeaders();
if (!headers.isEmpty()) {
for (Map.Entry<String, String> h: request.getHeaders()) {
String key = h.getKey();
String value = h.getValue();
buf.append("HEADER: ").append(key).append(" = ").append(value).append("\r\n");
}
buf.append("\r\n");
}
buf.append("\r\n");
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri());
Map<String, List<String>> params = queryStringDecoder.getParameters();
@ -78,7 +85,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
String key = p.getKey();
List<String> vals = p.getValue();
for (String val : vals) {
buf.append("PARAM: " + key + " = " + val + "\r\n");
buf.append("PARAM: ").append(key).append(" = ").append(val).append("\r\n");
}
}
buf.append("\r\n");
@ -89,9 +96,12 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
} else {
ByteBuf content = request.getContent();
if (content.readable()) {
buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n");
buf.append("CONTENT: ");
buf.append(content.toString(CharsetUtil.UTF_8));
buf.append("\r\n");
}
writeResponse(ctx);
appendDecoderResult(buf, request);
writeResponse(ctx, request);
}
} else {
HttpChunk chunk = (HttpChunk) msg;
@ -104,25 +114,46 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
buf.append("\r\n");
for (String name: trailer.getHeaderNames()) {
for (String value: trailer.getHeaders(name)) {
buf.append("TRAILING HEADER: " + name + " = " + value + "\r\n");
buf.append("TRAILING HEADER: ");
buf.append(name).append(" = ").append(value).append("\r\n");
}
}
buf.append("\r\n");
}
writeResponse(ctx);
appendDecoderResult(buf, chunk);
writeResponse(ctx, chunk);
} else {
buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n");
buf.append("CHUNK: ");
buf.append(chunk.getContent().toString(CharsetUtil.UTF_8)).append("\r\n");
appendDecoderResult(buf, chunk);
}
}
}
private void writeResponse(ChannelHandlerContext ctx) {
private static void appendDecoderResult(StringBuilder buf, HttpObject o) {
DecoderResult result = o.getDecoderResult();
if (result.isSuccess()) {
return;
}
buf.append(".. WITH A ");
if (result.isPartialFailure()) {
buf.append("PARTIAL ");
}
buf.append("DECODER FAILURE: ");
buf.append(result.cause());
buf.append("\r\n");
}
private void writeResponse(ChannelHandlerContext ctx, HttpObject currentObj) {
// Decide whether to close the connection or not.
boolean keepAlive = isKeepAlive(request);
// Build the response object.
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
HttpResponse response = new DefaultHttpResponse(
HTTP_1_1, currentObj.getDecoderResult().isSuccess()? OK : BAD_REQUEST);
response.setContent(Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");

View File

@ -59,6 +59,12 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
}
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
// Handle a bad request.
if (!req.getDecoderResult().isSuccess()) {
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
}
// Allow only GET methods.
if (req.getMethod() != GET) {
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));

View File

@ -61,6 +61,12 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
}
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
// Handle a bad request.
if (!req.getDecoderResult().isSuccess()) {
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
}
// Allow only GET methods.
if (req.getMethod() != GET) {
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));

View File

@ -62,6 +62,12 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
}
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
// Handle a bad request.
if (!req.getDecoderResult().isSuccess()) {
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
}
// Allow only GET methods.
if (req.getMethod() != GET) {
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-handler</artifactId>

View File

@ -170,6 +170,9 @@ public class SslHandler
private final Queue<ChannelFuture> handshakeFutures = new ArrayDeque<ChannelFuture>();
private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture();
private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyTimeoutMillis = 3000;
/**
* Creates a new instance.
*
@ -227,6 +230,46 @@ public class SslHandler
this.startTls = startTls;
}
public long getHandshakeTimeoutMillis() {
return handshakeTimeoutMillis;
}
public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
}
public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
if (handshakeTimeoutMillis < 0) {
throw new IllegalArgumentException(
"handshakeTimeoutMillis: " + handshakeTimeoutMillis + " (expected: >= 0)");
}
this.handshakeTimeoutMillis = handshakeTimeoutMillis;
}
public long getCloseNotifyTimeoutMillis() {
return handshakeTimeoutMillis;
}
public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
setCloseNotifyTimeoutMillis(unit.toMillis(closeNotifyTimeout));
}
public void setCloseNotifyTimeoutMillis(long closeNotifyTimeoutMillis) {
if (closeNotifyTimeoutMillis < 0) {
throw new IllegalArgumentException(
"closeNotifyTimeoutMillis: " + closeNotifyTimeoutMillis + " (expected: >= 0)");
}
this.closeNotifyTimeoutMillis = closeNotifyTimeoutMillis;
}
/**
* Returns the {@link SSLEngine} which is used by this handler.
*/
@ -247,23 +290,32 @@ public class SslHandler
public ChannelFuture handshake(final ChannelFuture future) {
final ChannelHandlerContext ctx = this.ctx;
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (future.isDone()) {
return;
}
final ScheduledFuture<?> timeoutFuture;
if (handshakeTimeoutMillis > 0) {
timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (future.isDone()) {
return;
}
SSLException e = new SSLException("handshake timed out");
future.setFailure(e);
ctx.fireExceptionCaught(e);
ctx.close();
}
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
} else {
timeoutFuture = null;
}
SSLException e = new SSLException("handshake timed out");
future.setFailure(e);
ctx.fireExceptionCaught(e);
ctx.close();
}
}, 10, TimeUnit.SECONDS); // FIXME: Magic value
ctx.executor().execute(new Runnable() {
@Override
public void run() {
try {
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
engine.beginHandshake();
handshakeFutures.add(future);
flush(ctx, ctx.newFuture());
@ -861,7 +913,7 @@ public class SslHandler
}
}
private static void safeClose(
private void safeClose(
final ChannelHandlerContext ctx, ChannelFuture flushFuture,
final ChannelFuture closeFuture) {
if (!ctx.channel().isActive()) {
@ -869,23 +921,31 @@ public class SslHandler
return;
}
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(
ctx.channel() + " last lssssswrite attempt timed out." +
" Force-closing the connection.");
ctx.close(closeFuture);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
final ScheduledFuture<?> timeoutFuture;
if (closeNotifyTimeoutMillis > 0) {
// Force-close the connection if close_notify is not fully sent in time.
timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(
ctx.channel() + " last lssssswrite attempt timed out." +
" Force-closing the connection.");
ctx.close(closeFuture);
}
}, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS);
} else {
timeoutFuture = null;
}
// Close the connection if close_notify is sent in time.
flushFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
timeoutFuture.cancel(false);
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
if (ctx.channel().isActive()) {
ctx.close(closeFuture);
}

38
pom.xml
View File

@ -26,7 +26,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<packaging>pom</packaging>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
<name>Netty</name>
<url>http://netty.io/</url>
@ -318,7 +318,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-build</artifactId>
<version>9</version>
<version>10</version>
</dependency>
</dependencies>
</plugin>
@ -378,10 +378,40 @@
</plugin>
</plugins>
<!-- Workaround for the 'M2E plugin execution not covered' problem.
See: http://wiki.eclipse.org/M2E_plugin_execution_not_covered -->
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.2</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
</plugin>
<!-- Workaround for the 'M2E plugin execution not covered' problem.
See: http://wiki.eclipse.org/M2E_plugin_execution_not_covered -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-tarball</artifactId>
@ -51,7 +51,6 @@
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<id>clean-first</id>
@ -87,7 +86,6 @@
<!-- Do not deploy this module -->
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
@ -102,7 +100,6 @@
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>copy-jars</id>
@ -158,7 +155,6 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<id>build-tarball</id>
@ -177,11 +173,9 @@
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-testsuite</artifactId>
@ -45,7 +45,6 @@
<plugins>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>

View File

@ -72,6 +72,8 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest {
}
}
Thread.sleep(TIMEOUT / 1000000);
try {
long startTime = System.nanoTime();
for (int i = 0; i < NUM_CHANNELS; i ++) {

View File

@ -15,13 +15,38 @@
*/
package io.netty.testsuite.util;
import io.netty.util.NetworkConstants;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class TestUtils {
private static int START_PORT = 20000;
private static int END_PORT = 30000;
private static final int START_PORT = 32768;
private static final int END_PORT = 65536;
private static final int NUM_CANDIDATES = END_PORT - START_PORT;
private static final List<Integer> PORTS = new ArrayList<Integer>();
private static Iterator<Integer> PORTS_ITERATOR;
static {
for (int i = START_PORT; i < END_PORT; i ++) {
PORTS.add(i);
}
Collections.shuffle(PORTS);
}
private static int nextCandidatePort() {
if (PORTS_ITERATOR == null || !PORTS_ITERATOR.hasNext()) {
PORTS_ITERATOR = PORTS.iterator();
}
return PORTS_ITERATOR.next();
}
/**
* Return a free port which can be used to bind to
@ -29,19 +54,28 @@ public class TestUtils {
* @return port
*/
public static int getFreePort() {
for(int start = START_PORT; start <= END_PORT; start++) {
for (int i = 0; i < NUM_CANDIDATES; i ++) {
int port = nextCandidatePort();
try {
ServerSocket socket = new ServerSocket(start);
socket.setReuseAddress(true);
socket.close();
START_PORT = start + 1;
return start;
// Ensure it is possible to bind on both wildcard and loopback.
ServerSocket ss;
ss = new ServerSocket();
ss.setReuseAddress(false);
ss.bind(new InetSocketAddress(port));
ss.close();
ss = new ServerSocket();
ss.setReuseAddress(false);
ss.bind(new InetSocketAddress(NetworkConstants.LOCALHOST, port));
ss.close();
return port;
} catch (IOException e) {
// ignore
}
}
throw new RuntimeException("Unable to find a free port....");
throw new RuntimeException("unable to find a free port");
}
private TestUtils() { }

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha5-SNAPSHOT</version>
<version>4.0.0.Alpha6-SNAPSHOT</version>
</parent>
<artifactId>netty-transport</artifactId>

View File

@ -30,13 +30,13 @@ public abstract class SctpServerChannel extends AbstractSelectableChannel {
public static SctpServerChannel open() throws IOException {
return null;
}
protected SctpServerChannel(SelectorProvider provider) {
super(provider);
}
public abstract <T> T getOption(SctpSocketOption<T> name) throws IOException;
public abstract <T> SctpChannel setOption(SctpSocketOption<T> name, T value) throws IOException;
public abstract <T> SctpServerChannel setOption(SctpSocketOption<T> name, T value) throws IOException;
public abstract Set<SocketAddress> getAllLocalAddresses() throws IOException;

View File

@ -17,17 +17,16 @@ package io.netty.bootstrap;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
@ -232,6 +231,7 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap> {
try {
childGroup.register(child);
} catch (Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
}

View File

@ -547,6 +547,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public final void closeForcibly() {
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
@Override
public final void deregister(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {

View File

@ -248,6 +248,12 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
*/
void close(ChannelFuture future);
/**
* Closes the {@link Channel} immediately without firing any events. Probably only useful
* when registration attempt failed.
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelFuture} from {@link EventLoop} and notify the
* {@link ChannelFuture} once the operation was complete.

View File

@ -26,6 +26,7 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelState
public abstract void initChannel(C ch) throws Exception;
@SuppressWarnings("unchecked")
@Override
public final void channelRegistered(ChannelHandlerContext ctx)
throws Exception {

View File

@ -48,6 +48,7 @@ public class DefaultChannelConfig implements ChannelConfig {
return result;
}
@SuppressWarnings("unchecked")
@Override
public boolean setOptions(Map<ChannelOption<?>, ?> options) {
if (options == null) {
@ -64,6 +65,7 @@ public class DefaultChannelConfig implements ChannelConfig {
return setAllOptions;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == null) {

View File

@ -388,6 +388,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return remove(getContextOrDie(name)).handler();
}
@SuppressWarnings("unchecked")
@Override
public <T extends ChannelHandler> T remove(Class<T> handlerType) {
return (T) remove(getContextOrDie(handlerType)).handler();
@ -784,6 +785,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
@SuppressWarnings("unchecked")
@Override
public <T extends ChannelHandler> T get(Class<T> handlerType) {
ChannelHandlerContext ctx = context(handlerType);

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventExecutor;
@ -33,6 +34,7 @@ import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.util.Collections;
/**
* A {@link Channel} for the local transport.
@ -215,16 +217,24 @@ public class LocalChannel extends AbstractChannel {
}
final LocalChannel peer = this.peer;
assert peer != null;
final ChannelPipeline peerPipeline = peer.pipeline();
final EventLoop peerLoop = peer.eventLoop();
buf.drainTo(peer.pipeline().inboundMessageBuffer());
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
peer.pipeline().fireInboundBufferUpdated();
}
});
if (peerLoop == eventLoop()) {
buf.drainTo(peerPipeline.inboundMessageBuffer());
peerPipeline.fireInboundBufferUpdated();
} else {
final Object[] msgs = buf.toArray();
buf.clear();
peerLoop.execute(new Runnable() {
@Override
public void run() {
MessageBuf<Object> buf = peerPipeline.inboundMessageBuffer();
Collections.addAll(buf, msgs);
peerPipeline.fireInboundBufferUpdated();
}
});
}
}
@Override

View File

@ -61,6 +61,7 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
IP_MULTICAST_ADDR, IP_MULTICAST_IF, IP_MULTICAST_TTL, IP_TOS, UDP_RECEIVE_PACKET_SIZE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_BROADCAST) {
@ -82,12 +83,10 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
return (T) Boolean.valueOf(isLoopbackModeDisabled());
}
if (option == IP_MULTICAST_ADDR) {
@SuppressWarnings("unchecked")
T i = (T) getInterface();
return i;
}
if (option == IP_MULTICAST_IF) {
@SuppressWarnings("unchecked")
T i = (T) getNetworkInterface();
return i;
}

View File

@ -48,6 +48,7 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {

View File

@ -28,7 +28,6 @@ import java.util.Map;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF;
import static io.netty.channel.ChannelOption.SCTP_NODELAY;
/**
* The default {@link SctpServerChannelConfig} implementation for SCTP.
@ -56,6 +55,7 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == ChannelOption.SO_RCVBUF) {

View File

@ -49,6 +49,7 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {

View File

@ -51,6 +51,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
ALLOW_HALF_CLOSURE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {

View File

@ -27,10 +27,17 @@ public final class SctpNotificationEvent {
this.attachment = attachment;
}
/**
* Return the {@link Notification}
*/
public Notification notification() {
return notification;
}
/**
* Return the attachment of this {@link SctpNotificationEvent}, or
* <code>null</code> if no attachment was provided
*/
public Object attachment() {
return attachment;
}

View File

@ -68,7 +68,7 @@ abstract class AbstractAioChannel extends AbstractChannel {
@Override
protected Runnable doRegister() throws Exception {
if (((AioEventLoop) eventLoop()).parent() != group) {
if (eventLoop().parent() != group) {
throw new ChannelException(
getClass().getSimpleName() + " must be registered to the " +
AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor.");

View File

@ -45,6 +45,7 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {

View File

@ -57,6 +57,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT, ALLOW_HALF_CLOSURE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {

View File

@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
final class SelectorUtil {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SelectorUtil.class);
static final long DEFAULT_SELECT_TIMEOUT = 10;
static final long DEFAULT_SELECT_TIMEOUT = 500;
static final long SELECT_TIMEOUT =
SystemPropertyUtil.getLong("io.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT);
static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT);

View File

@ -19,9 +19,9 @@ package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelTaskScheduler;
import java.util.Collections;
import java.util.Queue;

View File

@ -52,7 +52,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
private final SctpChannel ch;
private final SctpChannelConfig config;
private final NotificationHandler notificationHandler;
private final NotificationHandler<?> notificationHandler;
private static SctpChannel openChannel() {
try {

View File

@ -21,9 +21,7 @@ import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSctpChannelConfig;
import io.netty.channel.socket.DefaultSctpServerChannelConfig;
import io.netty.channel.socket.SctpNotificationHandler;
import io.netty.channel.socket.SctpServerChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;

View File

@ -0,0 +1,153 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class LocalTransportThreadModelTest2 {
private static final String LOCAL_CHANNEL = LocalTransportThreadModelTest2.class.getName();
static final int messageCountPerRun = 4;
@Test(timeout = 15000)
public void testSocketReuse() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
LocalHander serverHandler = new LocalHander("SERVER");
serverBootstrap
.group(new LocalEventLoopGroup(), new LocalEventLoopGroup())
.channel(LocalServerChannel.class)
.localAddress(new LocalAddress(LOCAL_CHANNEL))
.childHandler(serverHandler);
Bootstrap clientBootstrap = new Bootstrap();
LocalHander clientHandler = new LocalHander("CLIENT");
clientBootstrap
.group(new LocalEventLoopGroup())
.channel(LocalChannel.class)
.remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler);
serverBootstrap.bind().sync();
int count = 100;
for (int i = 1; i < count + 1; i ++) {
Channel ch = clientBootstrap.connect().sync().channel();
// SPIN until we get what we are looking for.
int target = i * messageCountPerRun;
while (serverHandler.count.get() != target || clientHandler.count.get() != target) {
Thread.sleep(50);
}
close(ch, clientHandler);
}
assertEquals(count * 2 * messageCountPerRun, serverHandler.count.get() +
clientHandler.count.get());
}
public void close(final Channel localChannel, final LocalHander localRegistrationHandler) {
// we want to make sure we actually shutdown IN the event loop
if (localChannel.eventLoop().inEventLoop()) {
MessageBuf<Object> outboundMessageBuffer =
localChannel.pipeline().outboundMessageBuffer();
if (!outboundMessageBuffer.isEmpty()) {
System.err.println("NOT EMPTY TO SEND!");
}
// Wait until all messages are flushed before closing the channel.
if (localRegistrationHandler.lastWriteFuture != null) {
localRegistrationHandler.lastWriteFuture.awaitUninterruptibly();
}
MessageBuf<Object> inboundMessageBuffer =
localChannel.pipeline().inboundMessageBuffer();
if (!inboundMessageBuffer.isEmpty()) {
// sometimes we close the pipeline before everything on it has been notified/received.
// we want these messages, since they are in our queue.
Iterator<Object> iterator = inboundMessageBuffer.iterator();
while (iterator.hasNext()) {
Object next = iterator.next();
System.err.println("DEFERRED on close: " + next);
iterator.remove();
}
}
localChannel.close();
return;
} else {
localChannel.eventLoop().execute(new Runnable() {
@Override
public void run() {
close(localChannel, localRegistrationHandler);
}
});
}
// Wait until the connection is closed or the connection attempt fails.
localChannel.closeFuture().awaitUninterruptibly();
MessageBuf<Object> inboundMessageBuffer = localChannel.pipeline().inboundMessageBuffer();
if (!inboundMessageBuffer.isEmpty()) {
// sometimes we close the pipeline before everything on it has been notified/received.
// we want these messages, since they are in our queue.
Iterator<Object> iterator = inboundMessageBuffer.iterator();
while (iterator.hasNext()) {
Object next = iterator.next();
System.err.println("DEFERRED on close: " + next);
iterator.remove();
}
}
}
@Sharable
class LocalHander extends ChannelInboundMessageHandlerAdapter<Object> {
private final String name;
public volatile ChannelFuture lastWriteFuture = null;
public AtomicInteger count = new AtomicInteger(0);
public LocalHander(String name) {
this.name = name;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < messageCountPerRun; i ++) {
lastWriteFuture = ctx.channel().write(name + " " + i);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
count.incrementAndGet();
}
}
}