diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index d046f06dca..3ed578aab3 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -180,27 +180,25 @@ public abstract class AbstractByteBuf implements ByteBuf { return; } - if (minWritableBytes > maxCapacity - writerIndex) { + if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( - "minWritableBytes(%d) + writerIndex(%d) > maxCapacity(%d)", - minWritableBytes, writerIndex, maxCapacity)); + "minWritableBytes: %d (expected: 0+)", minWritableBytes)); } - int minNewCapacity = writerIndex + minWritableBytes; - - if (minNewCapacity > maxCapacity) { + if (minWritableBytes > maxCapacity - writerIndex) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (exceeds maxCapacity(%d))", minWritableBytes, maxCapacity)); } // Normalize the current capacity to the power of 2. - int newCapacity = calculateNewCapacity(minNewCapacity); + int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes); // Adjust to the new capacity. capacity(newCapacity); } private int calculateNewCapacity(int minNewCapacity) { + final int maxCapacity = this.maxCapacity; final int threshold = 1048576 * 4; // 4 MiB page if (minNewCapacity == threshold) { @@ -223,7 +221,8 @@ public abstract class AbstractByteBuf implements ByteBuf { while (newCapacity < minNewCapacity) { newCapacity <<= 1; } - return newCapacity; + + return Math.min(newCapacity, maxCapacity); } @Override @@ -726,6 +725,11 @@ public abstract class AbstractByteBuf implements ByteBuf { return nioBuffer(readerIndex, readableBytes()); } + @Override + public ByteBuffer[] nioBuffers() { + return nioBuffers(readerIndex, readableBytes()); + } + @Override public String toString(Charset charset) { return toString(readerIndex, readableBytes(), charset); diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index 76a68e881e..09d19c4a59 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1701,6 +1701,39 @@ public interface ByteBuf extends ChannelBuf, Comparable { */ ByteBuffer nioBuffer(int index, int length); + /** + * Returns {@code true} if and only if {@link #nioBuffers()} method will not fail. + */ + boolean hasNioBuffers(); + + /** + * Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer + * shares the content with this buffer, while changing the position and limit of the returned + * NIO buffer does not affect the indexes and marks of this buffer. This method does not + * modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the + * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic + * buffer and it adjusted its capacity. + * + * + * @throws UnsupportedOperationException + * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself + */ + ByteBuffer[] nioBuffers(); + + /** + * Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length + * The returned buffer shares the content with this buffer, while changing the position and limit + * of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does + * not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the + * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic + * buffer and it adjusted its capacity. + * + * + * @throws UnsupportedOperationException + * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself + */ + ByteBuffer[] nioBuffers(int offset, int length); + /** * Returns {@code true} if and only if this buffer has a backing byte array. * If this method returns true, you can safely call {@link #array()} and @@ -1801,6 +1834,13 @@ public interface ByteBuf extends ChannelBuf, Comparable { */ ByteBuffer nioBuffer(); + /** + * Returns the internal NIO buffer array that is reused for I/O. + * + * @throws UnsupportedOperationException if the buffer has no internal NIO buffer array + */ + ByteBuffer[] nioBuffers(); + /** * Returns a new buffer whose type is identical to the callee. * diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java index e9f44435e2..6c5a184c7b 100644 --- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java @@ -15,7 +15,6 @@ */ package io.netty.buffer; -import java.nio.ByteBuffer; import java.util.List; public interface CompositeByteBuf extends ByteBuf, Iterable { @@ -48,32 +47,4 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * Same with {@link #slice(int, int)} except that this method returns a list. */ List decompose(int offset, int length); - - /** - * Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer - * shares the content with this buffer, while changing the position and limit of the returned - * NIO buffer does not affect the indexes and marks of this buffer. This method does not - * modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the - * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic - * buffer and it adjusted its capacity. - * - * - * @throws UnsupportedOperationException - * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself - */ - ByteBuffer[] nioBuffers(); - - /** - * Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length - * The returned buffer shares the content with this buffer, while changing the position and limit - * of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does - * not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the - * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic - * buffer and it adjusted its capacity. - * - * - * @throws UnsupportedOperationException - * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself - */ - ByteBuffer[] nioBuffers(int offset, int length); } diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index cfbc99d8b8..4074111e43 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -1001,6 +1001,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit } return false; } + @Override public ByteBuffer nioBuffer(int index, int length) { if (components.size() == 1) { @@ -1023,6 +1024,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit return merged; } + @Override + public boolean hasNioBuffers() { + return true; + } + @Override public ByteBuffer[] nioBuffers(int index, int length) { int componentId = toComponentIndex(index); @@ -1206,6 +1212,16 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit throw new UnsupportedOperationException(); } + @Override + public ByteBuffer[] nioBuffers() { + ByteBuffer[] nioBuffers = new ByteBuffer[components.size()]; + int index = 0; + for (Component component : components) { + nioBuffers[index++] = component.buf.unsafe().nioBuffer(); + } + return nioBuffers; + } + @Override public ByteBuf newBuffer(int initialCapacity) { CompositeByteBuf buf = new DefaultCompositeByteBuf(maxNumComponents); diff --git a/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java index 07f9802b03..229baf111f 100644 --- a/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java @@ -380,6 +380,16 @@ public class DirectByteBuf extends AbstractByteBuf { } } + @Override + public boolean hasNioBuffers() { + return false; + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + throw new UnsupportedOperationException(); + } + @Override public ByteBuf copy(int index, int length) { ByteBuffer src; @@ -408,6 +418,11 @@ public class DirectByteBuf extends AbstractByteBuf { return tmpBuf; } + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); diff --git a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java index 7cf17084d0..cb3f1f2b48 100644 --- a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java @@ -211,6 +211,16 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf return buffer.nioBuffer(index, length); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + return buffer.nioBuffers(offset, length); + } + @Override public Unsafe unsafe() { return unsafe; @@ -223,6 +233,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf return buffer.unsafe().nioBuffer(); } + @Override + public ByteBuffer[] nioBuffers() { + return buffer.unsafe().nioBuffers(); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return buffer.unsafe().newBuffer(initialCapacity); diff --git a/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java index 5b19e2185c..00cdbecbcc 100644 --- a/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java @@ -209,6 +209,16 @@ public class HeapByteBuf extends AbstractByteBuf { return ByteBuffer.wrap(array, index, length); } + @Override + public boolean hasNioBuffers() { + return false; + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + throw new UnsupportedOperationException(); + } + @Override public short getShort(int index) { return (short) (array[index] << 8 | array[index + 1] & 0xFF); @@ -297,6 +307,11 @@ public class HeapByteBuf extends AbstractByteBuf { return nioBuf; } + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); diff --git a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java index ba69c906f2..774570d569 100644 --- a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java @@ -203,6 +203,16 @@ public class ReadOnlyByteBuf extends AbstractByteBuf implements WrappedByteBuf { return buffer.nioBuffer(index, length).asReadOnlyBuffer(); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + return buffer.nioBuffers(offset, length); + } + @Override public int capacity() { return buffer.capacity(); diff --git a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java index 4a11e53566..c23b5cff56 100644 --- a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java @@ -257,6 +257,17 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf { return buffer.nioBuffer(index + adjustment, length); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + checkIndex(index, length); + return buffer.nioBuffers(index, length); + } + private void checkIndex(int index) { if (index < 0 || index >= capacity()) { throw new IndexOutOfBoundsException("Invalid index: " + index @@ -290,6 +301,11 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf { return buffer.nioBuffer(adjustment, length); } + @Override + public ByteBuffer[] nioBuffers() { + return buffer.nioBuffers(adjustment, length); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return buffer.unsafe().newBuffer(initialCapacity); diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index 5ae917261f..cb3810116a 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -657,6 +657,29 @@ public class SwappedByteBuf implements WrappedByteBuf { return buf.nioBuffer(index, length).order(order); } + @Override + public boolean hasNioBuffers() { + return buf.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers() { + ByteBuffer[] nioBuffers = buf.nioBuffers(); + for (int i = 0; i < nioBuffers.length; i++) { + nioBuffers[i] = nioBuffers[i].order(order); + } + return nioBuffers; + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length); + for (int i = 0; i < nioBuffers.length; i++) { + nioBuffers[i] = nioBuffers[i].order(order); + } + return nioBuffers; + } + @Override public boolean hasArray() { return buf.hasArray(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpCodecUtil.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpCodecUtil.java index ac40046505..d2a7b1dc00 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpCodecUtil.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpCodecUtil.java @@ -151,6 +151,17 @@ final class HttpCodecUtil { return false; } + static void removeTransferEncodingChunked(HttpMessage m) { + List values = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING); + values.remove(HttpHeaders.Values.CHUNKED); + m.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, values); + } + + static boolean isContentLengthSet(HttpMessage m) { + List contentLength = m.getHeaders(HttpHeaders.Names.CONTENT_LENGTH); + return !contentLength.isEmpty(); + } + /** * A constructor to ensure that instances of this class are never made */ diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java index c83c192190..c8d86c669e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java @@ -47,7 +47,7 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder { private static final ByteBuf LAST_CHUNK = copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII); - private volatile boolean chunked; + private boolean transferEncodingChunked; /** * Creates a new instance. @@ -64,17 +64,26 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder { public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (msg instanceof HttpMessage) { HttpMessage m = (HttpMessage) msg; - boolean chunked; + boolean contentMustBeEmpty; if (m.isChunked()) { - // check if the Transfer-Encoding is set to chunked already. - // if not add the header to the message - if (!HttpCodecUtil.isTransferEncodingChunked(m)) { - m.addHeader(Names.TRANSFER_ENCODING, Values.CHUNKED); + // if Content-Length is set then the message can't be HTTP chunked + if (HttpCodecUtil.isContentLengthSet(m)) { + contentMustBeEmpty = false; + transferEncodingChunked = false; + HttpCodecUtil.removeTransferEncodingChunked(m); + } else { + // check if the Transfer-Encoding is set to chunked already. + // if not add the header to the message + if (!HttpCodecUtil.isTransferEncodingChunked(m)) { + m.addHeader(Names.TRANSFER_ENCODING, Values.CHUNKED); + } + contentMustBeEmpty = true; + transferEncodingChunked = true; } - chunked = this.chunked = true; } else { - chunked = this.chunked = HttpCodecUtil.isTransferEncodingChunked(m); + transferEncodingChunked = contentMustBeEmpty = HttpCodecUtil.isTransferEncodingChunked(m); } + out.markWriterIndex(); encodeInitialLine(out, m); encodeHeaders(out, m); @@ -83,20 +92,19 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder { ByteBuf content = m.getContent(); if (content.readable()) { - if (chunked) { + if (contentMustBeEmpty) { out.resetWriterIndex(); throw new IllegalArgumentException( - "HttpMessage.content must be empty " + - "if Transfer-Encoding is chunked."); + "HttpMessage.content must be empty if Transfer-Encoding is chunked."); } else { out.writeBytes(content, content.readerIndex(), content.readableBytes()); } } } else if (msg instanceof HttpChunk) { HttpChunk chunk = (HttpChunk) msg; - if (chunked) { + if (transferEncodingChunked) { if (chunk.isLast()) { - chunked = false; + transferEncodingChunked = false; if (chunk instanceof HttpChunkTrailer) { out.writeByte((byte) '0'); out.writeByte(CR); @@ -150,10 +158,10 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder { private static void encodeHeader(ByteBuf buf, String header, String value) throws UnsupportedEncodingException { - buf.writeBytes(header.getBytes("ASCII")); + buf.writeBytes(header.getBytes(CharsetUtil.US_ASCII)); buf.writeByte(COLON); buf.writeByte(SP); - buf.writeBytes(value.getBytes("ASCII")); + buf.writeBytes(value.getBytes(CharsetUtil.US_ASCII)); buf.writeByte(CR); buf.writeByte(LF); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java index eda340707e..f450c809da 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestEncoder.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http; import static io.netty.handler.codec.http.HttpConstants.*; import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; /** * Encodes an {@link HttpRequest} or an {@link HttpChunk} into @@ -33,11 +34,11 @@ public class HttpRequestEncoder extends HttpMessageEncoder { @Override protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception { HttpRequest request = (HttpRequest) message; - buf.writeBytes(request.getMethod().toString().getBytes("ASCII")); + buf.writeBytes(request.getMethod().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte(SP); - buf.writeBytes(request.getUri().getBytes("ASCII")); + buf.writeBytes(request.getUri().getBytes(CharsetUtil.UTF_8)); buf.writeByte(SP); - buf.writeBytes(request.getProtocolVersion().toString().getBytes("ASCII")); + buf.writeBytes(request.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte(CR); buf.writeByte(LF); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java index d897f680dd..141b41d8e1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseEncoder.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http; import static io.netty.handler.codec.http.HttpConstants.*; import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; /** * Encodes an {@link HttpResponse} or an {@link HttpChunk} into @@ -33,11 +34,11 @@ public class HttpResponseEncoder extends HttpMessageEncoder { @Override protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception { HttpResponse response = (HttpResponse) message; - buf.writeBytes(response.getProtocolVersion().toString().getBytes("ASCII")); + buf.writeBytes(response.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte(SP); - buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes("ASCII")); + buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes(CharsetUtil.US_ASCII)); buf.writeByte(SP); - buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes("ASCII")); + buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes(CharsetUtil.US_ASCII)); buf.writeByte(CR); buf.writeByte(LF); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java index 4a6815fb53..52ba6a2299 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspRequestEncoder.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.rtsp; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpRequest; +import io.netty.util.CharsetUtil; /** * Encodes an RTSP request represented in {@link HttpRequest} into @@ -30,11 +31,11 @@ public class RtspRequestEncoder extends RtspMessageEncoder { protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception { HttpRequest request = (HttpRequest) message; - buf.writeBytes(request.getMethod().toString().getBytes("ASCII")); + buf.writeBytes(request.getMethod().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte((byte) ' '); - buf.writeBytes(request.getUri().getBytes("ASCII")); + buf.writeBytes(request.getUri().getBytes(CharsetUtil.UTF_8)); buf.writeByte((byte) ' '); - buf.writeBytes(request.getProtocolVersion().toString().getBytes("ASCII")); + buf.writeBytes(request.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte((byte) '\r'); buf.writeByte((byte) '\n'); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java index 63b94c5df7..e5c2f719a4 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspResponseEncoder.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.rtsp; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpResponse; +import io.netty.util.CharsetUtil; /** * Encodes an RTSP response represented in {@link HttpResponse} into @@ -30,11 +31,11 @@ public class RtspResponseEncoder extends RtspMessageEncoder { protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception { HttpResponse response = (HttpResponse) message; - buf.writeBytes(response.getProtocolVersion().toString().getBytes("ASCII")); + buf.writeBytes(response.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte((byte) ' '); - buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes("ASCII")); + buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes(CharsetUtil.US_ASCII)); buf.writeByte((byte) ' '); - buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes("ASCII")); + buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes(CharsetUtil.US_ASCII)); buf.writeByte((byte) '\r'); buf.writeByte((byte) '\n'); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/DefaultSpdyDataFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/DefaultSpdyDataFrame.java index 6678b65031..a043ad3943 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/DefaultSpdyDataFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/DefaultSpdyDataFrame.java @@ -26,7 +26,6 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame { private int streamId; private boolean last; - private boolean compressed; private ByteBuf data = Unpooled.EMPTY_BUFFER; /** @@ -62,18 +61,6 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame { this.last = last; } - @Override - @Deprecated - public boolean isCompressed() { - return compressed; - } - - @Override - @Deprecated - public void setCompressed(boolean compressed) { - this.compressed = compressed; - } - @Override public ByteBuf getData() { return data; @@ -97,8 +84,6 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame { buf.append(getClass().getSimpleName()); buf.append("(last: "); buf.append(isLast()); - buf.append("; compressed: "); - buf.append(isCompressed()); buf.append(')'); buf.append(StringUtil.NEWLINE); buf.append("--> Stream-ID = "); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java index 8d4059a399..5458a9f77f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyCodecUtil.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; final class SpdyCodecUtil { @@ -251,7 +252,7 @@ final class SpdyCodecUtil { byte[] SPDY2_DICT_; try { - SPDY2_DICT_ = SPDY2_DICT_S.getBytes("US-ASCII"); + SPDY2_DICT_ = SPDY2_DICT_S.getBytes(CharsetUtil.US_ASCII); // dictionary is null terminated SPDY2_DICT_[SPDY2_DICT_.length - 1] = (byte) 0; } catch (Exception e) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyControlFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyControlFrame.java new file mode 100644 index 0000000000..2a7cf729c1 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyControlFrame.java @@ -0,0 +1,23 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.spdy; + +/** + * A SPDY Protocol Control Frame + */ +public interface SpdyControlFrame { + // Tag interface +} diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyDataFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyDataFrame.java index 0208cf0add..4458051e50 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyDataFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyDataFrame.java @@ -44,18 +44,6 @@ public interface SpdyDataFrame { */ void setLast(boolean last); - /** - * @deprecated Removed from SPDY specification. - */ - @Deprecated - boolean isCompressed(); - - /** - * @deprecated Removed from SPDY specification. - */ - @Deprecated - void setCompressed(boolean compressed); - /** * Returns the data payload of this frame. If there is no data payload * {@link Unpooled#EMPTY_BUFFER} is returned. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 4f2bfbb44e..f05b4c469a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.UnsupportedMessageTypeException; +import io.netty.util.CharsetUtil; import java.util.Set; @@ -75,17 +76,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { @Override public boolean isEncodable(Object msg) throws Exception { - // FIXME: Introduce supertype - return msg instanceof SpdyDataFrame || - msg instanceof SpdySynStreamFrame || - msg instanceof SpdySynReplyFrame || - msg instanceof SpdyRstStreamFrame || - msg instanceof SpdySettingsFrame || - msg instanceof SpdyNoOpFrame || - msg instanceof SpdyPingFrame || - msg instanceof SpdyGoAwayFrame || - msg instanceof SpdyHeadersFrame || - msg instanceof SpdyWindowUpdateFrame; + return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame; } @Override @@ -312,14 +303,14 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { ByteBuf headerBlock = Unpooled.buffer(); writeLengthField(version, headerBlock, numHeaders); for (String name: names) { - byte[] nameBytes = name.getBytes("UTF-8"); + byte[] nameBytes = name.getBytes(CharsetUtil.UTF_8); writeLengthField(version, headerBlock, nameBytes.length); headerBlock.writeBytes(nameBytes); int savedIndex = headerBlock.writerIndex(); int valueLength = 0; writeLengthField(version, headerBlock, valueLength); for (String value: headerFrame.getHeaders(name)) { - byte[] valueBytes = value.getBytes("UTF-8"); + byte[] valueBytes = value.getBytes(CharsetUtil.UTF_8); headerBlock.writeBytes(valueBytes); headerBlock.writeByte(0); valueLength += valueBytes.length + 1; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyGoAwayFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyGoAwayFrame.java index 2fc1de4791..ebc2a82bcc 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyGoAwayFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyGoAwayFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol GOAWAY Control Frame */ -public interface SpdyGoAwayFrame { +public interface SpdyGoAwayFrame extends SpdyControlFrame { /** * Returns the Last-good-stream-ID of this frame. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeadersFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeadersFrame.java index 43ca4f289c..c0d53ad4b7 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeadersFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeadersFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol HEADERS Control Frame */ -public interface SpdyHeadersFrame extends SpdyHeaderBlock { +public interface SpdyHeadersFrame extends SpdyHeaderBlock, SpdyControlFrame { /** * Returns the Stream-ID of this frame. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyNoOpFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyNoOpFrame.java index 54592c41cb..31b6e11a0b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyNoOpFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyNoOpFrame.java @@ -18,6 +18,6 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol NOOP Control Frame */ -public interface SpdyNoOpFrame { +public interface SpdyNoOpFrame extends SpdyControlFrame { // Tag interface } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyPingFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyPingFrame.java index 4387732bb2..8a97479ed9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyPingFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyPingFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol PING Control Frame */ -public interface SpdyPingFrame { +public interface SpdyPingFrame extends SpdyControlFrame { /** * Returns the ID of this frame. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyRstStreamFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyRstStreamFrame.java index 03dfbcd447..4296d72df4 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyRstStreamFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyRstStreamFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol RST_STREAM Control Frame */ -public interface SpdyRstStreamFrame { +public interface SpdyRstStreamFrame extends SpdyControlFrame { /** * Returns the Stream-ID of this frame. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySettingsFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySettingsFrame.java index 1f41e4b0fc..47e1d7b062 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySettingsFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySettingsFrame.java @@ -20,7 +20,7 @@ import java.util.Set; /** * A SPDY Protocol SETTINGS Control Frame */ -public interface SpdySettingsFrame { +public interface SpdySettingsFrame extends SpdyControlFrame { int SETTINGS_UPLOAD_BANDWIDTH = 1; int SETTINGS_DOWNLOAD_BANDWIDTH = 2; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynReplyFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynReplyFrame.java index 72f4017c01..c78a3ab0d8 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynReplyFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynReplyFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol SYN_REPLY Control Frame */ -public interface SpdySynReplyFrame extends SpdyHeaderBlock { +public interface SpdySynReplyFrame extends SpdyHeaderBlock, SpdyControlFrame { /** * Returns the Stream-ID of this frame. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynStreamFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynStreamFrame.java index 82282b86ab..e670a0780b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynStreamFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySynStreamFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol SYN_STREAM Control Frame */ -public interface SpdySynStreamFrame extends SpdyHeaderBlock { +public interface SpdySynStreamFrame extends SpdyHeaderBlock, SpdyControlFrame { /** * Returns the Stream-ID of this frame. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyWindowUpdateFrame.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyWindowUpdateFrame.java index 5e5fd7034f..d0fea74fdf 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyWindowUpdateFrame.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyWindowUpdateFrame.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy; /** * A SPDY Protocol WINDOW_UPDATE Control Frame */ -public interface SpdyWindowUpdateFrame { +public interface SpdyWindowUpdateFrame extends SpdyControlFrame { /** * Returns the Stream-ID of this frame. diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index d0bd9dcd5f..5724abfae3 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.util.Signal; +import io.netty.util.internal.Signal; /** * A specialized variation of {@link ByteToMessageDecoder} which enables implementation diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index 0e3bf4d99b..caab92a54a 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -20,7 +20,7 @@ import io.netty.buffer.ByteBufIndexFinder; import io.netty.buffer.ChannelBufType; import io.netty.buffer.SwappedByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.Signal; +import io.netty.util.internal.Signal; import java.io.IOException; import java.io.InputStream; @@ -672,6 +672,22 @@ class ReplayingDecoderBuffer implements ByteBuf { return buffer.nioBuffer(index, length); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnreplayableOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + checkIndex(index, length); + return buffer.nioBuffers(index, length); + } + @Override public String toString(int index, int length, Charset charset) { checkIndex(index, length); diff --git a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderBufferTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderBufferTest.java index 2d3f15b2cb..2c91cd82bf 100644 --- a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderBufferTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderBufferTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; -import io.netty.util.Signal; +import io.netty.util.internal.Signal; import org.junit.Test; diff --git a/common/src/main/java/io/netty/util/UniqueName.java b/common/src/main/java/io/netty/util/UniqueName.java index dbdd47617d..525bb7bfe6 100644 --- a/common/src/main/java/io/netty/util/UniqueName.java +++ b/common/src/main/java/io/netty/util/UniqueName.java @@ -18,6 +18,9 @@ package io.netty.util; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +/** + * Defines a name that must be unique in the map that is provided during construction. + */ public class UniqueName implements Comparable { private static final AtomicInteger nextId = new AtomicInteger(); @@ -25,6 +28,13 @@ public class UniqueName implements Comparable { private final int id; private final String name; + /** + * Constructs a new {@link UniqueName} + * + * @param map the map of names to compare with + * @param name the name of this {@link UniqueName} + * @param args the arguments to process + */ public UniqueName(ConcurrentMap map, String name, Object... args) { if (map == null) { throw new NullPointerException("map"); @@ -37,7 +47,7 @@ public class UniqueName implements Comparable { } if (map.putIfAbsent(name, Boolean.TRUE) != null) { - throw new IllegalArgumentException(String.format("'%s' already in use", name)); + throw new IllegalArgumentException(String.format("'%s' is already in use", name)); } id = nextId.incrementAndGet(); @@ -45,16 +55,33 @@ public class UniqueName implements Comparable { } /** + * Validates the given arguments. This method does not do anything on its own, but must be + * overridden by its subclasses. + * * @param args arguments to validate */ protected void validateArgs(Object... args) { // Subclasses will override. } + /** + * Returns this {@link UniqueName}'s name + * + * @return the name + */ public final String name() { return name; } + /** + * Returns this {@link UniqueName}'s ID + * + * @return the id + */ + public final int id() { + return id; + } + @Override public final int hashCode() { return super.hashCode(); @@ -66,19 +93,19 @@ public class UniqueName implements Comparable { } @Override - public int compareTo(UniqueName o) { - if (this == o) { + public int compareTo(UniqueName other) { + if (this == other) { return 0; } - int ret = name.compareTo(o.name); - if (ret != 0) { - return ret; + int returnCode = name.compareTo(other.name); + if (returnCode != 0) { + return returnCode; } - if (id < o.id) { + if (id < other.id) { return -1; - } else if (id > o.id) { + } else if (id > other.id) { return 1; } else { return 0; diff --git a/common/src/main/java/io/netty/util/Signal.java b/common/src/main/java/io/netty/util/internal/Signal.java similarity index 95% rename from common/src/main/java/io/netty/util/Signal.java rename to common/src/main/java/io/netty/util/internal/Signal.java index eebed84071..0c77118f8a 100644 --- a/common/src/main/java/io/netty/util/Signal.java +++ b/common/src/main/java/io/netty/util/internal/Signal.java @@ -13,9 +13,11 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.util; +package io.netty.util.internal; +import io.netty.util.UniqueName; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; diff --git a/common/src/test/java/io/netty/util/UniqueNameTest.java b/common/src/test/java/io/netty/util/UniqueNameTest.java new file mode 100644 index 0000000000..ac04603cb6 --- /dev/null +++ b/common/src/test/java/io/netty/util/UniqueNameTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotSame; +import org.junit.Before; +import org.junit.Test; + +public class UniqueNameTest { + + /** + * A {@link ConcurrentHashMap} of registered names. + * This is set up before each test + */ + private ConcurrentHashMap names; + + /** + * Registers a {@link UniqueName} + * + * @param name the name being registered + * @return the unique name + */ + public UniqueName registerName(String name) { + return new UniqueName(names, name); + } + + @Before + public void initializeTest() { + this.names = new ConcurrentHashMap(); + } + + @Test + public void testRegisteringName() { + registerName("Abcedrian"); + + assertTrue(this.names.get("Abcedrian")); + assertTrue(this.names.get("Hellyes") == null); + } + + @Test + public void testNameUniqueness() { + registerName("Leroy"); + boolean failed = false; + try { + registerName("Leroy"); + } catch (IllegalArgumentException ex) { + failed = true; + } + assertTrue(failed); + } + + @Test + public void testIDUniqueness() { + UniqueName one = registerName("one"); + UniqueName two = registerName("two"); + assertNotSame(one.id(), two.id()); + + ArrayList nameList = new ArrayList(); + + for (int index = 0; index < 2500; index++) { + UniqueName currentName = registerName("test" + index); + nameList.add(currentName); + for (UniqueName otherName : nameList) { + if (!currentName.name().equals(otherName.name())) { + assertNotSame(currentName.id(), otherName.name()); + } + } + } + } + +} diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClientHandler.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClientHandler.java index c7b583d99e..62acef8ddd 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClientHandler.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClientHandler.java @@ -57,7 +57,7 @@ public class ObjectEchoClientHandler extends ChannelInboundMessageHandlerAdapter @Override public void messageReceived(ChannelHandlerContext ctx, List msg) throws Exception { - // Echo back the received object to the client. + // Echo back the received object to the server. ctx.write(msg); } diff --git a/pom.xml b/pom.xml index accfe6a708..d423c9c8a7 100644 --- a/pom.xml +++ b/pom.xml @@ -283,6 +283,7 @@ java.nio.channels.AsynchronousServerSocketChannel java.nio.channels.AsynchronousChannelGroup java.nio.channels.NetworkChannel + java.nio.channels.InterruptedByTimeoutException diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 34ab5d1505..1891372eb9 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -54,10 +54,11 @@ final class SocketTestPermutation { sbfs.add(new Factory() { @Override public ServerBootstrap newInstance() { - AioEventLoopGroup loop = new AioEventLoopGroup(); + AioEventLoopGroup parentGroup = new AioEventLoopGroup(); + AioEventLoopGroup childGroup = new AioEventLoopGroup(); return new ServerBootstrap(). - group(loop). - channel(new AioServerSocketChannel(loop)); + group(parentGroup, childGroup). + channel(new AioServerSocketChannel(parentGroup, childGroup)); } }); sbfs.add(new Factory() { diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index bdc86c38d3..36d399cf46 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -94,6 +94,11 @@ public class ChannelOption extends UniqueName { public static final ChannelOption SCTP_SET_PEER_PRIMARY_ADDR = new ChannelOption("SCTP_SET_PEER_PRIMARY_ADDR"); + public static final ChannelOption AIO_READ_TIMEOUT = + new ChannelOption("AIO_READ_TIMEOUT"); + public static final ChannelOption AIO_WRITE_TIMEOUT = + new ChannelOption("AIO_WRITE_TIMEOUT"); + public ChannelOption(String name) { super(names, name); } diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java index 33c6993e00..c783d3d000 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java @@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory; class DefaultEventExecutor extends SingleThreadEventExecutor { - DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + DefaultEventExecutor( + DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java index 424c095a41..012b39d132 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java @@ -28,7 +28,8 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new DefaultEventExecutor(this, threadFactory); + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { + return new DefaultEventExecutor(this, threadFactory, scheduler); } } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java index 8622a1e177..8601cfaea7 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java @@ -24,6 +24,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; private static final AtomicInteger poolId = new AtomicInteger(); + final TaskScheduler scheduler; private final EventExecutor[] children; private final AtomicInteger childIndex = new AtomicInteger(); @@ -40,11 +41,13 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou threadFactory = new DefaultThreadFactory(); } + scheduler = new TaskScheduler(threadFactory); + children = new SingleThreadEventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { - children[i] = newChild(threadFactory, args); + children[i] = newChild(threadFactory, scheduler, args); success = true; } catch (Exception e) { throw new EventLoopException("failed to create a child event loop", e); @@ -63,10 +66,12 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } - protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; + protected abstract EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception; @Override public void shutdown() { + scheduler.shutdown(); for (EventExecutor l: children) { l.shutdown(); } @@ -74,6 +79,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou @Override public boolean isShutdown() { + if (!scheduler.isShutdown()) { + return false; + } for (EventExecutor l: children) { if (!l.isShutdown()) { return false; @@ -84,6 +92,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou @Override public boolean isTerminated() { + if (!scheduler.isTerminated()) { + return false; + } for (EventExecutor l: children) { if (!l.isTerminated()) { return false; @@ -96,6 +107,15 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } loop: for (EventExecutor l: children) { for (;;) { long timeLeft = deadline - System.nanoTime(); diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index f1321362bb..7c7f955274 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -20,7 +20,6 @@ import io.netty.logging.InternalLoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; @@ -28,27 +27,18 @@ import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); - private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final long START_TIME = System.nanoTime(); - private static final AtomicLong nextTaskId = new AtomicLong(); - static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); @@ -56,33 +46,27 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService return CURRENT_EVENT_LOOP.get(); } - private static long nanoTime() { - return System.nanoTime() - START_TIME; - } - - private static long deadlineNanos(long delay) { - return nanoTime() + delay; - } - private final EventExecutorGroup parent; - private final BlockingQueue taskQueue = new LinkedBlockingQueue(); + private final Queue taskQueue; private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); - // TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. - private final Queue> scheduledTasks = new DelayQueue>(); + private final TaskScheduler scheduler; private final Set shutdownHooks = new LinkedHashSet(); /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ private volatile int state; - private long lastCheckTimeNanos; - private long lastPurgeTimeNanos; - protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { + protected SingleThreadEventExecutor( + EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } + if (scheduler == null) { + throw new NullPointerException("scheduler"); + } this.parent = parent; + this.scheduler = scheduler; thread = threadFactory.newThread(new Runnable() { @Override @@ -115,7 +99,6 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService private void cleanupTasks() { for (;;) { boolean ran = false; - cancelScheduledTasks(); ran |= runAllTasks(); ran |= runShutdownHooks(); if (!ran && !hasTasks()) { @@ -124,6 +107,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } } }); + + taskQueue = newTaskQueue(); + } + + protected Queue newTaskQueue() { + return new LinkedBlockingQueue(); } @Override @@ -142,65 +131,26 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService protected Runnable pollTask() { assert inEventLoop(); - - Runnable task = taskQueue.poll(); - if (task != null) { - return task; - } - - if (fetchScheduledTasks()) { - task = taskQueue.poll(); - return task; - } - - return null; + return taskQueue.poll(); } protected Runnable takeTask() throws InterruptedException { assert inEventLoop(); - - for (;;) { - Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); - if (task != null) { - return task; - } - fetchScheduledTasks(); - task = taskQueue.poll(); - if (task != null) { - return task; - } + if (taskQueue instanceof BlockingQueue) { + return ((BlockingQueue) taskQueue).take(); + } else { + throw new UnsupportedOperationException(); } } protected Runnable peekTask() { assert inEventLoop(); - - Runnable task = taskQueue.peek(); - if (task != null) { - return task; - } - - if (fetchScheduledTasks()) { - task = taskQueue.peek(); - return task; - } - - return null; + return taskQueue.peek(); } protected boolean hasTasks() { assert inEventLoop(); - - boolean empty = taskQueue.isEmpty(); - if (!empty) { - return true; - } - - if (fetchScheduledTasks()) { - return !taskQueue.isEmpty(); - } - - return false; + return !taskQueue.isEmpty(); } protected void addTask(Runnable task) { @@ -397,228 +347,21 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(command, null, deadlineNanos(unit.toNanos(delay)))); + return scheduler.schedule(this, command, delay, unit); } @Override public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - if (callable == null) { - throw new NullPointerException("callable"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(callable, deadlineNanos(unit.toNanos(delay)))); + return scheduler.schedule(this, callable, delay, unit); } @Override public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (period <= 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: > 0)", period)); - } - - return schedule(new ScheduledFutureTask( - command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit); } @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (delay <= 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: > 0)", delay)); - } - - return schedule(new ScheduledFutureTask( - command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); - } - - private ScheduledFuture schedule(ScheduledFutureTask task) { - if (isShutdown()) { - reject(); - } - scheduledTasks.add(task); - if (isShutdown()) { - task.cancel(false); - } - - if (!inEventLoop()) { - synchronized (stateLock) { - if (state == 0) { - state = 1; - thread.start(); - } - } - } else { - fetchScheduledTasks(); - } - - return task; - } - - private boolean fetchScheduledTasks() { - if (scheduledTasks.isEmpty()) { - return false; - } - - long nanoTime = nanoTime(); - if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) { - for (Iterator> i = scheduledTasks.iterator(); i.hasNext();) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } - } - } - - if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) { - boolean added = false; - for (;;) { - ScheduledFutureTask task = scheduledTasks.poll(); - if (task == null) { - break; - } - - if (!task.isCancelled()) { - if (isShutdown()) { - task.cancel(false); - } else { - taskQueue.add(task); - added = true; - } - } - } - return added; - } - - return false; - } - - private void cancelScheduledTasks() { - if (scheduledTasks.isEmpty()) { - return; - } - - for (ScheduledFutureTask task: scheduledTasks.toArray(new ScheduledFutureTask[scheduledTasks.size()])) { - task.cancel(false); - } - scheduledTasks.clear(); - } - - private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { - - private final long id = nextTaskId.getAndIncrement(); - private long deadlineNanos; - /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ - private final long periodNanos; - - ScheduledFutureTask(Runnable runnable, V result, long nanoTime) { - super(runnable, result); - deadlineNanos = nanoTime; - periodNanos = 0; - } - - ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) { - super(runnable, result); - if (period == 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: != 0)", period)); - } - deadlineNanos = nanoTime; - periodNanos = period; - } - - ScheduledFutureTask(Callable callable, long nanoTime) { - super(callable); - deadlineNanos = nanoTime; - periodNanos = 0; - } - - public long deadlineNanos() { - return deadlineNanos; - } - - public long delayNanos() { - return Math.max(0, deadlineNanos() - nanoTime()); - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (this == o) { - return 0; - } - - ScheduledFutureTask that = (ScheduledFutureTask) o; - long d = deadlineNanos() - that.deadlineNanos(); - if (d < 0) { - return -1; - } else if (d > 0) { - return 1; - } else if (id < that.id) { - return -1; - } else if (id == that.id) { - throw new Error(); - } else { - return 1; - } - } - - @Override - public void run() { - if (periodNanos == 0) { - super.run(); - } else { - boolean reset = runAndReset(); - if (reset && !isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; - } else { - deadlineNanos = nanoTime() - p; - } - - schedule(this); - } - } - } + return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit); } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index c4d4c6b9fd..05d90c1cdd 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory; public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + protected SingleThreadEventLoop( + EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/TaskScheduler.java b/transport/src/main/java/io/netty/channel/TaskScheduler.java new file mode 100644 index 0000000000..b525815a8d --- /dev/null +++ b/transport/src/main/java/io/netty/channel/TaskScheduler.java @@ -0,0 +1,426 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public final class TaskScheduler { + + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(TaskScheduler.class); + + private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + private static final long START_TIME = System.nanoTime(); + private static final AtomicLong nextTaskId = new AtomicLong(); + + private static long nanoTime() { + return System.nanoTime() - START_TIME; + } + + private static long deadlineNanos(long delay) { + return nanoTime() + delay; + } + + private final BlockingQueue> taskQueue = new DelayQueue>(); + private final Thread thread; + private final Object stateLock = new Object(); + private final Semaphore threadLock = new Semaphore(0); + /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ + private volatile int state; + + public TaskScheduler(ThreadFactory threadFactory) { + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + + thread = threadFactory.newThread(new Runnable() { + @Override + public void run() { + try { + for (;;) { + ScheduledFutureTask task; + try { + task = taskQueue.take(); + runTask(task); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown() && taskQueue.peek() == null) { + break; + } + } + } finally { + try { + // Run all remaining tasks and shutdown hooks. + try { + cleanupTasks(); + } finally { + synchronized (stateLock) { + state = 3; + } + } + cleanupTasks(); + } finally { + threadLock.release(); + assert taskQueue.isEmpty(); + } + } + } + + private void runTask(ScheduledFutureTask task) { + EventExecutor executor = task.executor; + if (executor == null) { + task.run(); + } else { + if (executor.isShutdown()) { + task.cancel(false); + } else { + try { + task.executor.execute(task); + } catch (RejectedExecutionException e) { + task.cancel(false); + } + } + } + } + + private void cleanupTasks() { + for (;;) { + boolean ran = false; + cancelScheduledTasks(); + for (;;) { + final ScheduledFutureTask task = taskQueue.poll(); + if (task == null) { + break; + } + + try { + runTask(task); + ran = true; + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } + } + + if (!ran && taskQueue.isEmpty()) { + break; + } + } + } + }); + } + + private boolean inSameThread() { + return Thread.currentThread() == thread; + } + + public void shutdown() { + boolean inSameThread = inSameThread(); + boolean wakeup = false; + if (inSameThread) { + synchronized (stateLock) { + assert state == 1; + state = 2; + wakeup = true; + } + } else { + synchronized (stateLock) { + switch (state) { + case 0: + state = 3; + threadLock.release(); + break; + case 1: + state = 2; + wakeup = true; + break; + } + } + } + + if (wakeup && !inSameThread && isShutdown()) { + thread.interrupt(); + } + } + + public boolean isShutdown() { + return state >= 2; + } + + public boolean isTerminated() { + return state == 3; + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (inSameThread()) { + throw new IllegalStateException("cannot await termination of the current thread"); + } + + if (threadLock.tryAcquire(timeout, unit)) { + threadLock.release(); + } + + return isTerminated(); + } + + public ScheduledFuture schedule( + EventExecutor executor, Runnable command, long delay, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(executor, command, null, deadlineNanos(unit.toNanos(delay)))); + } + + public ScheduledFuture schedule( + EventExecutor executor, Callable callable, long delay, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (callable == null) { + throw new NullPointerException("callable"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(executor, callable, deadlineNanos(unit.toNanos(delay)))); + } + + public ScheduledFuture scheduleAtFixedRate( + EventExecutor executor, Runnable command, long initialDelay, long period, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (period <= 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: > 0)", period)); + } + + return schedule(new ScheduledFutureTask( + executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + } + + public ScheduledFuture scheduleWithFixedDelay( + EventExecutor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (delay <= 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: > 0)", delay)); + } + + return schedule(new ScheduledFutureTask( + executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + } + + private ScheduledFuture schedule(ScheduledFutureTask task) { + if (isShutdown()) { + reject(); + } + taskQueue.add(task); + if (isShutdown()) { + task.cancel(false); + } + + boolean started = false; + if (!inSameThread()) { + synchronized (stateLock) { + if (state == 0) { + state = 1; + thread.start(); + started = true; + } + } + } + + if (started) { + schedule(new ScheduledFutureTask( + null, new PurgeTask(), null, + deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); + } + + return task; + } + + private static void reject() { + throw new RejectedExecutionException("event executor shut down"); + } + + private void cancelScheduledTasks() { + if (taskQueue.isEmpty()) { + return; + } + + for (ScheduledFutureTask task: taskQueue.toArray(new ScheduledFutureTask[taskQueue.size()])) { + task.cancel(false); + } + + taskQueue.clear(); + } + + private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { + + private final EventExecutor executor; + private final long id = nextTaskId.getAndIncrement(); + private long deadlineNanos; + /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ + private final long periodNanos; + + ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime) { + super(runnable, result); + this.executor = executor; + deadlineNanos = nanoTime; + periodNanos = 0; + } + + ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime, long period) { + super(runnable, result); + if (period == 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: != 0)", period)); + } + this.executor = executor; + deadlineNanos = nanoTime; + periodNanos = period; + } + + ScheduledFutureTask(EventExecutor executor, Callable callable, long nanoTime) { + super(callable); + this.executor = executor; + deadlineNanos = nanoTime; + periodNanos = 0; + } + + public long deadlineNanos() { + return deadlineNanos; + } + + public long delayNanos() { + return Math.max(0, deadlineNanos() - nanoTime()); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this == o) { + return 0; + } + + ScheduledFutureTask that = (ScheduledFutureTask) o; + long d = deadlineNanos() - that.deadlineNanos(); + if (d < 0) { + return -1; + } else if (d > 0) { + return 1; + } else if (id < that.id) { + return -1; + } else if (id == that.id) { + throw new Error(); + } else { + return 1; + } + } + + @Override + public void run() { + if (periodNanos == 0) { + super.run(); + } else { + boolean reset = runAndReset(); + if (reset && !isShutdown()) { + long p = periodNanos; + if (p > 0) { + deadlineNanos += p; + } else { + deadlineNanos = nanoTime() - p; + } + + schedule(this); + } + } + } + } + + private final class PurgeTask implements Runnable { + @Override + public void run() { + Iterator> i = taskQueue.iterator(); + while (i.hasNext()) { + ScheduledFutureTask task = i.next(); + if (task.isCancelled()) { + i.remove(); + } + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java index 10047a86d5..fc304a3756 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -16,13 +16,15 @@ package io.netty.channel.local; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.TaskScheduler; import java.util.concurrent.ThreadFactory; final class LocalEventLoop extends SingleThreadEventLoop { - LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + LocalEventLoop( + LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java index 3d6dba41eb..bf1a00e032 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java @@ -17,6 +17,7 @@ package io.netty.channel.local; import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.TaskScheduler; import java.util.concurrent.ThreadFactory; @@ -35,7 +36,8 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new LocalEventLoop(this, threadFactory); + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { + return new LocalEventLoop(this, threadFactory, scheduler); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index bbb000b15f..ac31001d50 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -68,7 +68,7 @@ abstract class AbstractAioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - if (((AioChildEventLoop) eventLoop()).parent() != group) { + if (((AioEventLoop) eventLoop()).parent() != group) { throw new ChannelException( getClass().getSimpleName() + " must be registered to the " + AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor."); @@ -83,7 +83,7 @@ abstract class AbstractAioChannel extends AbstractChannel { @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof AioChildEventLoop; + return loop instanceof AioEventLoop; } protected abstract class AbstractAioUnsafe extends AbstractUnsafe { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java similarity index 85% rename from transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java rename to transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java index cae4ea1469..8578eb5c84 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java @@ -16,13 +16,14 @@ package io.netty.channel.socket.aio; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.TaskScheduler; import java.util.concurrent.ThreadFactory; -final class AioChildEventLoop extends SingleThreadEventLoop { +final class AioEventLoop extends SingleThreadEventLoop { - AioChildEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java index 2abaa66a8b..72f5caa794 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.aio; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoopException; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.TaskScheduler; import java.io.IOException; import java.lang.reflect.Field; @@ -57,8 +58,9 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new AioChildEventLoop(this, threadFactory); + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { + return new AioEventLoop(this, threadFactory, scheduler); } private void executeAioTask(Runnable command) { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index d7eca8dc7a..147dff177a 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -40,9 +40,10 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server private static final InternalLogger logger = InternalLoggerFactory.getInstance(AioServerSocketChannel.class); + private final AioEventLoopGroup childGroup; private final AioServerSocketChannelConfig config; private boolean closed; - private AtomicBoolean readSuspended = new AtomicBoolean(); + private final AtomicBoolean readSuspended = new AtomicBoolean(); private final Runnable acceptTask = new Runnable() { @@ -60,8 +61,13 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server } } - public AioServerSocketChannel(AioEventLoopGroup eventLoop) { - super(null, null, eventLoop, newSocket(eventLoop.group)); + public AioServerSocketChannel(AioEventLoopGroup group) { + this(group, group); + } + + public AioServerSocketChannel(AioEventLoopGroup parentGroup, AioEventLoopGroup childGroup) { + super(null, null, parentGroup, newSocket(parentGroup.group)); + this.childGroup = childGroup; config = new AioServerSocketChannelConfig(javaChannel()); } @@ -147,7 +153,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add( - new AioSocketChannel(channel, null, channel.group, ch)); + new AioSocketChannel(channel, null, channel.childGroup, ch)); if (!channel.readSuspended.get()) { channel.pipeline().fireInboundBufferUpdated(); } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 883b020a26..c82269b82a 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -31,6 +31,8 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,8 +41,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false); private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); - private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); - private static final CompletionHandler READ_HANDLER = new ReadHandler(); + private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); + private static final CompletionHandler READ_HANDLER = new ReadHandler(); + private static final CompletionHandler GATHERING_WRITE_HANDLER = new WriteHandler(); + private static final CompletionHandler SCATTERING_READ_HANDLER = new ReadHandler(); private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) { try { @@ -180,7 +184,14 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne buf.discardReadBytes(); if (buf.readable()) { - javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); + if (buf.hasNioBuffers()) { + ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes()); + javaChannel().write(buffers, 0, buffers.length, config.getReadTimeout(), + TimeUnit.MILLISECONDS, AioSocketChannel.this, GATHERING_WRITE_HANDLER); + } else { + javaChannel().write(buf.nioBuffer(), config.getReadTimeout(), TimeUnit.MILLISECONDS, + this, WRITE_HANDLER); + } } else { notifyFlushFutures(); flushing = false; @@ -204,17 +215,24 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne expandReadBuffer(byteBuf); } - // Get a ByteBuffer view on the ByteBuf - ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); - javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER); + if (byteBuf.hasNioBuffers()) { + ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); + javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(), + TimeUnit.MILLISECONDS, AioSocketChannel.this, SCATTERING_READ_HANDLER); + } else { + // Get a ByteBuffer view on the ByteBuf + ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); + javaChannel().read(buffer, config.getWriteTimeout(), TimeUnit.MILLISECONDS, + AioSocketChannel.this, READ_HANDLER); + } } - private static final class WriteHandler extends AioCompletionHandler { + private static final class WriteHandler extends AioCompletionHandler { @Override - protected void completed0(Integer result, AioSocketChannel channel) { + protected void completed0(T result, AioSocketChannel channel) { ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); - int writtenBytes = result; + int writtenBytes = result.intValue(); if (writtenBytes > 0) { // Update the readerIndex with the amount of read bytes buf.readerIndex(buf.readerIndex() + writtenBytes); @@ -253,6 +271,15 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne channel.notifyFlushFutures(cause); channel.pipeline().fireExceptionCaught(cause); + // Check if the exception was raised because of an InterruptedByTimeoutException which means that the + // write timeout was hit. In that case we should close the channel as it may be unusable anyway. + // + // See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html + if (cause instanceof InterruptedByTimeoutException) { + channel.unsafe().close(channel.unsafe().voidFuture()); + return; + } + ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); if (!buf.readable()) { buf.discardReadBytes(); @@ -263,10 +290,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - private static final class ReadHandler extends AioCompletionHandler { + private static final class ReadHandler extends AioCompletionHandler { @Override - protected void completed0(Integer result, AioSocketChannel channel) { + protected void completed0(T result, AioSocketChannel channel) { final ChannelPipeline pipeline = channel.pipeline(); final ByteBuf byteBuf = pipeline.inboundByteBuffer(); @@ -328,7 +355,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne channel.pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { + // Check if the exception was raised because of an InterruptedByTimeoutException which means that the + // write timeout was hit. In that case we should close the channel as it may be unusable anyway. + // + // See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html + if (t instanceof IOException || t instanceof InterruptedByTimeoutException) { channel.unsafe().close(channel.unsafe().voidFuture()); } else { // start the next read diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index 9b2bf14f51..ae1be50145 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -23,6 +23,7 @@ import io.netty.channel.socket.SocketChannelConfig; import java.io.IOException; import java.net.StandardSocketOptions; +import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; import java.util.Map; @@ -33,6 +34,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { private final NetworkChannel channel; + private volatile long readTimeoutInMillis; + private volatile long writeTimeoutInMillis; /** * Creates a new instance. @@ -49,7 +52,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig public Map, Object> getOptions() { return getOptions( super.getOptions(), - SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, + AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT); } @Override @@ -75,6 +79,12 @@ final class AioSocketChannelConfig extends DefaultChannelConfig if (option == IP_TOS) { return (T) Integer.valueOf(getTrafficClass()); } + if (option == AIO_READ_TIMEOUT) { + return (T) Long.valueOf(getReadTimeout()); + } + if (option == AIO_WRITE_TIMEOUT) { + return (T) Long.valueOf(getWriteTimeout()); + } return super.getOption(option); } @@ -97,6 +107,10 @@ final class AioSocketChannelConfig extends DefaultChannelConfig setSoLinger((Integer) value); } else if (option == IP_TOS) { setTrafficClass((Integer) value); + } else if (option == AIO_READ_TIMEOUT) { + setReadTimeout((Long) value); + } else if (option == AIO_WRITE_TIMEOUT) { + setWriteTimeout((Long) value); } else { return super.setOption(option, value); } @@ -235,4 +249,50 @@ final class AioSocketChannelConfig extends DefaultChannelConfig throw new ChannelException(e); } } + + /** + * Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown. + * Once such an exception was detected it will get propagated to the handlers first. After that the channel + * will get closed as it may be in an unknown state. + * + * To disable it just use 0. + */ + public void setReadTimeout(long readTimeoutInMillis) { + if (readTimeoutInMillis < 0) { + throw new IllegalArgumentException("readTimeoutInMillis: " + readTimeoutInMillis); + } + this.readTimeoutInMillis = readTimeoutInMillis; + } + + /** + * Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown. + * Once such an exception was detected it will get propagated to the handlers first. After that the channel + * will get closed as it may be in an unknown state. + * + * To disable it just use 0. + */ + public void setWriteTimeout(long writeTimeoutInMillis) { + if (writeTimeoutInMillis < 0) { + throw new IllegalArgumentException("writeTimeoutInMillis: " + writeTimeoutInMillis); + } + this.writeTimeoutInMillis = writeTimeoutInMillis; + } + + /** + * Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown. + * + * The default is 0 + */ + public long getReadTimeout() { + return readTimeoutInMillis; + } + + /** + * Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown. + * + * The default is 0 + */ + public long getWriteTimeout() { + return writeTimeoutInMillis; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java index 4775a2d91b..deb62dbe60 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.TaskScheduler; import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -30,7 +31,9 @@ import java.nio.channels.spi.SelectorProvider; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -60,8 +63,10 @@ final class NioEventLoop extends SingleThreadEventLoop { private int cancelledKeys; private boolean cleanedCancelledKeys; - NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(parent, threadFactory); + NioEventLoop( + NioEventLoopGroup parent, ThreadFactory threadFactory, + TaskScheduler scheduler, SelectorProvider selectorProvider) { + super(parent, threadFactory, scheduler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } @@ -76,6 +81,12 @@ final class NioEventLoop extends SingleThreadEventLoop { } } + @Override + protected Queue newTaskQueue() { + // This event loop never calls takeTask() + return new ConcurrentLinkedQueue(); + } + @Override protected void run() { Selector selector = this.selector; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java index 48f6c98fbc..68a1b38bcf 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.TaskScheduler; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.ThreadFactory; @@ -35,18 +36,20 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, threadFactory); } - public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { + public NioEventLoopGroup( + int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { super(nThreads, threadFactory, selectorProvider); } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { SelectorProvider selectorProvider; if (args == null || args.length == 0 || args[0] == null) { selectorProvider = SelectorProvider.provider(); } else { selectorProvider = (SelectorProvider) args[0]; } - return new NioEventLoop(this, threadFactory, selectorProvider); + return new NioEventLoop(this, threadFactory, scheduler, selectorProvider); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 7049f7080f..5492769af9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -27,7 +27,7 @@ class OioEventLoop extends SingleThreadEventLoop { private AbstractOioChannel ch; OioEventLoop(OioEventLoopGroup parent) { - super(parent, parent.threadFactory); + super(parent, parent.threadFactory, parent.scheduler); this.parent = parent; } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java index 76e473faf2..d0b37deacc 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.TaskScheduler; import java.util.Collections; import java.util.Queue; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; public class OioEventLoopGroup implements EventLoopGroup { private final int maxChannels; + final TaskScheduler scheduler; final ThreadFactory threadFactory; final Set activeChildren = Collections.newSetFromMap( new ConcurrentHashMap()); @@ -60,6 +62,8 @@ public class OioEventLoopGroup implements EventLoopGroup { this.maxChannels = maxChannels; this.threadFactory = threadFactory; + scheduler = new TaskScheduler(threadFactory); + tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); tooManyChannels.setStackTrace(new StackTraceElement[0]); } @@ -71,6 +75,7 @@ public class OioEventLoopGroup implements EventLoopGroup { @Override public void shutdown() { + scheduler.shutdown(); for (EventLoop l: activeChildren) { l.shutdown(); } @@ -81,6 +86,9 @@ public class OioEventLoopGroup implements EventLoopGroup { @Override public boolean isShutdown() { + if (!scheduler.isShutdown()) { + return false; + } for (EventLoop l: activeChildren) { if (!l.isShutdown()) { return false; @@ -96,6 +104,9 @@ public class OioEventLoopGroup implements EventLoopGroup { @Override public boolean isTerminated() { + if (!scheduler.isTerminated()) { + return false; + } for (EventLoop l: activeChildren) { if (!l.isTerminated()) { return false; @@ -113,6 +124,15 @@ public class OioEventLoopGroup implements EventLoopGroup { public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } for (EventLoop l: activeChildren) { for (;;) { long timeLeft = deadline - System.nanoTime(); diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index bb9eaa0481..951b28515a 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -256,7 +256,8 @@ public class SingleThreadEventLoopTest { final AtomicInteger cleanedUp = new AtomicInteger(); SingleThreadEventLoopImpl() { - super(null, Executors.defaultThreadFactory()); + super(null, Executors.defaultThreadFactory(), + new TaskScheduler(Executors.defaultThreadFactory())); } @Override