Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Jestan Nirojan 2012-08-18 20:11:00 +08:00
commit e257eae287
59 changed files with 1061 additions and 455 deletions

View File

@ -180,27 +180,25 @@ public abstract class AbstractByteBuf implements ByteBuf {
return; return;
} }
if (minWritableBytes > maxCapacity - writerIndex) { if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format( throw new IllegalArgumentException(String.format(
"minWritableBytes(%d) + writerIndex(%d) > maxCapacity(%d)", "minWritableBytes: %d (expected: 0+)", minWritableBytes));
minWritableBytes, writerIndex, maxCapacity));
} }
int minNewCapacity = writerIndex + minWritableBytes; if (minWritableBytes > maxCapacity - writerIndex) {
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format( throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (exceeds maxCapacity(%d))", minWritableBytes, maxCapacity)); "minWritableBytes: %d (exceeds maxCapacity(%d))", minWritableBytes, maxCapacity));
} }
// Normalize the current capacity to the power of 2. // Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(minNewCapacity); int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity. // Adjust to the new capacity.
capacity(newCapacity); capacity(newCapacity);
} }
private int calculateNewCapacity(int minNewCapacity) { private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) { if (minNewCapacity == threshold) {
@ -223,7 +221,8 @@ public abstract class AbstractByteBuf implements ByteBuf {
while (newCapacity < minNewCapacity) { while (newCapacity < minNewCapacity) {
newCapacity <<= 1; newCapacity <<= 1;
} }
return newCapacity;
return Math.min(newCapacity, maxCapacity);
} }
@Override @Override
@ -726,6 +725,11 @@ public abstract class AbstractByteBuf implements ByteBuf {
return nioBuffer(readerIndex, readableBytes()); return nioBuffer(readerIndex, readableBytes());
} }
@Override
public ByteBuffer[] nioBuffers() {
return nioBuffers(readerIndex, readableBytes());
}
@Override @Override
public String toString(Charset charset) { public String toString(Charset charset) {
return toString(readerIndex, readableBytes(), charset); return toString(readerIndex, readableBytes(), charset);

View File

@ -1701,6 +1701,39 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/ */
ByteBuffer nioBuffer(int index, int length); ByteBuffer nioBuffer(int index, int length);
/**
* Returns {@code true} if and only if {@link #nioBuffers()} method will not fail.
*/
boolean hasNioBuffers();
/**
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer
* shares the content with this buffer, while changing the position and limit of the returned
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers();
/**
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
* The returned buffer shares the content with this buffer, while changing the position and limit
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers(int offset, int length);
/** /**
* Returns {@code true} if and only if this buffer has a backing byte array. * Returns {@code true} if and only if this buffer has a backing byte array.
* If this method returns true, you can safely call {@link #array()} and * If this method returns true, you can safely call {@link #array()} and
@ -1801,6 +1834,13 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/ */
ByteBuffer nioBuffer(); ByteBuffer nioBuffer();
/**
* Returns the internal NIO buffer array that is reused for I/O.
*
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer array
*/
ByteBuffer[] nioBuffers();
/** /**
* Returns a new buffer whose type is identical to the callee. * Returns a new buffer whose type is identical to the callee.
* *

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.buffer; package io.netty.buffer;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> { public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
@ -48,32 +47,4 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* Same with {@link #slice(int, int)} except that this method returns a list. * Same with {@link #slice(int, int)} except that this method returns a list.
*/ */
List<ByteBuf> decompose(int offset, int length); List<ByteBuf> decompose(int offset, int length);
/**
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer
* shares the content with this buffer, while changing the position and limit of the returned
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers();
/**
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
* The returned buffer shares the content with this buffer, while changing the position and limit
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers(int offset, int length);
} }

View File

@ -1001,6 +1001,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
} }
return false; return false;
} }
@Override @Override
public ByteBuffer nioBuffer(int index, int length) { public ByteBuffer nioBuffer(int index, int length) {
if (components.size() == 1) { if (components.size() == 1) {
@ -1023,6 +1024,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
return merged; return merged;
} }
@Override
public boolean hasNioBuffers() {
return true;
}
@Override @Override
public ByteBuffer[] nioBuffers(int index, int length) { public ByteBuffer[] nioBuffers(int index, int length) {
int componentId = toComponentIndex(index); int componentId = toComponentIndex(index);
@ -1206,6 +1212,16 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public ByteBuffer[] nioBuffers() {
ByteBuffer[] nioBuffers = new ByteBuffer[components.size()];
int index = 0;
for (Component component : components) {
nioBuffers[index++] = component.buf.unsafe().nioBuffer();
}
return nioBuffers;
}
@Override @Override
public ByteBuf newBuffer(int initialCapacity) { public ByteBuf newBuffer(int initialCapacity) {
CompositeByteBuf buf = new DefaultCompositeByteBuf(maxNumComponents); CompositeByteBuf buf = new DefaultCompositeByteBuf(maxNumComponents);

View File

@ -380,6 +380,16 @@ public class DirectByteBuf extends AbstractByteBuf {
} }
} }
@Override
public boolean hasNioBuffers() {
return false;
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
throw new UnsupportedOperationException();
}
@Override @Override
public ByteBuf copy(int index, int length) { public ByteBuf copy(int index, int length) {
ByteBuffer src; ByteBuffer src;
@ -408,6 +418,11 @@ public class DirectByteBuf extends AbstractByteBuf {
return tmpBuf; return tmpBuf;
} }
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException();
}
@Override @Override
public ByteBuf newBuffer(int initialCapacity) { public ByteBuf newBuffer(int initialCapacity) {
return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));

View File

@ -211,6 +211,16 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
return buffer.nioBuffer(index, length); return buffer.nioBuffer(index, length);
} }
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
return buffer.nioBuffers(offset, length);
}
@Override @Override
public Unsafe unsafe() { public Unsafe unsafe() {
return unsafe; return unsafe;
@ -223,6 +233,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
return buffer.unsafe().nioBuffer(); return buffer.unsafe().nioBuffer();
} }
@Override
public ByteBuffer[] nioBuffers() {
return buffer.unsafe().nioBuffers();
}
@Override @Override
public ByteBuf newBuffer(int initialCapacity) { public ByteBuf newBuffer(int initialCapacity) {
return buffer.unsafe().newBuffer(initialCapacity); return buffer.unsafe().newBuffer(initialCapacity);

View File

@ -209,6 +209,16 @@ public class HeapByteBuf extends AbstractByteBuf {
return ByteBuffer.wrap(array, index, length); return ByteBuffer.wrap(array, index, length);
} }
@Override
public boolean hasNioBuffers() {
return false;
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
throw new UnsupportedOperationException();
}
@Override @Override
public short getShort(int index) { public short getShort(int index) {
return (short) (array[index] << 8 | array[index + 1] & 0xFF); return (short) (array[index] << 8 | array[index + 1] & 0xFF);
@ -297,6 +307,11 @@ public class HeapByteBuf extends AbstractByteBuf {
return nioBuf; return nioBuf;
} }
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException();
}
@Override @Override
public ByteBuf newBuffer(int initialCapacity) { public ByteBuf newBuffer(int initialCapacity) {
return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));

View File

@ -203,6 +203,16 @@ public class ReadOnlyByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.nioBuffer(index, length).asReadOnlyBuffer(); return buffer.nioBuffer(index, length).asReadOnlyBuffer();
} }
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
return buffer.nioBuffers(offset, length);
}
@Override @Override
public int capacity() { public int capacity() {
return buffer.capacity(); return buffer.capacity();

View File

@ -257,6 +257,17 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.nioBuffer(index + adjustment, length); return buffer.nioBuffer(index + adjustment, length);
} }
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
checkIndex(index, length);
return buffer.nioBuffers(index, length);
}
private void checkIndex(int index) { private void checkIndex(int index) {
if (index < 0 || index >= capacity()) { if (index < 0 || index >= capacity()) {
throw new IndexOutOfBoundsException("Invalid index: " + index throw new IndexOutOfBoundsException("Invalid index: " + index
@ -290,6 +301,11 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.nioBuffer(adjustment, length); return buffer.nioBuffer(adjustment, length);
} }
@Override
public ByteBuffer[] nioBuffers() {
return buffer.nioBuffers(adjustment, length);
}
@Override @Override
public ByteBuf newBuffer(int initialCapacity) { public ByteBuf newBuffer(int initialCapacity) {
return buffer.unsafe().newBuffer(initialCapacity); return buffer.unsafe().newBuffer(initialCapacity);

View File

@ -657,6 +657,29 @@ public class SwappedByteBuf implements WrappedByteBuf {
return buf.nioBuffer(index, length).order(order); return buf.nioBuffer(index, length).order(order);
} }
@Override
public boolean hasNioBuffers() {
return buf.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers() {
ByteBuffer[] nioBuffers = buf.nioBuffers();
for (int i = 0; i < nioBuffers.length; i++) {
nioBuffers[i] = nioBuffers[i].order(order);
}
return nioBuffers;
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length);
for (int i = 0; i < nioBuffers.length; i++) {
nioBuffers[i] = nioBuffers[i].order(order);
}
return nioBuffers;
}
@Override @Override
public boolean hasArray() { public boolean hasArray() {
return buf.hasArray(); return buf.hasArray();

View File

@ -151,6 +151,17 @@ final class HttpCodecUtil {
return false; return false;
} }
static void removeTransferEncodingChunked(HttpMessage m) {
List<String> values = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
values.remove(HttpHeaders.Values.CHUNKED);
m.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, values);
}
static boolean isContentLengthSet(HttpMessage m) {
List<String> contentLength = m.getHeaders(HttpHeaders.Names.CONTENT_LENGTH);
return !contentLength.isEmpty();
}
/** /**
* A constructor to ensure that instances of this class are never made * A constructor to ensure that instances of this class are never made
*/ */

View File

@ -47,7 +47,7 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
private static final ByteBuf LAST_CHUNK = private static final ByteBuf LAST_CHUNK =
copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII); copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII);
private volatile boolean chunked; private boolean transferEncodingChunked;
/** /**
* Creates a new instance. * Creates a new instance.
@ -64,17 +64,26 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (msg instanceof HttpMessage) { if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg; HttpMessage m = (HttpMessage) msg;
boolean chunked; boolean contentMustBeEmpty;
if (m.isChunked()) { if (m.isChunked()) {
// check if the Transfer-Encoding is set to chunked already. // if Content-Length is set then the message can't be HTTP chunked
// if not add the header to the message if (HttpCodecUtil.isContentLengthSet(m)) {
if (!HttpCodecUtil.isTransferEncodingChunked(m)) { contentMustBeEmpty = false;
m.addHeader(Names.TRANSFER_ENCODING, Values.CHUNKED); transferEncodingChunked = false;
HttpCodecUtil.removeTransferEncodingChunked(m);
} else {
// check if the Transfer-Encoding is set to chunked already.
// if not add the header to the message
if (!HttpCodecUtil.isTransferEncodingChunked(m)) {
m.addHeader(Names.TRANSFER_ENCODING, Values.CHUNKED);
}
contentMustBeEmpty = true;
transferEncodingChunked = true;
} }
chunked = this.chunked = true;
} else { } else {
chunked = this.chunked = HttpCodecUtil.isTransferEncodingChunked(m); transferEncodingChunked = contentMustBeEmpty = HttpCodecUtil.isTransferEncodingChunked(m);
} }
out.markWriterIndex(); out.markWriterIndex();
encodeInitialLine(out, m); encodeInitialLine(out, m);
encodeHeaders(out, m); encodeHeaders(out, m);
@ -83,20 +92,19 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
ByteBuf content = m.getContent(); ByteBuf content = m.getContent();
if (content.readable()) { if (content.readable()) {
if (chunked) { if (contentMustBeEmpty) {
out.resetWriterIndex(); out.resetWriterIndex();
throw new IllegalArgumentException( throw new IllegalArgumentException(
"HttpMessage.content must be empty " + "HttpMessage.content must be empty if Transfer-Encoding is chunked.");
"if Transfer-Encoding is chunked.");
} else { } else {
out.writeBytes(content, content.readerIndex(), content.readableBytes()); out.writeBytes(content, content.readerIndex(), content.readableBytes());
} }
} }
} else if (msg instanceof HttpChunk) { } else if (msg instanceof HttpChunk) {
HttpChunk chunk = (HttpChunk) msg; HttpChunk chunk = (HttpChunk) msg;
if (chunked) { if (transferEncodingChunked) {
if (chunk.isLast()) { if (chunk.isLast()) {
chunked = false; transferEncodingChunked = false;
if (chunk instanceof HttpChunkTrailer) { if (chunk instanceof HttpChunkTrailer) {
out.writeByte((byte) '0'); out.writeByte((byte) '0');
out.writeByte(CR); out.writeByte(CR);
@ -150,10 +158,10 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
private static void encodeHeader(ByteBuf buf, String header, String value) private static void encodeHeader(ByteBuf buf, String header, String value)
throws UnsupportedEncodingException { throws UnsupportedEncodingException {
buf.writeBytes(header.getBytes("ASCII")); buf.writeBytes(header.getBytes(CharsetUtil.US_ASCII));
buf.writeByte(COLON); buf.writeByte(COLON);
buf.writeByte(SP); buf.writeByte(SP);
buf.writeBytes(value.getBytes("ASCII")); buf.writeBytes(value.getBytes(CharsetUtil.US_ASCII));
buf.writeByte(CR); buf.writeByte(CR);
buf.writeByte(LF); buf.writeByte(LF);
} }

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpConstants.*; import static io.netty.handler.codec.http.HttpConstants.*;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/** /**
* Encodes an {@link HttpRequest} or an {@link HttpChunk} into * Encodes an {@link HttpRequest} or an {@link HttpChunk} into
@ -33,11 +34,11 @@ public class HttpRequestEncoder extends HttpMessageEncoder {
@Override @Override
protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception { protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception {
HttpRequest request = (HttpRequest) message; HttpRequest request = (HttpRequest) message;
buf.writeBytes(request.getMethod().toString().getBytes("ASCII")); buf.writeBytes(request.getMethod().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte(SP); buf.writeByte(SP);
buf.writeBytes(request.getUri().getBytes("ASCII")); buf.writeBytes(request.getUri().getBytes(CharsetUtil.UTF_8));
buf.writeByte(SP); buf.writeByte(SP);
buf.writeBytes(request.getProtocolVersion().toString().getBytes("ASCII")); buf.writeBytes(request.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte(CR); buf.writeByte(CR);
buf.writeByte(LF); buf.writeByte(LF);
} }

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpConstants.*; import static io.netty.handler.codec.http.HttpConstants.*;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/** /**
* Encodes an {@link HttpResponse} or an {@link HttpChunk} into * Encodes an {@link HttpResponse} or an {@link HttpChunk} into
@ -33,11 +34,11 @@ public class HttpResponseEncoder extends HttpMessageEncoder {
@Override @Override
protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception { protected void encodeInitialLine(ByteBuf buf, HttpMessage message) throws Exception {
HttpResponse response = (HttpResponse) message; HttpResponse response = (HttpResponse) message;
buf.writeBytes(response.getProtocolVersion().toString().getBytes("ASCII")); buf.writeBytes(response.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte(SP); buf.writeByte(SP);
buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes("ASCII")); buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes(CharsetUtil.US_ASCII));
buf.writeByte(SP); buf.writeByte(SP);
buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes("ASCII")); buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes(CharsetUtil.US_ASCII));
buf.writeByte(CR); buf.writeByte(CR);
buf.writeByte(LF); buf.writeByte(LF);
} }

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.rtsp;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.CharsetUtil;
/** /**
* Encodes an RTSP request represented in {@link HttpRequest} into * Encodes an RTSP request represented in {@link HttpRequest} into
@ -30,11 +31,11 @@ public class RtspRequestEncoder extends RtspMessageEncoder {
protected void encodeInitialLine(ByteBuf buf, HttpMessage message) protected void encodeInitialLine(ByteBuf buf, HttpMessage message)
throws Exception { throws Exception {
HttpRequest request = (HttpRequest) message; HttpRequest request = (HttpRequest) message;
buf.writeBytes(request.getMethod().toString().getBytes("ASCII")); buf.writeBytes(request.getMethod().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte((byte) ' '); buf.writeByte((byte) ' ');
buf.writeBytes(request.getUri().getBytes("ASCII")); buf.writeBytes(request.getUri().getBytes(CharsetUtil.UTF_8));
buf.writeByte((byte) ' '); buf.writeByte((byte) ' ');
buf.writeBytes(request.getProtocolVersion().toString().getBytes("ASCII")); buf.writeBytes(request.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte((byte) '\r'); buf.writeByte((byte) '\r');
buf.writeByte((byte) '\n'); buf.writeByte((byte) '\n');
} }

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.rtsp;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.CharsetUtil;
/** /**
* Encodes an RTSP response represented in {@link HttpResponse} into * Encodes an RTSP response represented in {@link HttpResponse} into
@ -30,11 +31,11 @@ public class RtspResponseEncoder extends RtspMessageEncoder {
protected void encodeInitialLine(ByteBuf buf, HttpMessage message) protected void encodeInitialLine(ByteBuf buf, HttpMessage message)
throws Exception { throws Exception {
HttpResponse response = (HttpResponse) message; HttpResponse response = (HttpResponse) message;
buf.writeBytes(response.getProtocolVersion().toString().getBytes("ASCII")); buf.writeBytes(response.getProtocolVersion().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte((byte) ' '); buf.writeByte((byte) ' ');
buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes("ASCII")); buf.writeBytes(String.valueOf(response.getStatus().getCode()).getBytes(CharsetUtil.US_ASCII));
buf.writeByte((byte) ' '); buf.writeByte((byte) ' ');
buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes("ASCII")); buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes(CharsetUtil.US_ASCII));
buf.writeByte((byte) '\r'); buf.writeByte((byte) '\r');
buf.writeByte((byte) '\n'); buf.writeByte((byte) '\n');
} }

View File

@ -26,7 +26,6 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame {
private int streamId; private int streamId;
private boolean last; private boolean last;
private boolean compressed;
private ByteBuf data = Unpooled.EMPTY_BUFFER; private ByteBuf data = Unpooled.EMPTY_BUFFER;
/** /**
@ -62,18 +61,6 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame {
this.last = last; this.last = last;
} }
@Override
@Deprecated
public boolean isCompressed() {
return compressed;
}
@Override
@Deprecated
public void setCompressed(boolean compressed) {
this.compressed = compressed;
}
@Override @Override
public ByteBuf getData() { public ByteBuf getData() {
return data; return data;
@ -97,8 +84,6 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame {
buf.append(getClass().getSimpleName()); buf.append(getClass().getSimpleName());
buf.append("(last: "); buf.append("(last: ");
buf.append(isLast()); buf.append(isLast());
buf.append("; compressed: ");
buf.append(isCompressed());
buf.append(')'); buf.append(')');
buf.append(StringUtil.NEWLINE); buf.append(StringUtil.NEWLINE);
buf.append("--> Stream-ID = "); buf.append("--> Stream-ID = ");

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
final class SpdyCodecUtil { final class SpdyCodecUtil {
@ -251,7 +252,7 @@ final class SpdyCodecUtil {
byte[] SPDY2_DICT_; byte[] SPDY2_DICT_;
try { try {
SPDY2_DICT_ = SPDY2_DICT_S.getBytes("US-ASCII"); SPDY2_DICT_ = SPDY2_DICT_S.getBytes(CharsetUtil.US_ASCII);
// dictionary is null terminated // dictionary is null terminated
SPDY2_DICT_[SPDY2_DICT_.length - 1] = (byte) 0; SPDY2_DICT_[SPDY2_DICT_.length - 1] = (byte) 0;
} catch (Exception e) { } catch (Exception e) {

View File

@ -0,0 +1,23 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
/**
* A SPDY Protocol Control Frame
*/
public interface SpdyControlFrame {
// Tag interface
}

View File

@ -44,18 +44,6 @@ public interface SpdyDataFrame {
*/ */
void setLast(boolean last); void setLast(boolean last);
/**
* @deprecated Removed from SPDY specification.
*/
@Deprecated
boolean isCompressed();
/**
* @deprecated Removed from SPDY specification.
*/
@Deprecated
void setCompressed(boolean compressed);
/** /**
* Returns the data payload of this frame. If there is no data payload * Returns the data payload of this frame. If there is no data payload
* {@link Unpooled#EMPTY_BUFFER} is returned. * {@link Unpooled#EMPTY_BUFFER} is returned.

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException; import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.util.CharsetUtil;
import java.util.Set; import java.util.Set;
@ -75,17 +76,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean isEncodable(Object msg) throws Exception {
// FIXME: Introduce supertype return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame;
return msg instanceof SpdyDataFrame ||
msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
msg instanceof SpdyRstStreamFrame ||
msg instanceof SpdySettingsFrame ||
msg instanceof SpdyNoOpFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyGoAwayFrame ||
msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyWindowUpdateFrame;
} }
@Override @Override
@ -312,14 +303,14 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
ByteBuf headerBlock = Unpooled.buffer(); ByteBuf headerBlock = Unpooled.buffer();
writeLengthField(version, headerBlock, numHeaders); writeLengthField(version, headerBlock, numHeaders);
for (String name: names) { for (String name: names) {
byte[] nameBytes = name.getBytes("UTF-8"); byte[] nameBytes = name.getBytes(CharsetUtil.UTF_8);
writeLengthField(version, headerBlock, nameBytes.length); writeLengthField(version, headerBlock, nameBytes.length);
headerBlock.writeBytes(nameBytes); headerBlock.writeBytes(nameBytes);
int savedIndex = headerBlock.writerIndex(); int savedIndex = headerBlock.writerIndex();
int valueLength = 0; int valueLength = 0;
writeLengthField(version, headerBlock, valueLength); writeLengthField(version, headerBlock, valueLength);
for (String value: headerFrame.getHeaders(name)) { for (String value: headerFrame.getHeaders(name)) {
byte[] valueBytes = value.getBytes("UTF-8"); byte[] valueBytes = value.getBytes(CharsetUtil.UTF_8);
headerBlock.writeBytes(valueBytes); headerBlock.writeBytes(valueBytes);
headerBlock.writeByte(0); headerBlock.writeByte(0);
valueLength += valueBytes.length + 1; valueLength += valueBytes.length + 1;

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol GOAWAY Control Frame * A SPDY Protocol GOAWAY Control Frame
*/ */
public interface SpdyGoAwayFrame { public interface SpdyGoAwayFrame extends SpdyControlFrame {
/** /**
* Returns the Last-good-stream-ID of this frame. * Returns the Last-good-stream-ID of this frame.

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol HEADERS Control Frame * A SPDY Protocol HEADERS Control Frame
*/ */
public interface SpdyHeadersFrame extends SpdyHeaderBlock { public interface SpdyHeadersFrame extends SpdyHeaderBlock, SpdyControlFrame {
/** /**
* Returns the Stream-ID of this frame. * Returns the Stream-ID of this frame.

View File

@ -18,6 +18,6 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol NOOP Control Frame * A SPDY Protocol NOOP Control Frame
*/ */
public interface SpdyNoOpFrame { public interface SpdyNoOpFrame extends SpdyControlFrame {
// Tag interface // Tag interface
} }

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol PING Control Frame * A SPDY Protocol PING Control Frame
*/ */
public interface SpdyPingFrame { public interface SpdyPingFrame extends SpdyControlFrame {
/** /**
* Returns the ID of this frame. * Returns the ID of this frame.

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol RST_STREAM Control Frame * A SPDY Protocol RST_STREAM Control Frame
*/ */
public interface SpdyRstStreamFrame { public interface SpdyRstStreamFrame extends SpdyControlFrame {
/** /**
* Returns the Stream-ID of this frame. * Returns the Stream-ID of this frame.

View File

@ -20,7 +20,7 @@ import java.util.Set;
/** /**
* A SPDY Protocol SETTINGS Control Frame * A SPDY Protocol SETTINGS Control Frame
*/ */
public interface SpdySettingsFrame { public interface SpdySettingsFrame extends SpdyControlFrame {
int SETTINGS_UPLOAD_BANDWIDTH = 1; int SETTINGS_UPLOAD_BANDWIDTH = 1;
int SETTINGS_DOWNLOAD_BANDWIDTH = 2; int SETTINGS_DOWNLOAD_BANDWIDTH = 2;

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol SYN_REPLY Control Frame * A SPDY Protocol SYN_REPLY Control Frame
*/ */
public interface SpdySynReplyFrame extends SpdyHeaderBlock { public interface SpdySynReplyFrame extends SpdyHeaderBlock, SpdyControlFrame {
/** /**
* Returns the Stream-ID of this frame. * Returns the Stream-ID of this frame.

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol SYN_STREAM Control Frame * A SPDY Protocol SYN_STREAM Control Frame
*/ */
public interface SpdySynStreamFrame extends SpdyHeaderBlock { public interface SpdySynStreamFrame extends SpdyHeaderBlock, SpdyControlFrame {
/** /**
* Returns the Stream-ID of this frame. * Returns the Stream-ID of this frame.

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.spdy;
/** /**
* A SPDY Protocol WINDOW_UPDATE Control Frame * A SPDY Protocol WINDOW_UPDATE Control Frame
*/ */
public interface SpdyWindowUpdateFrame { public interface SpdyWindowUpdateFrame extends SpdyControlFrame {
/** /**
* Returns the Stream-ID of this frame. * Returns the Stream-ID of this frame.

View File

@ -21,7 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.util.Signal; import io.netty.util.internal.Signal;
/** /**
* A specialized variation of {@link ByteToMessageDecoder} which enables implementation * A specialized variation of {@link ByteToMessageDecoder} which enables implementation

View File

@ -20,7 +20,7 @@ import io.netty.buffer.ByteBufIndexFinder;
import io.netty.buffer.ChannelBufType; import io.netty.buffer.ChannelBufType;
import io.netty.buffer.SwappedByteBuf; import io.netty.buffer.SwappedByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.util.Signal; import io.netty.util.internal.Signal;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -672,6 +672,22 @@ class ReplayingDecoderBuffer implements ByteBuf {
return buffer.nioBuffer(index, length); return buffer.nioBuffer(index, length);
} }
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnreplayableOperationException();
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
checkIndex(index, length);
return buffer.nioBuffers(index, length);
}
@Override @Override
public String toString(int index, int length, Charset charset) { public String toString(int index, int length, Charset charset) {
checkIndex(index, length); checkIndex(index, length);

View File

@ -20,7 +20,7 @@ import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.Signal; import io.netty.util.internal.Signal;
import org.junit.Test; import org.junit.Test;

View File

@ -18,6 +18,9 @@ package io.netty.util;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/**
* Defines a name that must be unique in the map that is provided during construction.
*/
public class UniqueName implements Comparable<UniqueName> { public class UniqueName implements Comparable<UniqueName> {
private static final AtomicInteger nextId = new AtomicInteger(); private static final AtomicInteger nextId = new AtomicInteger();
@ -25,6 +28,13 @@ public class UniqueName implements Comparable<UniqueName> {
private final int id; private final int id;
private final String name; private final String name;
/**
* Constructs a new {@link UniqueName}
*
* @param map the map of names to compare with
* @param name the name of this {@link UniqueName}
* @param args the arguments to process
*/
public UniqueName(ConcurrentMap<String, Boolean> map, String name, Object... args) { public UniqueName(ConcurrentMap<String, Boolean> map, String name, Object... args) {
if (map == null) { if (map == null) {
throw new NullPointerException("map"); throw new NullPointerException("map");
@ -37,7 +47,7 @@ public class UniqueName implements Comparable<UniqueName> {
} }
if (map.putIfAbsent(name, Boolean.TRUE) != null) { if (map.putIfAbsent(name, Boolean.TRUE) != null) {
throw new IllegalArgumentException(String.format("'%s' already in use", name)); throw new IllegalArgumentException(String.format("'%s' is already in use", name));
} }
id = nextId.incrementAndGet(); id = nextId.incrementAndGet();
@ -45,16 +55,33 @@ public class UniqueName implements Comparable<UniqueName> {
} }
/** /**
* Validates the given arguments. This method does not do anything on its own, but must be
* overridden by its subclasses.
*
* @param args arguments to validate * @param args arguments to validate
*/ */
protected void validateArgs(Object... args) { protected void validateArgs(Object... args) {
// Subclasses will override. // Subclasses will override.
} }
/**
* Returns this {@link UniqueName}'s name
*
* @return the name
*/
public final String name() { public final String name() {
return name; return name;
} }
/**
* Returns this {@link UniqueName}'s ID
*
* @return the id
*/
public final int id() {
return id;
}
@Override @Override
public final int hashCode() { public final int hashCode() {
return super.hashCode(); return super.hashCode();
@ -66,19 +93,19 @@ public class UniqueName implements Comparable<UniqueName> {
} }
@Override @Override
public int compareTo(UniqueName o) { public int compareTo(UniqueName other) {
if (this == o) { if (this == other) {
return 0; return 0;
} }
int ret = name.compareTo(o.name); int returnCode = name.compareTo(other.name);
if (ret != 0) { if (returnCode != 0) {
return ret; return returnCode;
} }
if (id < o.id) { if (id < other.id) {
return -1; return -1;
} else if (id > o.id) { } else if (id > other.id) {
return 1; return 1;
} else { } else {
return 0; return 0;

View File

@ -13,9 +13,11 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.util; package io.netty.util.internal;
import io.netty.util.UniqueName;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;

View File

@ -0,0 +1,87 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotSame;
import org.junit.Before;
import org.junit.Test;
public class UniqueNameTest {
/**
* A {@link ConcurrentHashMap} of registered names.
* This is set up before each test
*/
private ConcurrentHashMap<String, Boolean> names;
/**
* Registers a {@link UniqueName}
*
* @param name the name being registered
* @return the unique name
*/
public UniqueName registerName(String name) {
return new UniqueName(names, name);
}
@Before
public void initializeTest() {
this.names = new ConcurrentHashMap<String, Boolean>();
}
@Test
public void testRegisteringName() {
registerName("Abcedrian");
assertTrue(this.names.get("Abcedrian"));
assertTrue(this.names.get("Hellyes") == null);
}
@Test
public void testNameUniqueness() {
registerName("Leroy");
boolean failed = false;
try {
registerName("Leroy");
} catch (IllegalArgumentException ex) {
failed = true;
}
assertTrue(failed);
}
@Test
public void testIDUniqueness() {
UniqueName one = registerName("one");
UniqueName two = registerName("two");
assertNotSame(one.id(), two.id());
ArrayList<UniqueName> nameList = new ArrayList<UniqueName>();
for (int index = 0; index < 2500; index++) {
UniqueName currentName = registerName("test" + index);
nameList.add(currentName);
for (UniqueName otherName : nameList) {
if (!currentName.name().equals(otherName.name())) {
assertNotSame(currentName.id(), otherName.name());
}
}
}
}
}

View File

@ -57,7 +57,7 @@ public class ObjectEchoClientHandler extends ChannelInboundMessageHandlerAdapter
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, List<Integer> msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, List<Integer> msg) throws Exception {
// Echo back the received object to the client. // Echo back the received object to the server.
ctx.write(msg); ctx.write(msg);
} }

View File

@ -283,6 +283,7 @@
<ignore>java.nio.channels.AsynchronousServerSocketChannel</ignore> <ignore>java.nio.channels.AsynchronousServerSocketChannel</ignore>
<ignore>java.nio.channels.AsynchronousChannelGroup</ignore> <ignore>java.nio.channels.AsynchronousChannelGroup</ignore>
<ignore>java.nio.channels.NetworkChannel</ignore> <ignore>java.nio.channels.NetworkChannel</ignore>
<ignore>java.nio.channels.InterruptedByTimeoutException</ignore>
</ignores> </ignores>
</configuration> </configuration>
<executions> <executions>

View File

@ -54,10 +54,11 @@ final class SocketTestPermutation {
sbfs.add(new Factory<ServerBootstrap>() { sbfs.add(new Factory<ServerBootstrap>() {
@Override @Override
public ServerBootstrap newInstance() { public ServerBootstrap newInstance() {
AioEventLoopGroup loop = new AioEventLoopGroup(); AioEventLoopGroup parentGroup = new AioEventLoopGroup();
AioEventLoopGroup childGroup = new AioEventLoopGroup();
return new ServerBootstrap(). return new ServerBootstrap().
group(loop). group(parentGroup, childGroup).
channel(new AioServerSocketChannel(loop)); channel(new AioServerSocketChannel(parentGroup, childGroup));
} }
}); });
sbfs.add(new Factory<ServerBootstrap>() { sbfs.add(new Factory<ServerBootstrap>() {

View File

@ -94,6 +94,11 @@ public class ChannelOption<T> extends UniqueName {
public static final ChannelOption<SocketAddress> SCTP_SET_PEER_PRIMARY_ADDR = public static final ChannelOption<SocketAddress> SCTP_SET_PEER_PRIMARY_ADDR =
new ChannelOption<SocketAddress>("SCTP_SET_PEER_PRIMARY_ADDR"); new ChannelOption<SocketAddress>("SCTP_SET_PEER_PRIMARY_ADDR");
public static final ChannelOption<Long> AIO_READ_TIMEOUT =
new ChannelOption<Long>("AIO_READ_TIMEOUT");
public static final ChannelOption<Long> AIO_WRITE_TIMEOUT =
new ChannelOption<Long>("AIO_WRITE_TIMEOUT");
public ChannelOption(String name) { public ChannelOption(String name) {
super(names, name); super(names, name);
} }

View File

@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory;
class DefaultEventExecutor extends SingleThreadEventExecutor { class DefaultEventExecutor extends SingleThreadEventExecutor {
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) { DefaultEventExecutor(
super(parent, threadFactory); DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
} }
@Override @Override

View File

@ -28,7 +28,8 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
} }
@Override @Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { protected EventExecutor newChild(
return new DefaultEventExecutor(this, threadFactory); ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory, scheduler);
} }
} }

View File

@ -24,6 +24,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final AtomicInteger poolId = new AtomicInteger(); private static final AtomicInteger poolId = new AtomicInteger();
final TaskScheduler scheduler;
private final EventExecutor[] children; private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger(); private final AtomicInteger childIndex = new AtomicInteger();
@ -40,11 +41,13 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
threadFactory = new DefaultThreadFactory(); threadFactory = new DefaultThreadFactory();
} }
scheduler = new TaskScheduler(threadFactory);
children = new SingleThreadEventExecutor[nThreads]; children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { for (int i = 0; i < nThreads; i ++) {
boolean success = false; boolean success = false;
try { try {
children[i] = newChild(threadFactory, args); children[i] = newChild(threadFactory, scheduler, args);
success = true; success = true;
} catch (Exception e) { } catch (Exception e) {
throw new EventLoopException("failed to create a child event loop", e); throw new EventLoopException("failed to create a child event loop", e);
@ -63,10 +66,12 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
return children[Math.abs(childIndex.getAndIncrement() % children.length)]; return children[Math.abs(childIndex.getAndIncrement() % children.length)];
} }
protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; protected abstract EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception;
@Override @Override
public void shutdown() { public void shutdown() {
scheduler.shutdown();
for (EventExecutor l: children) { for (EventExecutor l: children) {
l.shutdown(); l.shutdown();
} }
@ -74,6 +79,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
@Override @Override
public boolean isShutdown() { public boolean isShutdown() {
if (!scheduler.isShutdown()) {
return false;
}
for (EventExecutor l: children) { for (EventExecutor l: children) {
if (!l.isShutdown()) { if (!l.isShutdown()) {
return false; return false;
@ -84,6 +92,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
@Override @Override
public boolean isTerminated() { public boolean isTerminated() {
if (!scheduler.isTerminated()) {
return false;
}
for (EventExecutor l: children) { for (EventExecutor l: children) {
if (!l.isTerminated()) { if (!l.isTerminated()) {
return false; return false;
@ -96,6 +107,15 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
public boolean awaitTermination(long timeout, TimeUnit unit) public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException { throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout); long deadline = System.nanoTime() + unit.toNanos(timeout);
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
loop: for (EventExecutor l: children) { loop: for (EventExecutor l: children) {
for (;;) { for (;;) {
long timeLeft = deadline - System.nanoTime(); long timeLeft = deadline - System.nanoTime();

View File

@ -20,7 +20,6 @@ import io.netty.logging.InternalLoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
@ -28,27 +27,18 @@ import java.util.Set;
import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor { public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
private static final AtomicLong nextTaskId = new AtomicLong();
static final ThreadLocal<SingleThreadEventExecutor> CURRENT_EVENT_LOOP = static final ThreadLocal<SingleThreadEventExecutor> CURRENT_EVENT_LOOP =
new ThreadLocal<SingleThreadEventExecutor>(); new ThreadLocal<SingleThreadEventExecutor>();
@ -56,33 +46,27 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return CURRENT_EVENT_LOOP.get(); return CURRENT_EVENT_LOOP.get();
} }
private static long nanoTime() {
return System.nanoTime() - START_TIME;
}
private static long deadlineNanos(long delay) {
return nanoTime() + delay;
}
private final EventExecutorGroup parent; private final EventExecutorGroup parent;
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); private final Queue<Runnable> taskQueue;
private final Thread thread; private final Thread thread;
private final Object stateLock = new Object(); private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0); private final Semaphore threadLock = new Semaphore(0);
// TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. private final TaskScheduler scheduler;
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state; private volatile int state;
private long lastCheckTimeNanos;
private long lastPurgeTimeNanos;
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
} }
if (scheduler == null) {
throw new NullPointerException("scheduler");
}
this.parent = parent; this.parent = parent;
this.scheduler = scheduler;
thread = threadFactory.newThread(new Runnable() { thread = threadFactory.newThread(new Runnable() {
@Override @Override
@ -115,7 +99,6 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
private void cleanupTasks() { private void cleanupTasks() {
for (;;) { for (;;) {
boolean ran = false; boolean ran = false;
cancelScheduledTasks();
ran |= runAllTasks(); ran |= runAllTasks();
ran |= runShutdownHooks(); ran |= runShutdownHooks();
if (!ran && !hasTasks()) { if (!ran && !hasTasks()) {
@ -124,6 +107,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
} }
} }
}); });
taskQueue = newTaskQueue();
}
protected Queue<Runnable> newTaskQueue() {
return new LinkedBlockingQueue<Runnable>();
} }
@Override @Override
@ -142,65 +131,26 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
protected Runnable pollTask() { protected Runnable pollTask() {
assert inEventLoop(); assert inEventLoop();
return taskQueue.poll();
Runnable task = taskQueue.poll();
if (task != null) {
return task;
}
if (fetchScheduledTasks()) {
task = taskQueue.poll();
return task;
}
return null;
} }
protected Runnable takeTask() throws InterruptedException { protected Runnable takeTask() throws InterruptedException {
assert inEventLoop(); assert inEventLoop();
if (taskQueue instanceof BlockingQueue) {
for (;;) { return ((BlockingQueue<Runnable>) taskQueue).take();
Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); } else {
if (task != null) { throw new UnsupportedOperationException();
return task;
}
fetchScheduledTasks();
task = taskQueue.poll();
if (task != null) {
return task;
}
} }
} }
protected Runnable peekTask() { protected Runnable peekTask() {
assert inEventLoop(); assert inEventLoop();
return taskQueue.peek();
Runnable task = taskQueue.peek();
if (task != null) {
return task;
}
if (fetchScheduledTasks()) {
task = taskQueue.peek();
return task;
}
return null;
} }
protected boolean hasTasks() { protected boolean hasTasks() {
assert inEventLoop(); assert inEventLoop();
return !taskQueue.isEmpty();
boolean empty = taskQueue.isEmpty();
if (!empty) {
return true;
}
if (fetchScheduledTasks()) {
return !taskQueue.isEmpty();
}
return false;
} }
protected void addTask(Runnable task) { protected void addTask(Runnable task) {
@ -397,228 +347,21 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
@Override @Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null) { return scheduler.schedule(this, command, delay, unit);
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(command, null, deadlineNanos(unit.toNanos(delay))));
} }
@Override @Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null) { return scheduler.schedule(this, callable, delay, unit);
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(callable, deadlineNanos(unit.toNanos(delay))));
} }
@Override @Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) { return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit);
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
} }
@Override @Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) { return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit);
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
if (isShutdown()) {
reject();
}
scheduledTasks.add(task);
if (isShutdown()) {
task.cancel(false);
}
if (!inEventLoop()) {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
} else {
fetchScheduledTasks();
}
return task;
}
private boolean fetchScheduledTasks() {
if (scheduledTasks.isEmpty()) {
return false;
}
long nanoTime = nanoTime();
if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) {
for (Iterator<ScheduledFutureTask<?>> i = scheduledTasks.iterator(); i.hasNext();) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) {
boolean added = false;
for (;;) {
ScheduledFutureTask<?> task = scheduledTasks.poll();
if (task == null) {
break;
}
if (!task.isCancelled()) {
if (isShutdown()) {
task.cancel(false);
} else {
taskQueue.add(task);
added = true;
}
}
}
return added;
}
return false;
}
private void cancelScheduledTasks() {
if (scheduledTasks.isEmpty()) {
return;
}
for (ScheduledFutureTask<?> task: scheduledTasks.toArray(new ScheduledFutureTask<?>[scheduledTasks.size()])) {
task.cancel(false);
}
scheduledTasks.clear();
}
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
ScheduledFutureTask(Runnable runnable, V result, long nanoTime) {
super(runnable, result);
deadlineNanos = nanoTime;
periodNanos = 0;
}
ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) {
super(runnable, result);
if (period == 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: != 0)", period));
}
deadlineNanos = nanoTime;
periodNanos = period;
}
ScheduledFutureTask(Callable<V> callable, long nanoTime) {
super(callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}
public long deadlineNanos() {
return deadlineNanos;
}
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
@Override
public void run() {
if (periodNanos == 0) {
super.run();
} else {
boolean reset = runAndReset();
if (reset && !isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
schedule(this);
}
}
}
} }
} }

View File

@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { protected SingleThreadEventLoop(
super(parent, threadFactory); EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
} }
@Override @Override

View File

@ -0,0 +1,426 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public final class TaskScheduler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(TaskScheduler.class);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
private static final AtomicLong nextTaskId = new AtomicLong();
private static long nanoTime() {
return System.nanoTime() - START_TIME;
}
private static long deadlineNanos(long delay) {
return nanoTime() + delay;
}
private final BlockingQueue<ScheduledFutureTask<?>> taskQueue = new DelayQueue<ScheduledFutureTask<?>>();
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
public TaskScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
try {
for (;;) {
ScheduledFutureTask<?> task;
try {
task = taskQueue.take();
runTask(task);
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && taskQueue.peek() == null) {
break;
}
}
} finally {
try {
// Run all remaining tasks and shutdown hooks.
try {
cleanupTasks();
} finally {
synchronized (stateLock) {
state = 3;
}
}
cleanupTasks();
} finally {
threadLock.release();
assert taskQueue.isEmpty();
}
}
}
private void runTask(ScheduledFutureTask<?> task) {
EventExecutor executor = task.executor;
if (executor == null) {
task.run();
} else {
if (executor.isShutdown()) {
task.cancel(false);
} else {
try {
task.executor.execute(task);
} catch (RejectedExecutionException e) {
task.cancel(false);
}
}
}
}
private void cleanupTasks() {
for (;;) {
boolean ran = false;
cancelScheduledTasks();
for (;;) {
final ScheduledFutureTask<?> task = taskQueue.poll();
if (task == null) {
break;
}
try {
runTask(task);
ran = true;
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
}
if (!ran && taskQueue.isEmpty()) {
break;
}
}
}
});
}
private boolean inSameThread() {
return Thread.currentThread() == thread;
}
public void shutdown() {
boolean inSameThread = inSameThread();
boolean wakeup = false;
if (inSameThread) {
synchronized (stateLock) {
assert state == 1;
state = 2;
wakeup = true;
}
} else {
synchronized (stateLock) {
switch (state) {
case 0:
state = 3;
threadLock.release();
break;
case 1:
state = 2;
wakeup = true;
break;
}
}
}
if (wakeup && !inSameThread && isShutdown()) {
thread.interrupt();
}
}
public boolean isShutdown() {
return state >= 2;
}
public boolean isTerminated() {
return state == 3;
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
if (inSameThread()) {
throw new IllegalStateException("cannot await termination of the current thread");
}
if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}
return isTerminated();
}
public ScheduledFuture<?> schedule(
EventExecutor executor, Runnable command, long delay, TimeUnit unit) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(executor, command, null, deadlineNanos(unit.toNanos(delay))));
}
public <V> ScheduledFuture<V> schedule(
EventExecutor executor, Callable<V> callable, long delay, TimeUnit unit) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (callable == null) {
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(executor, callable, deadlineNanos(unit.toNanos(delay))));
}
public ScheduledFuture<?> scheduleAtFixedRate(
EventExecutor executor, Runnable command, long initialDelay, long period, TimeUnit unit) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
public ScheduledFuture<?> scheduleWithFixedDelay(
EventExecutor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
if (isShutdown()) {
reject();
}
taskQueue.add(task);
if (isShutdown()) {
task.cancel(false);
}
boolean started = false;
if (!inSameThread()) {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
started = true;
}
}
}
if (started) {
schedule(new ScheduledFutureTask<Void>(
null, new PurgeTask(), null,
deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
}
return task;
}
private static void reject() {
throw new RejectedExecutionException("event executor shut down");
}
private void cancelScheduledTasks() {
if (taskQueue.isEmpty()) {
return;
}
for (ScheduledFutureTask<?> task: taskQueue.toArray(new ScheduledFutureTask<?>[taskQueue.size()])) {
task.cancel(false);
}
taskQueue.clear();
}
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
private final EventExecutor executor;
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime) {
super(runnable, result);
this.executor = executor;
deadlineNanos = nanoTime;
periodNanos = 0;
}
ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime, long period) {
super(runnable, result);
if (period == 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: != 0)", period));
}
this.executor = executor;
deadlineNanos = nanoTime;
periodNanos = period;
}
ScheduledFutureTask(EventExecutor executor, Callable<V> callable, long nanoTime) {
super(callable);
this.executor = executor;
deadlineNanos = nanoTime;
periodNanos = 0;
}
public long deadlineNanos() {
return deadlineNanos;
}
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
@Override
public void run() {
if (periodNanos == 0) {
super.run();
} else {
boolean reset = runAndReset();
if (reset && !isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
schedule(this);
}
}
}
}
private final class PurgeTask implements Runnable {
@Override
public void run() {
Iterator<ScheduledFutureTask<?>> i = taskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
}
}

View File

@ -16,13 +16,15 @@
package io.netty.channel.local; package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.TaskScheduler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
final class LocalEventLoop extends SingleThreadEventLoop { final class LocalEventLoop extends SingleThreadEventLoop {
LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) { LocalEventLoop(
super(parent, threadFactory); LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
} }
@Override @Override

View File

@ -17,6 +17,7 @@ package io.netty.channel.local;
import io.netty.channel.EventExecutor; import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.TaskScheduler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -35,7 +36,8 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
} }
@Override @Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { protected EventExecutor newChild(
return new LocalEventLoop(this, threadFactory); ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory, scheduler);
} }
} }

View File

@ -68,7 +68,7 @@ abstract class AbstractAioChannel extends AbstractChannel {
@Override @Override
protected Runnable doRegister() throws Exception { protected Runnable doRegister() throws Exception {
if (((AioChildEventLoop) eventLoop()).parent() != group) { if (((AioEventLoop) eventLoop()).parent() != group) {
throw new ChannelException( throw new ChannelException(
getClass().getSimpleName() + " must be registered to the " + getClass().getSimpleName() + " must be registered to the " +
AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor."); AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor.");
@ -83,7 +83,7 @@ abstract class AbstractAioChannel extends AbstractChannel {
@Override @Override
protected boolean isCompatible(EventLoop loop) { protected boolean isCompatible(EventLoop loop) {
return loop instanceof AioChildEventLoop; return loop instanceof AioEventLoop;
} }
protected abstract class AbstractAioUnsafe extends AbstractUnsafe { protected abstract class AbstractAioUnsafe extends AbstractUnsafe {

View File

@ -16,13 +16,14 @@
package io.netty.channel.socket.aio; package io.netty.channel.socket.aio;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.TaskScheduler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
final class AioChildEventLoop extends SingleThreadEventLoop { final class AioEventLoop extends SingleThreadEventLoop {
AioChildEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) { AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory); super(parent, threadFactory, scheduler);
} }
@Override @Override

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket.aio;
import io.netty.channel.EventExecutor; import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoopException; import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.TaskScheduler;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -57,8 +58,9 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
} }
@Override @Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { protected EventExecutor newChild(
return new AioChildEventLoop(this, threadFactory); ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new AioEventLoop(this, threadFactory, scheduler);
} }
private void executeAioTask(Runnable command) { private void executeAioTask(Runnable command) {

View File

@ -40,9 +40,10 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AioServerSocketChannel.class); InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
private final AioEventLoopGroup childGroup;
private final AioServerSocketChannelConfig config; private final AioServerSocketChannelConfig config;
private boolean closed; private boolean closed;
private AtomicBoolean readSuspended = new AtomicBoolean(); private final AtomicBoolean readSuspended = new AtomicBoolean();
private final Runnable acceptTask = new Runnable() { private final Runnable acceptTask = new Runnable() {
@ -60,8 +61,13 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
} }
} }
public AioServerSocketChannel(AioEventLoopGroup eventLoop) { public AioServerSocketChannel(AioEventLoopGroup group) {
super(null, null, eventLoop, newSocket(eventLoop.group)); this(group, group);
}
public AioServerSocketChannel(AioEventLoopGroup parentGroup, AioEventLoopGroup childGroup) {
super(null, null, parentGroup, newSocket(parentGroup.group));
this.childGroup = childGroup;
config = new AioServerSocketChannelConfig(javaChannel()); config = new AioServerSocketChannelConfig(javaChannel());
} }
@ -147,7 +153,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
// create the socket add it to the buffer and fire the event // create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add( channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, channel.group, ch)); new AioSocketChannel(channel, null, channel.childGroup, ch));
if (!channel.readSuspended.get()) { if (!channel.readSuspended.get()) {
channel.pipeline().fireInboundBufferUpdated(); channel.pipeline().fireInboundBufferUpdated();
} }

View File

@ -31,6 +31,8 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -39,8 +41,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false); private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler(); private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler(); private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler<Integer>();
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler(); private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler<Integer>();
private static final CompletionHandler<Long, AioSocketChannel> GATHERING_WRITE_HANDLER = new WriteHandler<Long>();
private static final CompletionHandler<Long, AioSocketChannel> SCATTERING_READ_HANDLER = new ReadHandler<Long>();
private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) { private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) {
try { try {
@ -180,7 +184,14 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
buf.discardReadBytes(); buf.discardReadBytes();
if (buf.readable()) { if (buf.readable()) {
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); if (buf.hasNioBuffers()) {
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes());
javaChannel().write(buffers, 0, buffers.length, config.getReadTimeout(),
TimeUnit.MILLISECONDS, AioSocketChannel.this, GATHERING_WRITE_HANDLER);
} else {
javaChannel().write(buf.nioBuffer(), config.getReadTimeout(), TimeUnit.MILLISECONDS,
this, WRITE_HANDLER);
}
} else { } else {
notifyFlushFutures(); notifyFlushFutures();
flushing = false; flushing = false;
@ -204,17 +215,24 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
expandReadBuffer(byteBuf); expandReadBuffer(byteBuf);
} }
// Get a ByteBuffer view on the ByteBuf if (byteBuf.hasNioBuffers()) {
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER); javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(),
TimeUnit.MILLISECONDS, AioSocketChannel.this, SCATTERING_READ_HANDLER);
} else {
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, config.getWriteTimeout(), TimeUnit.MILLISECONDS,
AioSocketChannel.this, READ_HANDLER);
}
} }
private static final class WriteHandler extends AioCompletionHandler<Integer, AioSocketChannel> { private static final class WriteHandler<T extends Number> extends AioCompletionHandler<T, AioSocketChannel> {
@Override @Override
protected void completed0(Integer result, AioSocketChannel channel) { protected void completed0(T result, AioSocketChannel channel) {
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
int writtenBytes = result; int writtenBytes = result.intValue();
if (writtenBytes > 0) { if (writtenBytes > 0) {
// Update the readerIndex with the amount of read bytes // Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + writtenBytes); buf.readerIndex(buf.readerIndex() + writtenBytes);
@ -253,6 +271,15 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.notifyFlushFutures(cause); channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause); channel.pipeline().fireExceptionCaught(cause);
// Check if the exception was raised because of an InterruptedByTimeoutException which means that the
// write timeout was hit. In that case we should close the channel as it may be unusable anyway.
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture());
return;
}
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
if (!buf.readable()) { if (!buf.readable()) {
buf.discardReadBytes(); buf.discardReadBytes();
@ -263,10 +290,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} }
private static final class ReadHandler extends AioCompletionHandler<Integer, AioSocketChannel> { private static final class ReadHandler<T extends Number> extends AioCompletionHandler<T, AioSocketChannel> {
@Override @Override
protected void completed0(Integer result, AioSocketChannel channel) { protected void completed0(T result, AioSocketChannel channel) {
final ChannelPipeline pipeline = channel.pipeline(); final ChannelPipeline pipeline = channel.pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer(); final ByteBuf byteBuf = pipeline.inboundByteBuffer();
@ -328,7 +355,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.pipeline().fireExceptionCaught(t); channel.pipeline().fireExceptionCaught(t);
if (t instanceof IOException) { // Check if the exception was raised because of an InterruptedByTimeoutException which means that the
// write timeout was hit. In that case we should close the channel as it may be unusable anyway.
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} else { } else {
// start the next read // start the next read

View File

@ -23,6 +23,7 @@ import io.netty.channel.socket.SocketChannelConfig;
import java.io.IOException; import java.io.IOException;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.NetworkChannel; import java.nio.channels.NetworkChannel;
import java.util.Map; import java.util.Map;
@ -33,6 +34,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig { implements SocketChannelConfig {
private final NetworkChannel channel; private final NetworkChannel channel;
private volatile long readTimeoutInMillis;
private volatile long writeTimeoutInMillis;
/** /**
* Creates a new instance. * Creates a new instance.
@ -49,7 +52,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions( return getOptions(
super.getOptions(), super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT);
} }
@Override @Override
@ -75,6 +79,12 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
if (option == IP_TOS) { if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass()); return (T) Integer.valueOf(getTrafficClass());
} }
if (option == AIO_READ_TIMEOUT) {
return (T) Long.valueOf(getReadTimeout());
}
if (option == AIO_WRITE_TIMEOUT) {
return (T) Long.valueOf(getWriteTimeout());
}
return super.getOption(option); return super.getOption(option);
} }
@ -97,6 +107,10 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
setSoLinger((Integer) value); setSoLinger((Integer) value);
} else if (option == IP_TOS) { } else if (option == IP_TOS) {
setTrafficClass((Integer) value); setTrafficClass((Integer) value);
} else if (option == AIO_READ_TIMEOUT) {
setReadTimeout((Long) value);
} else if (option == AIO_WRITE_TIMEOUT) {
setWriteTimeout((Long) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -235,4 +249,50 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel
* will get closed as it may be in an unknown state.
*
* To disable it just use <code>0</code>.
*/
public void setReadTimeout(long readTimeoutInMillis) {
if (readTimeoutInMillis < 0) {
throw new IllegalArgumentException("readTimeoutInMillis: " + readTimeoutInMillis);
}
this.readTimeoutInMillis = readTimeoutInMillis;
}
/**
* Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel
* will get closed as it may be in an unknown state.
*
* To disable it just use <code>0</code>.
*/
public void setWriteTimeout(long writeTimeoutInMillis) {
if (writeTimeoutInMillis < 0) {
throw new IllegalArgumentException("writeTimeoutInMillis: " + writeTimeoutInMillis);
}
this.writeTimeoutInMillis = writeTimeoutInMillis;
}
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
*
* The default is <code>0</code>
*/
public long getReadTimeout() {
return readTimeoutInMillis;
}
/**
* Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
*
* The default is <code>0</code>
*/
public long getWriteTimeout() {
return writeTimeoutInMillis;
}
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.TaskScheduler;
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -30,7 +31,9 @@ import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -60,8 +63,10 @@ final class NioEventLoop extends SingleThreadEventLoop {
private int cancelledKeys; private int cancelledKeys;
private boolean cleanedCancelledKeys; private boolean cleanedCancelledKeys;
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { NioEventLoop(
super(parent, threadFactory); NioEventLoopGroup parent, ThreadFactory threadFactory,
TaskScheduler scheduler, SelectorProvider selectorProvider) {
super(parent, threadFactory, scheduler);
if (selectorProvider == null) { if (selectorProvider == null) {
throw new NullPointerException("selectorProvider"); throw new NullPointerException("selectorProvider");
} }
@ -76,6 +81,12 @@ final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
@Override
protected Queue<Runnable> newTaskQueue() {
// This event loop never calls takeTask()
return new ConcurrentLinkedQueue<Runnable>();
}
@Override @Override
protected void run() { protected void run() {
Selector selector = this.selector; Selector selector = this.selector;

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.EventExecutor; import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.TaskScheduler;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -35,18 +36,20 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
super(nThreads, threadFactory); super(nThreads, threadFactory);
} }
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider); super(nThreads, threadFactory, selectorProvider);
} }
@Override @Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
SelectorProvider selectorProvider; SelectorProvider selectorProvider;
if (args == null || args.length == 0 || args[0] == null) { if (args == null || args.length == 0 || args[0] == null) {
selectorProvider = SelectorProvider.provider(); selectorProvider = SelectorProvider.provider();
} else { } else {
selectorProvider = (SelectorProvider) args[0]; selectorProvider = (SelectorProvider) args[0];
} }
return new NioEventLoop(this, threadFactory, selectorProvider); return new NioEventLoop(this, threadFactory, scheduler, selectorProvider);
} }
} }

View File

@ -27,7 +27,7 @@ class OioEventLoop extends SingleThreadEventLoop {
private AbstractOioChannel ch; private AbstractOioChannel ch;
OioEventLoop(OioEventLoopGroup parent) { OioEventLoop(OioEventLoopGroup parent) {
super(parent, parent.threadFactory); super(parent, parent.threadFactory, parent.scheduler);
this.parent = parent; this.parent = parent;
} }

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.TaskScheduler;
import java.util.Collections; import java.util.Collections;
import java.util.Queue; import java.util.Queue;
@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
public class OioEventLoopGroup implements EventLoopGroup { public class OioEventLoopGroup implements EventLoopGroup {
private final int maxChannels; private final int maxChannels;
final TaskScheduler scheduler;
final ThreadFactory threadFactory; final ThreadFactory threadFactory;
final Set<OioEventLoop> activeChildren = Collections.newSetFromMap( final Set<OioEventLoop> activeChildren = Collections.newSetFromMap(
new ConcurrentHashMap<OioEventLoop, Boolean>()); new ConcurrentHashMap<OioEventLoop, Boolean>());
@ -60,6 +62,8 @@ public class OioEventLoopGroup implements EventLoopGroup {
this.maxChannels = maxChannels; this.maxChannels = maxChannels;
this.threadFactory = threadFactory; this.threadFactory = threadFactory;
scheduler = new TaskScheduler(threadFactory);
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(new StackTraceElement[0]); tooManyChannels.setStackTrace(new StackTraceElement[0]);
} }
@ -71,6 +75,7 @@ public class OioEventLoopGroup implements EventLoopGroup {
@Override @Override
public void shutdown() { public void shutdown() {
scheduler.shutdown();
for (EventLoop l: activeChildren) { for (EventLoop l: activeChildren) {
l.shutdown(); l.shutdown();
} }
@ -81,6 +86,9 @@ public class OioEventLoopGroup implements EventLoopGroup {
@Override @Override
public boolean isShutdown() { public boolean isShutdown() {
if (!scheduler.isShutdown()) {
return false;
}
for (EventLoop l: activeChildren) { for (EventLoop l: activeChildren) {
if (!l.isShutdown()) { if (!l.isShutdown()) {
return false; return false;
@ -96,6 +104,9 @@ public class OioEventLoopGroup implements EventLoopGroup {
@Override @Override
public boolean isTerminated() { public boolean isTerminated() {
if (!scheduler.isTerminated()) {
return false;
}
for (EventLoop l: activeChildren) { for (EventLoop l: activeChildren) {
if (!l.isTerminated()) { if (!l.isTerminated()) {
return false; return false;
@ -113,6 +124,15 @@ public class OioEventLoopGroup implements EventLoopGroup {
public boolean awaitTermination(long timeout, TimeUnit unit) public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException { throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout); long deadline = System.nanoTime() + unit.toNanos(timeout);
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
for (EventLoop l: activeChildren) { for (EventLoop l: activeChildren) {
for (;;) { for (;;) {
long timeLeft = deadline - System.nanoTime(); long timeLeft = deadline - System.nanoTime();

View File

@ -256,7 +256,8 @@ public class SingleThreadEventLoopTest {
final AtomicInteger cleanedUp = new AtomicInteger(); final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopImpl() { SingleThreadEventLoopImpl() {
super(null, Executors.defaultThreadFactory()); super(null, Executors.defaultThreadFactory(),
new TaskScheduler(Executors.defaultThreadFactory()));
} }
@Override @Override