diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java index 06be085c03..f0207c9348 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java @@ -169,7 +169,7 @@ public abstract class AbstractSocketSpdyEchoTest { protected abstract ServerBootstrap newServerBootstrap(); protected abstract Bootstrap newClientBootstrap(); - @Test + @Test(timeout = 10000) public void testSpdyEcho() throws Throwable { for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) { sb = newServerBootstrap(); diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index 101ee472a0..f978db21e7 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -100,7 +100,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda * inbound buffer. */ public void replace(String newHandlerName, ChannelInboundHandler newHandler) { - if (!ctx.eventLoop().inEventLoop()) { + if (!ctx.executor().inEventLoop()) { throw new IllegalStateException("not in event loop"); } diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java index b38f905b88..627d2420c8 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java @@ -17,7 +17,6 @@ package io.netty.handler.codec.embedder; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.CodecException; @@ -57,11 +56,10 @@ public class DecoderEmbedder extends AbstractCodecEmbedder { @Override public boolean offer(Object input) { - ChannelBufferHolder in = pipeline().inbound(); - if (in.hasByteBuffer()) { - in.byteBuffer().writeBytes((ChannelBuffer) input); + if (input instanceof ChannelBuffer) { + pipeline().inboundByteBuffer().writeBytes((ChannelBuffer) input); } else { - in.messageBuffer().add(input); + pipeline().inboundMessageBuffer().add(input); } pipeline().fireInboundBufferUpdated(); diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java index 0e525e0084..816f7efae4 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java @@ -2,6 +2,7 @@ package io.netty.handler.codec.embedder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; import java.util.Collections; @@ -12,7 +13,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; class EmbeddedEventLoop extends AbstractExecutorService implements - EventLoop { + EventLoop, EventExecutor.Unsafe { @Override public ScheduledFuture schedule(Runnable command, long delay, @@ -85,4 +86,19 @@ class EmbeddedEventLoop extends AbstractExecutorService implements public boolean inEventLoop() { return true; } + + @Override + public EventLoop parent() { + return null; + } + + @Override + public Unsafe unsafe() { + return this; + } + + @Override + public EventExecutor nextChild() { + return this; + } } diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxyBackendHandler.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxyBackendHandler.java index 6fd65152a5..24d2aaa6cb 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxyBackendHandler.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxyBackendHandler.java @@ -21,7 +21,7 @@ public class HexDumpProxyBackendHandler extends ChannelInboundStreamHandlerAdapt @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { ChannelBuffer in = ctx.inbound().byteBuffer(); - ChannelBuffer out = inboundChannel.outbound().byteBuffer(); + ChannelBuffer out = inboundChannel.outboundByteBuffer(); out.discardReadBytes(); out.writeBytes(in); in.clear(); diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java index c290204296..f946669232 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java @@ -46,7 +46,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdap // Start the connection attempt. Bootstrap b = new Bootstrap(); - b.eventLoop(ctx.eventLoop()) + b.eventLoop(inboundChannel.eventLoop()) .channel(new NioSocketChannel()) .remoteAddress(remoteHost, remotePort) .initializer(new ChannelInitializer() { @@ -75,7 +75,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdap @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { ChannelBuffer in = ctx.inbound().byteBuffer(); - ChannelBuffer out = outboundChannel.outbound().byteBuffer(); + ChannelBuffer out = outboundChannel.outboundByteBuffer(); out.discardReadBytes(); out.writeBytes(in); in.clear(); diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java b/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java index e52e3a7e7a..f36f7add59 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java @@ -79,7 +79,7 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter { throws Exception { println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + "s"); - final EventLoop loop = ctx.eventLoop(); + final EventLoop loop = ctx.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { diff --git a/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java b/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java index ea70fcd780..eda3ac4d1b 100644 --- a/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java +++ b/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java @@ -187,7 +187,7 @@ public class BlockingReadHandler extends ChannelInboundHandlerAdapter } private void detectDeadLock() { - if (ctx.eventLoop().inEventLoop()) { + if (ctx.executor().inEventLoop()) { throw new BlockingOperationException(); } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 9c574b76bc..8c08dfbe0f 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -21,8 +21,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelFuture; +import io.netty.handler.codec.StreamToStreamCodec; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.NonReentrantLock; @@ -139,9 +141,7 @@ import javax.net.ssl.SSLException; * @apiviz.landmark * @apiviz.uses io.netty.handler.ssl.SslBufferPool */ -public class SslHandler extends FrameDecoder - implements ChannelDownstreamHandler, - LifeCycleAwareChannelHandler { +public class SslHandler extends StreamToStreamCodec { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class); @@ -421,38 +421,21 @@ public class SslHandler extends FrameDecoder } @Override - public void handleDownstream( - final ChannelHandlerContext context, final ChannelEvent evt) throws Exception { - if (evt instanceof ChannelStateEvent) { - ChannelStateEvent e = (ChannelStateEvent) evt; - switch (e.getState()) { - case OPEN: - case CONNECTED: - case BOUND: - if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) { - closeOutboundAndChannel(context, e); - return; - } - } - } - if (!(evt instanceof MessageEvent)) { - context.sendDownstream(evt); - return; - } + public void disconnect(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + closeOutboundAndChannel(ctx, e); + super.disconnect(ctx, future); + } - MessageEvent e = (MessageEvent) evt; - if (!(e.getMessage() instanceof ChannelBuffer)) { - context.sendDownstream(evt); - return; - } - - // Do not encrypt the first write request if this handler is - // created with startTLS flag turned on. - if (startTls && sentFirstMessage.compareAndSet(false, true)) { - context.sendDownstream(evt); - return; - } + @Override + public void close(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + closeOutboundAndChannel(ctx, e); + super.close(ctx, future); + } + @Override + public void encode(ChannelOutboundHandlerContext ctx, ChannelBuffer in, ChannelBuffer out) throws Exception { // Otherwise, all messages are encrypted. ChannelBuffer msg = (ChannelBuffer) e.getMessage(); PendingWrite pendingWrite; diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 2651a74065..f3eb30f726 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -158,10 +158,10 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter { } if (fireExceptionCaught) { - if (ctx.eventLoop().inEventLoop()) { + if (ctx.executor().inEventLoop()) { ctx.fireExceptionCaught(cause); } else { - ctx.eventLoop().execute(new Runnable() { + ctx.executor().execute(new Runnable() { @Override public void run() { ctx.fireExceptionCaught(cause); @@ -212,10 +212,10 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter { } catch (final Throwable t) { this.currentEvent = null; - if (ctx.eventLoop().inEventLoop()) { + if (ctx.executor().inEventLoop()) { ctx.fireExceptionCaught(t); } else { - ctx.eventLoop().execute(new Runnable() { + ctx.executor().execute(new Runnable() { @Override public void run() { ctx.fireExceptionCaught(t); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 1d5bb9eb0b..232eaf283d 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -27,7 +27,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; +import io.netty.channel.EventExecutor; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -274,7 +274,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { return; } - EventLoop loop = ctx.eventLoop(); + EventExecutor loop = ctx.executor(); lastReadTime = lastWriteTime = System.currentTimeMillis(); if (readerIdleTimeMillis > 0) { @@ -335,7 +335,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. readerIdleTimeout = - ctx.eventLoop().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); + ctx.executor().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); try { channelIdle(ctx, new IdleStateEvent( IdleState.READER_IDLE, readerIdleCount ++, currentTime - lastReadTime)); @@ -344,7 +344,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { } } else { // Read occurred before the timeout - set a new timeout with shorter delay. - readerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); + readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } @@ -369,7 +369,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime); if (nextDelay <= 0) { // Writer is idle - set a new timeout and notify the callback. - writerIdleTimeout = ctx.eventLoop().schedule( + writerIdleTimeout = ctx.executor().schedule( this, writerIdleTimeMillis, TimeUnit.MILLISECONDS); try { channelIdle(ctx, new IdleStateEvent( @@ -379,7 +379,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { } } else { // Write occurred before the timeout - set a new timeout with shorter delay. - writerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); + writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } @@ -404,7 +404,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { if (nextDelay <= 0) { // Both reader and writer are idle - set a new timeout and // notify the callback. - allIdleTimeout = ctx.eventLoop().schedule( + allIdleTimeout = ctx.executor().schedule( this, allIdleTimeMillis, TimeUnit.MILLISECONDS); try { channelIdle(ctx, new IdleStateEvent( @@ -415,7 +415,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter { } else { // Either read or write occurred before the timeout - set a new // timeout with shorter delay. - allIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); + allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java index 1c08ac7030..8e1d88dfd0 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -164,11 +163,9 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { return; } - EventLoop loop = ctx.eventLoop(); - lastReadTime = System.currentTimeMillis(); if (timeoutMillis > 0) { - timeout = loop.schedule( + timeout = ctx.executor().schedule( new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS); } @@ -209,7 +206,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { long nextDelay = timeoutMillis - (currentTime - lastReadTime); if (nextDelay <= 0) { // Read timed out - set a new timeout and notify the callback. - timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS); + timeout = ctx.executor().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS); try { readTimedOut(ctx); } catch (Throwable t) { @@ -217,7 +214,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { } } else { // Read occurred before the timeout - set a new timeout with shorter delay. - timeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); + timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 30121824e0..719b750401 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -113,7 +113,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { public void flush(final ChannelOutboundHandlerContext ctx, final ChannelFuture future) throws Exception { if (timeoutMillis > 0) { // Schedule a timeout. - final ScheduledFuture sf = ctx.eventLoop().schedule(new Runnable() { + final ScheduledFuture sf = ctx.executor().schedule(new Runnable() { @Override public void run() { if (future.setFailure(WriteTimeoutException.INSTANCE)) { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index bd54a95277..b8dbfc0706 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.DefaultAttributeMap; @@ -24,6 +25,7 @@ import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Deque; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -266,8 +268,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public ChannelBufferHolder outbound() { - return pipeline().outbound(); + public ChannelBuffer outboundByteBuffer() { + return pipeline().outboundByteBuffer(); + } + + @Override + public Queue outboundMessageBuffer() { + return pipeline().outboundMessageBuffer(); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 8020ca8840..0605021291 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -15,7 +15,10 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; + import java.net.SocketAddress; +import java.util.Queue; /** * A skeletal server-side {@link Channel} implementation. A server-side @@ -37,8 +40,13 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - public ChannelBufferHolder outbound() { - return ChannelBufferHolders.discardBuffer(); + public ChannelBuffer outboundByteBuffer() { + throw new NoSuchBufferException(); + } + + @Override + public Queue outboundMessageBuffer() { + throw new NoSuchBufferException(); } @Override diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index f11ac0c84d..0620dbad2f 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; @@ -23,6 +24,7 @@ import io.netty.util.AttributeMap; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SelectionKey; +import java.util.Queue; /** @@ -136,7 +138,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu boolean isRegistered(); boolean isActive(); - ChannelBufferHolder outbound(); + ChannelBuffer outboundByteBuffer(); + Queue outboundMessageBuffer(); /** * Returns the local address where this channel is bound to. The returned diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index f71cca4ca8..e85dfe0061 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -129,7 +129,7 @@ public interface ChannelHandlerContext ChannelInboundInvoker, ChannelOutboundInvoker { Channel channel(); ChannelPipeline pipeline(); - EventLoop eventLoop(); + EventExecutor executor(); String name(); ChannelHandler handler(); diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 29f0ba1a3e..e89b2c1613 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -23,6 +23,7 @@ import java.nio.channels.Channels; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Queue; /** @@ -205,8 +206,10 @@ import java.util.NoSuchElementException; */ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker { - ChannelBufferHolder inbound(); - ChannelBufferHolder outbound(); + Queue inboundMessageBuffer(); + ChannelBuffer inboundByteBuffer(); + Queue outboundMessageBuffer(); + ChannelBuffer outboundByteBuffer(); /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. @@ -221,6 +224,19 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelPipeline addFirst(String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} at the first position of this pipeline. + * + * @param name the name of the handler to insert first + * @param handler the handler to insert first + * + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified name or handler is {@code null} + */ + ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler); + /** * Appends a {@link ChannelHandler} at the last position of this pipeline. * @@ -234,6 +250,19 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelPipeline addLast(String name, ChannelHandler handler); + /** + * Appends a {@link ChannelHandler} at the last position of this pipeline. + * + * @param name the name of the handler to append + * @param handler the handler to append + * + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified name or handler is {@code null} + */ + ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler} before an existing handler of this * pipeline. @@ -251,6 +280,23 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} before an existing handler of this + * pipeline. + * + * @param baseName the name of the existing handler + * @param name the name of the handler to insert before + * @param handler the handler to insert before + * + * @throws NoSuchElementException + * if there's no such entry with the specified {@code baseName} + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified baseName, name, or handler is {@code null} + */ + ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler} after an existing handler of this * pipeline. @@ -268,9 +314,31 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} after an existing handler of this + * pipeline. + * + * @param baseName the name of the existing handler + * @param name the name of the handler to insert after + * @param handler the handler to insert after + * + * @throws NoSuchElementException + * if there's no such entry with the specified {@code baseName} + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified baseName, name, or handler is {@code null} + */ + ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler); + ChannelPipeline addFirst(ChannelHandler... handlers); + + ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers); + ChannelPipeline addLast(ChannelHandler... handlers); + ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers); + /** * Removes the specified {@link ChannelHandler} from this pipeline. * diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java new file mode 100644 index 0000000000..3567eeb910 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -0,0 +1,375 @@ +package io.netty.channel; + +import io.netty.buffer.ChannelBuffer; +import io.netty.util.DefaultAttributeMap; + +import java.net.SocketAddress; +import java.util.Queue; + +final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext, ChannelOutboundHandlerContext { + volatile DefaultChannelHandlerContext next; + volatile DefaultChannelHandlerContext prev; + private final DefaultChannelPipeline pipeline; + final EventExecutor executor; + private final String name; + private final ChannelHandler handler; + private final boolean canHandleInbound; + private final boolean canHandleOutbound; + final ChannelBufferHolder in; + private final ChannelBufferHolder out; + + // Runnables that calls handlers + final Runnable fireChannelRegisteredTask = new Runnable() { + @Override + @SuppressWarnings("unchecked") + public void run() { + DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + try { + ((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } + }; + final Runnable fireChannelUnregisteredTask = new Runnable() { + @Override + @SuppressWarnings("unchecked") + public void run() { + DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + try { + ((ChannelInboundHandler) ctx.handler()).channelUnregistered(ctx); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } + }; + final Runnable fireChannelActiveTask = new Runnable() { + @Override + @SuppressWarnings("unchecked") + public void run() { + DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + try { + ((ChannelInboundHandler) ctx.handler()).channelActive(ctx); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } + }; + final Runnable fireChannelInactiveTask = new Runnable() { + @Override + @SuppressWarnings("unchecked") + public void run() { + DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + try { + ((ChannelInboundHandler) ctx.handler()).channelInactive(ctx); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } + }; + final Runnable fireInboundBufferUpdatedTask = new Runnable() { + @Override + @SuppressWarnings("unchecked") + public void run() { + DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + try { + ((ChannelInboundHandler) ctx.handler()).inboundBufferUpdated(ctx); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } finally { + ChannelBufferHolder inbound = ctx.inbound(); + if (!inbound.isBypass() && inbound.isEmpty() && inbound.hasByteBuffer()) { + inbound.byteBuffer().discardReadBytes(); + } + } + } + }; + + @SuppressWarnings("unchecked") + DefaultChannelHandlerContext( + DefaultChannelPipeline pipeline, EventExecutor executor, + DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next, + String name, ChannelHandler handler) { + + if (name == null) { + throw new NullPointerException("name"); + } + if (handler == null) { + throw new NullPointerException("handler"); + } + canHandleInbound = handler instanceof ChannelInboundHandler; + canHandleOutbound = handler instanceof ChannelOutboundHandler; + + if (!canHandleInbound && !canHandleOutbound) { + throw new IllegalArgumentException( + "handler must be either " + + ChannelInboundHandler.class.getName() + " or " + + ChannelOutboundHandler.class.getName() + '.'); + } + + this.prev = prev; + this.next = next; + + this.pipeline = pipeline; + this.name = name; + this.handler = handler; + + if (executor != null) { + // Pin one of the child executors once and remember it so that the same child executor + // is used to fire events for the same channel. + EventExecutor childExecutor = pipeline.childExecutors.get(executor); + if (childExecutor == null) { + childExecutor = executor.unsafe().nextChild(); + pipeline.childExecutors.put(executor, childExecutor); + } + this.executor = childExecutor; + } else { + this.executor = null; + } + + if (canHandleInbound) { + try { + in = ((ChannelInboundHandler) handler).newInboundBuffer(this); + } catch (Exception e) { + throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e); + } + } else { + in = null; + } + if (canHandleOutbound) { + try { + out = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); + } catch (Exception e) { + throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e); + } finally { + if (in != null) { + // TODO Release the inbound buffer once pooling is implemented. + } + } + } else { + out = null; + } + } + + @Override + public Channel channel() { + return pipeline.channel; + } + + @Override + public ChannelPipeline pipeline() { + return pipeline; + } + + @Override + public EventExecutor executor() { + if (executor == null) { + return channel().eventLoop(); + } else { + return executor; + } + } + + @Override + public ChannelHandler handler() { + return handler; + } + + @Override + public String name() { + return name; + } + + @Override + public boolean canHandleInbound() { + return canHandleInbound; + } + + @Override + public boolean canHandleOutbound() { + return canHandleOutbound; + } + + @Override + public ChannelBufferHolder inbound() { + return in; + } + + @Override + public ChannelBufferHolder outbound() { + return out; + } + + @Override + public ChannelBuffer nextInboundByteBuffer() { + return DefaultChannelPipeline.nextInboundByteBuffer(next); + } + + @Override + public Queue nextInboundMessageBuffer() { + return DefaultChannelPipeline.nextInboundMessageBuffer(next); + } + + @Override + public ChannelBuffer nextOutboundByteBuffer() { + return pipeline.nextOutboundByteBuffer(prev); + } + + @Override + public Queue nextOutboundMessageBuffer() { + return pipeline.nextOutboundMessageBuffer(prev); + } + + @Override + public void fireChannelRegistered() { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + DefaultChannelPipeline.fireChannelRegistered(next); + } + } + + @Override + public void fireChannelUnregistered() { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + DefaultChannelPipeline.fireChannelUnregistered(next); + } + } + + @Override + public void fireChannelActive() { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + DefaultChannelPipeline.fireChannelActive(next); + } + } + + @Override + public void fireChannelInactive() { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + DefaultChannelPipeline.fireChannelInactive(next); + } + } + + @Override + public void fireExceptionCaught(Throwable cause) { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + pipeline.fireExceptionCaught(next, cause); + } else { + DefaultChannelPipeline.logTerminalException(cause); + } + } + + @Override + public void fireUserEventTriggered(Object event) { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + pipeline.fireUserEventTriggered(next, event); + } + } + + @Override + public void fireInboundBufferUpdated() { + DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next); + if (next != null) { + DefaultChannelPipeline.fireInboundBufferUpdated(next); + } + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return bind(localAddress, newFuture()); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return connect(remoteAddress, newFuture()); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return connect(remoteAddress, localAddress, newFuture()); + } + + @Override + public ChannelFuture disconnect() { + return disconnect(newFuture()); + } + + @Override + public ChannelFuture close() { + return close(newFuture()); + } + + @Override + public ChannelFuture deregister() { + return deregister(newFuture()); + } + + @Override + public ChannelFuture flush() { + return flush(newFuture()); + } + + @Override + public ChannelFuture write(Object message) { + return write(message, newFuture()); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { + return pipeline.bind(DefaultChannelPipeline.nextOutboundContext(prev), localAddress, future); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) { + return connect(remoteAddress, null, future); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + return pipeline.connect(DefaultChannelPipeline.nextOutboundContext(prev), remoteAddress, localAddress, future); + } + + @Override + public ChannelFuture disconnect(ChannelFuture future) { + return pipeline.disconnect(DefaultChannelPipeline.nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture close(ChannelFuture future) { + return pipeline.close(DefaultChannelPipeline.nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture deregister(ChannelFuture future) { + return pipeline.deregister(DefaultChannelPipeline.nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture flush(ChannelFuture future) { + return pipeline.flush(DefaultChannelPipeline.nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture write(Object message, ChannelFuture future) { + return pipeline.write(DefaultChannelPipeline.nextOutboundContext(prev), message, future); + } + + @Override + public ChannelFuture newFuture() { + return channel().newFuture(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return channel().newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return channel().newFailedFuture(cause); + } +} \ No newline at end of file diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 8ea65888b1..2c6cb3431a 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -18,11 +18,11 @@ package io.netty.channel; import io.netty.buffer.ChannelBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.util.DefaultAttributeMap; import java.net.SocketAddress; import java.util.ArrayList; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -37,7 +37,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); - private final Channel channel; + final Channel channel; private volatile DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail; private final Map name2ctx = @@ -45,6 +45,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { private boolean firedChannelActive; private boolean fireInboundBufferUpdatedOnActivation; + final Map childExecutors = + new IdentityHashMap(); + public DefaultChannelPipeline(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); @@ -58,13 +61,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addFirst(String name, ChannelHandler handler) { + public ChannelPipeline addFirst(String name, ChannelHandler handler) { + return addFirst(null, name, handler); + } + + @Override + public synchronized ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) { if (name2ctx.isEmpty()) { - init(name, handler); + init(executor, name, handler); } else { checkDuplicateName(name); DefaultChannelHandlerContext oldHead = head; - DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler); + DefaultChannelHandlerContext newHead = + new DefaultChannelHandlerContext(this, executor, null, oldHead, name, handler); callBeforeAdd(newHead); @@ -79,13 +88,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addLast(String name, ChannelHandler handler) { + public ChannelPipeline addLast(String name, ChannelHandler handler) { + return addLast(null, name, handler); + } + + @Override + public synchronized ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) { if (name2ctx.isEmpty()) { - init(name, handler); + init(executor, name, handler); } else { checkDuplicateName(name); DefaultChannelHandlerContext oldTail = tail; - DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler); + DefaultChannelHandlerContext newTail = + new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); callBeforeAdd(newTail); @@ -100,13 +115,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { + public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { + return addBefore(null, baseName, name, handler); + } + + @Override + public synchronized ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); if (ctx == head) { addFirst(name, handler); } else { checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler); + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); callBeforeAdd(newCtx); @@ -121,13 +142,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { + public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { + return addAfter(null, baseName, name, handler); + } + + @Override + public synchronized ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); if (ctx == tail) { addLast(name, handler); } else { checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler); + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler); callBeforeAdd(newCtx); @@ -143,6 +170,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addFirst(ChannelHandler... handlers) { + return addFirst(null, handlers); + } + + @Override + public ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -159,7 +191,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { for (int i = size - 1; i >= 0; i --) { ChannelHandler h = handlers[i]; - addFirst(generateName(h), h); + addFirst(executor, generateName(h), h); } return this; @@ -167,6 +199,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addLast(ChannelHandler... handlers) { + return addLast(null, handlers); + } + + @Override + public ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -175,7 +212,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (h == null) { break; } - addLast(generateName(h), h); + addLast(executor, generateName(h), h); } return this; @@ -202,7 +239,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - @SuppressWarnings("unchecked") public synchronized T remove(Class handlerType) { return (T) remove(getContextOrDie(handlerType)).handler(); } @@ -315,7 +351,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { DefaultChannelHandlerContext prev = ctx.prev; DefaultChannelHandlerContext next = ctx.next; - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler); + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler); callBeforeRemove(ctx); callBeforeAdd(newCtx); @@ -582,21 +619,87 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder inbound() { - DefaultChannelHandlerContext ctx = firstInboundContext(); - if (ctx != null) { - return ctx.inbound(); - } - return null; + public Queue inboundMessageBuffer() { + return nextInboundMessageBuffer(head); } @Override - public ChannelBufferHolder outbound() { - DefaultChannelHandlerContext ctx = firstOutboundContext(); - if (ctx != null) { - return ctx.outbound(); + public ChannelBuffer inboundByteBuffer() { + return nextInboundByteBuffer(head); + } + + @Override + public Queue outboundMessageBuffer() { + return nextOutboundMessageBuffer(tail); + } + + @Override + public ChannelBuffer outboundByteBuffer() { + return nextOutboundByteBuffer(tail); + } + + static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) { + for (;;) { + if (ctx == null) { + throw NoSuchBufferException.INSTANCE; + } + ChannelBufferHolder in = ctx.in; + if (in != null && !in.isBypass() && in.hasByteBuffer()) { + return in.byteBuffer(); + } + ctx = ctx.next; + } + } + + static Queue nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { + for (;;) { + if (ctx == null) { + throw NoSuchBufferException.INSTANCE; + } + ChannelBufferHolder in = ctx.inbound(); + if (in != null && !in.isBypass() && in.hasMessageBuffer()) { + return in.messageBuffer(); + } + ctx = ctx.next; + } + } + + ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) { + for (;;) { + if (ctx == null) { + ChannelBufferHolder lastOut = channel().unsafe().directOutbound(); + if (lastOut.hasByteBuffer()) { + return lastOut.byteBuffer(); + } else { + throw NoSuchBufferException.INSTANCE; + } + } + + ChannelBufferHolder out = ctx.outbound(); + if (out != null && !out.isBypass() && out.hasByteBuffer()) { + return out.byteBuffer(); + } + ctx = ctx.prev; + } + } + + Queue nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) { + for (;;) { + if (ctx == null) { + ChannelBufferHolder lastOut = channel().unsafe().directOutbound(); + if (lastOut.hasMessageBuffer()) { + return lastOut.messageBuffer(); + } else { + throw NoSuchBufferException.INSTANCE; + } + } + + ChannelBufferHolder out = ctx.outbound(); + if (out != null && !out.isBypass() && out.hasMessageBuffer()) { + return out.messageBuffer(); + } + ctx = ctx.prev; } - return channel().unsafe().directOutbound(); } @Override @@ -607,12 +710,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @SuppressWarnings("unchecked") - private void fireChannelRegistered(DefaultChannelHandlerContext ctx) { - try { - ((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx); - } catch (Throwable t) { - notifyHandlerException(t); + static void fireChannelRegistered(DefaultChannelHandlerContext ctx) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + ctx.fireChannelRegisteredTask.run(); + } else { + executor.execute(ctx.fireChannelRegisteredTask); } } @@ -624,12 +727,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @SuppressWarnings("unchecked") - private void fireChannelUnregistered(DefaultChannelHandlerContext ctx) { - try { - ((ChannelInboundHandler) ctx.handler()).channelUnregistered(ctx); - } catch (Throwable t) { - notifyHandlerException(t); + static void fireChannelUnregistered(DefaultChannelHandlerContext ctx) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + ctx.fireChannelUnregisteredTask.run(); + } else { + executor.execute(ctx.fireChannelUnregisteredTask); } } @@ -646,14 +749,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @SuppressWarnings("unchecked") - private void fireChannelActive(DefaultChannelHandlerContext ctx) { - try { - ((ChannelInboundHandler) ctx.handler()).channelActive(ctx); - } catch (Throwable t) { - notifyHandlerException(t); + static void fireChannelActive(DefaultChannelHandlerContext ctx) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + ctx.fireChannelActiveTask.run(); + } else { + executor.execute(ctx.fireChannelActiveTask); } } + @Override public void fireChannelInactive() { DefaultChannelHandlerContext ctx = firstInboundContext(); @@ -666,12 +770,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @SuppressWarnings("unchecked") - private void fireChannelInactive(DefaultChannelHandlerContext ctx) { - try { - ((ChannelInboundHandler) ctx.handler()).channelInactive(ctx); - } catch (Throwable t) { - notifyHandlerException(t); + static void fireChannelInactive(DefaultChannelHandlerContext ctx) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + ctx.fireChannelInactiveTask.run(); + } else { + executor.execute(ctx.fireChannelInactiveTask); } } @@ -685,28 +789,37 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - private static void logTerminalException(Throwable cause) { + static void logTerminalException(Throwable cause) { logger.warn( "An exceptionCaught() event was fired, and it reached at the end of the " + "pipeline. It usually means the last inbound handler in the pipeline did not " + "handle the exception.", cause); } - @SuppressWarnings("unchecked") - private void fireExceptionCaught(DefaultChannelHandlerContext ctx, Throwable cause) { + void fireExceptionCaught(final DefaultChannelHandlerContext ctx, final Throwable cause) { if (cause == null) { throw new NullPointerException("cause"); } - try { - ((ChannelInboundHandler) ctx.handler()).exceptionCaught(ctx, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler's " + - "exceptionCaught() method while handling the following exception:", cause); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelInboundHandler) ctx.handler()).exceptionCaught(ctx, cause); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler's " + + "exceptionCaught() method while handling the following exception:", cause); + } + notifyHandlerException(t); } - notifyHandlerException(t); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + fireExceptionCaught(ctx, cause); + } + }); } } @@ -718,16 +831,25 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @SuppressWarnings("unchecked") - private void fireUserEventTriggered(DefaultChannelHandlerContext ctx, Object event) { + void fireUserEventTriggered(final DefaultChannelHandlerContext ctx, final Object event) { if (event == null) { throw new NullPointerException("event"); } - try { - ((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event); - } catch (Throwable t) { - notifyHandlerException(t); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + fireUserEventTriggered(ctx, event); + } + }); } } @@ -743,17 +865,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @SuppressWarnings("unchecked") - private void fireInboundBufferUpdated(DefaultChannelHandlerContext ctx) { - try { - ((ChannelInboundHandler) ctx.handler()).inboundBufferUpdated(ctx); - } catch (Throwable t) { - notifyHandlerException(t); - } finally { - ChannelBufferHolder inbound = ctx.inbound(); - if (!inbound.isBypass() && inbound.isEmpty() && inbound.hasByteBuffer()) { - inbound.byteBuffer().discardReadBytes(); - } + static void fireInboundBufferUpdated(DefaultChannelHandlerContext ctx) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + ctx.fireInboundBufferUpdatedTask.run(); + } else { + executor.execute(ctx.fireInboundBufferUpdatedTask); } } @@ -802,18 +919,27 @@ public class DefaultChannelPipeline implements ChannelPipeline { return bind(firstOutboundContext(), localAddress, future); } - @SuppressWarnings("unchecked") - private ChannelFuture bind(DefaultChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) { + ChannelFuture bind(final DefaultChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelFuture future) { if (localAddress == null) { throw new NullPointerException("localAddress"); } validateFuture(future); if (ctx != null) { - try { - ((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, future); - } catch (Throwable t) { - notifyHandlerException(t); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, future); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + bind(ctx, localAddress, future); + } + }); } } else { channel().unsafe().bind(localAddress, future); @@ -831,18 +957,27 @@ public class DefaultChannelPipeline implements ChannelPipeline { return connect(firstOutboundContext(), remoteAddress, localAddress, future); } - @SuppressWarnings("unchecked") - private ChannelFuture connect(DefaultChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + ChannelFuture connect(final DefaultChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validateFuture(future); if (ctx != null) { - try { - ((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, future); - } catch (Throwable t) { - notifyHandlerException(t); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, future); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + connect(ctx, remoteAddress, localAddress, future); + } + }); } } else { channel().unsafe().connect(remoteAddress, localAddress, future); @@ -856,14 +991,23 @@ public class DefaultChannelPipeline implements ChannelPipeline { return disconnect(firstOutboundContext(), future); } - @SuppressWarnings("unchecked") - private ChannelFuture disconnect(DefaultChannelHandlerContext ctx, ChannelFuture future) { + ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); if (ctx != null) { - try { - ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + disconnect(ctx, future); + } + }); } } else { channel().unsafe().disconnect(future); @@ -877,14 +1021,23 @@ public class DefaultChannelPipeline implements ChannelPipeline { return close(firstOutboundContext(), future); } - @SuppressWarnings("unchecked") - private ChannelFuture close(DefaultChannelHandlerContext ctx, ChannelFuture future) { + ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); if (ctx != null) { - try { - ((ChannelOutboundHandler) ctx.handler()).close(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).close(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + close(ctx, future); + } + }); } } else { channel().unsafe().close(future); @@ -898,14 +1051,23 @@ public class DefaultChannelPipeline implements ChannelPipeline { return deregister(firstOutboundContext(), future); } - @SuppressWarnings("unchecked") - private ChannelFuture deregister(DefaultChannelHandlerContext ctx, ChannelFuture future) { + ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); if (ctx != null) { - try { - ((ChannelOutboundHandler) ctx.handler()).deregister(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).deregister(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + deregister(ctx, future); + } + }); } } else { channel().unsafe().deregister(future); @@ -919,19 +1081,28 @@ public class DefaultChannelPipeline implements ChannelPipeline { return flush(firstOutboundContext(), future); } - @SuppressWarnings("unchecked") - private ChannelFuture flush(DefaultChannelHandlerContext ctx, ChannelFuture future) { + ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); if (ctx != null) { - try { - ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); - } finally { - ChannelBufferHolder outbound = ctx.outbound(); - if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) { - outbound.byteBuffer().discardReadBytes(); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); + } finally { + ChannelBufferHolder outbound = ctx.outbound(); + if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) { + outbound.byteBuffer().discardReadBytes(); + } } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + flush(ctx, future); + } + }); } } else { channel().unsafe().flush(future); @@ -942,24 +1113,47 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture write(Object message, ChannelFuture future) { + return write(firstOutboundContext(), message, future); + } + + ChannelFuture write(final DefaultChannelHandlerContext ctx, final Object message, final ChannelFuture future) { if (message == null) { throw new NullPointerException("message"); } validateFuture(future); - ChannelBufferHolder out = outbound(); - if (out.hasMessageBuffer()) { - out.messageBuffer().add(message); - } else if (message instanceof ChannelBuffer) { - ChannelBuffer m = (ChannelBuffer) message; - out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + EventExecutor executor; + ChannelBufferHolder out; + if (ctx != null) { + executor = ctx.executor(); + out = ctx.outbound(); } else { - throw new IllegalArgumentException( - "cannot write a message whose type is not " + - ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName()); + executor = channel().eventLoop(); + out = channel().unsafe().directOutbound(); } - return flush(future); + if (executor.inEventLoop()) { + if (out.hasMessageBuffer()) { + out.messageBuffer().add(message); + } else if (message instanceof ChannelBuffer) { + ChannelBuffer m = (ChannelBuffer) message; + out.byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + } else { + throw new IllegalArgumentException( + "cannot write a message whose type is not " + + ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName()); + } + return flush(ctx, future); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + write(ctx, message, future); + } + }); + } + + return future; } private void validateFuture(ChannelFuture future) { @@ -986,7 +1180,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return nextOutboundContext(tail); } - private static DefaultChannelHandlerContext nextInboundContext(DefaultChannelHandlerContext ctx) { + static DefaultChannelHandlerContext nextInboundContext(DefaultChannelHandlerContext ctx) { if (ctx == null) { return null; } @@ -1002,7 +1196,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return realCtx; } - private static DefaultChannelHandlerContext nextOutboundContext(DefaultChannelHandlerContext ctx) { + static DefaultChannelHandlerContext nextOutboundContext(DefaultChannelHandlerContext ctx) { if (ctx == null) { return null; } @@ -1052,8 +1246,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { return inExceptionCaught(cause.getCause()); } - private void init(String name, ChannelHandler handler) { - DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler); + private void init(EventExecutor executor, String name, ChannelHandler handler) { + DefaultChannelHandlerContext ctx = + new DefaultChannelHandlerContext(this, executor, null, null, name, handler); callBeforeAdd(ctx); head = tail = ctx; name2ctx.clear(); @@ -1093,339 +1288,4 @@ public class DefaultChannelPipeline implements ChannelPipeline { return ctx; } } - - private final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext, ChannelOutboundHandlerContext { - volatile DefaultChannelHandlerContext next; - volatile DefaultChannelHandlerContext prev; - private final String name; - private final ChannelHandler handler; - private final boolean canHandleInbound; - private final boolean canHandleOutbound; - private final ChannelBufferHolder in; - private final ChannelBufferHolder out; - - @SuppressWarnings("unchecked") - DefaultChannelHandlerContext( - DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next, - String name, ChannelHandler handler) { - - if (name == null) { - throw new NullPointerException("name"); - } - if (handler == null) { - throw new NullPointerException("handler"); - } - canHandleInbound = handler instanceof ChannelInboundHandler; - canHandleOutbound = handler instanceof ChannelOutboundHandler; - - if (!canHandleInbound && !canHandleOutbound) { - throw new IllegalArgumentException( - "handler must be either " + - ChannelInboundHandler.class.getName() + " or " + - ChannelOutboundHandler.class.getName() + '.'); - } - - this.prev = prev; - this.next = next; - this.name = name; - this.handler = handler; - - if (canHandleInbound) { - try { - in = ((ChannelInboundHandler) handler).newInboundBuffer(this); - } catch (Exception e) { - throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e); - } - } else { - in = null; - } - if (canHandleOutbound) { - try { - out = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); - } catch (Exception e) { - throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e); - } finally { - if (in != null) { - // TODO Release the inbound buffer once pooling is implemented. - } - } - } else { - out = null; - } - } - - @Override - public Channel channel() { - return DefaultChannelPipeline.this.channel(); - } - - @Override - public ChannelPipeline pipeline() { - return DefaultChannelPipeline.this; - } - - @Override - public EventLoop eventLoop() { - return channel().eventLoop(); - } - - @Override - public ChannelHandler handler() { - return handler; - } - - @Override - public String name() { - return name; - } - - @Override - public boolean canHandleInbound() { - return canHandleInbound; - } - - @Override - public boolean canHandleOutbound() { - return canHandleOutbound; - } - - @Override - public ChannelBufferHolder inbound() { - return in; - } - - @Override - public ChannelBufferHolder outbound() { - return out; - } - - @Override - public ChannelBuffer nextInboundByteBuffer() { - DefaultChannelHandlerContext ctx = this; - for (;;) { - ctx = nextInboundContext(ctx.next); - if (ctx == null) { - throw NoSuchBufferException.INSTANCE; - } - ChannelBufferHolder nextIn = ctx.inbound(); - if (nextIn.hasByteBuffer()) { - return nextIn.byteBuffer(); - } - } - } - - @Override - public Queue nextInboundMessageBuffer() { - DefaultChannelHandlerContext ctx = this; - for (;;) { - ctx = nextInboundContext(ctx.next); - if (ctx == null) { - throw NoSuchBufferException.INSTANCE; - } - ChannelBufferHolder nextIn = ctx.inbound(); - if (nextIn.hasMessageBuffer()) { - return nextIn.messageBuffer(); - } - } - } - - @Override - public ChannelBuffer nextOutboundByteBuffer() { - DefaultChannelHandlerContext ctx = this; - for (;;) { - ctx = nextOutboundContext(ctx.prev); - if (ctx == null) { - ChannelBufferHolder lastOut = channel().unsafe().directOutbound(); - if (lastOut.hasByteBuffer()) { - return lastOut.byteBuffer(); - } else { - throw NoSuchBufferException.INSTANCE; - } - } - ChannelBufferHolder nextOut = ctx.outbound(); - if (nextOut.hasByteBuffer()) { - return nextOut.byteBuffer(); - } - } - } - - @Override - public Queue nextOutboundMessageBuffer() { - DefaultChannelHandlerContext ctx = this; - for (;;) { - ctx = nextOutboundContext(ctx.prev); - if (ctx == null) { - ChannelBufferHolder lastOut = channel().unsafe().directOutbound(); - if (lastOut.hasMessageBuffer()) { - return lastOut.messageBuffer(); - } else { - throw NoSuchBufferException.INSTANCE; - } - } - ChannelBufferHolder nextOut = ctx.outbound(); - if (nextOut.hasMessageBuffer()) { - return nextOut.messageBuffer(); - } - } - } - - @Override - public void fireChannelRegistered() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireChannelRegistered(next); - } - } - - @Override - public void fireChannelUnregistered() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireChannelUnregistered(next); - } - } - - @Override - public void fireChannelActive() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireChannelActive(next); - } - } - - @Override - public void fireChannelInactive() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireChannelInactive(next); - } - } - - @Override - public void fireExceptionCaught(Throwable cause) { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireExceptionCaught(next, cause); - } else { - logTerminalException(cause); - } - } - - @Override - public void fireUserEventTriggered(Object event) { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireUserEventTriggered(next, event); - } - } - - @Override - public void fireInboundBufferUpdated() { - DefaultChannelHandlerContext next = nextInboundContext(this.next); - if (next != null) { - DefaultChannelPipeline.this.fireInboundBufferUpdated(next); - } - } - - @Override - public ChannelFuture bind(SocketAddress localAddress) { - return bind(localAddress, newFuture()); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress) { - return connect(remoteAddress, newFuture()); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return connect(remoteAddress, localAddress, newFuture()); - } - - @Override - public ChannelFuture disconnect() { - return disconnect(newFuture()); - } - - @Override - public ChannelFuture close() { - return close(newFuture()); - } - - @Override - public ChannelFuture deregister() { - return deregister(newFuture()); - } - - @Override - public ChannelFuture flush() { - return flush(newFuture()); - } - - @Override - public ChannelFuture write(Object message) { - return write(message, newFuture()); - } - - @Override - public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { - return DefaultChannelPipeline.this.bind(nextOutboundContext(prev), localAddress, future); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) { - return connect(remoteAddress, null, future); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - return DefaultChannelPipeline.this.connect(nextOutboundContext(prev), remoteAddress, localAddress, future); - } - - @Override - public ChannelFuture disconnect(ChannelFuture future) { - return DefaultChannelPipeline.this.disconnect(nextOutboundContext(prev), future); - } - - @Override - public ChannelFuture close(ChannelFuture future) { - return DefaultChannelPipeline.this.close(nextOutboundContext(prev), future); - } - - @Override - public ChannelFuture deregister(ChannelFuture future) { - return DefaultChannelPipeline.this.deregister(nextOutboundContext(prev), future); - } - - @Override - public ChannelFuture flush(ChannelFuture future) { - return DefaultChannelPipeline.this.flush(nextOutboundContext(prev), future); - } - - @Override - public ChannelFuture write(Object message, ChannelFuture future) { - if (message instanceof ChannelBuffer) { - ChannelBuffer m = (ChannelBuffer) message; - nextOutboundByteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); - } else { - nextOutboundMessageBuffer().add(message); - } - return flush(future); - } - - @Override - public ChannelFuture newFuture() { - return channel().newFuture(); - } - - @Override - public ChannelFuture newSucceededFuture() { - return channel().newSucceededFuture(); - } - - @Override - public ChannelFuture newFailedFuture(Throwable cause) { - return channel().newFailedFuture(cause); - } - } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java new file mode 100644 index 0000000000..9f1bd6b00f --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java @@ -0,0 +1,34 @@ +package io.netty.channel; + +import java.util.concurrent.ThreadFactory; + +class DefaultChildEventExecutor extends SingleThreadEventExecutor { + + DefaultChildEventExecutor(EventExecutor parent, ThreadFactory threadFactory) { + super(parent, threadFactory); + } + + @Override + protected void run() { + for (;;) { + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown() && peekTask() == null) { + break; + } + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop) { + interruptThread(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java new file mode 100644 index 0000000000..fa183e889c --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java @@ -0,0 +1,19 @@ +package io.netty.channel; + +import java.util.concurrent.ThreadFactory; + +public class DefaultEventExecutor extends MultithreadEventExecutor { + + public DefaultEventExecutor(int nThreads) { + super(nThreads); + } + + public DefaultEventExecutor(int nThreads, ThreadFactory threadFactory) { + super(nThreads, threadFactory); + } + + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new DefaultChildEventExecutor(this, threadFactory); + } +} diff --git a/transport/src/main/java/io/netty/channel/EventExecutor.java b/transport/src/main/java/io/netty/channel/EventExecutor.java new file mode 100644 index 0000000000..3103f5c114 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/EventExecutor.java @@ -0,0 +1,13 @@ +package io.netty.channel; + +import java.util.concurrent.ScheduledExecutorService; + +public interface EventExecutor extends ScheduledExecutorService { + EventExecutor parent(); + boolean inEventLoop(); + Unsafe unsafe(); + + public interface Unsafe { + EventExecutor nextChild(); + } +} diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index bd2a702c90..2013d1d36d 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -1,9 +1,8 @@ package io.netty.channel; -import java.util.concurrent.ScheduledExecutorService; - -public interface EventLoop extends ScheduledExecutorService { +public interface EventLoop extends EventExecutor { + @Override + EventLoop parent(); ChannelFuture register(Channel channel); ChannelFuture register(Channel channel, ChannelFuture future); - boolean inEventLoop(); } diff --git a/transport/src/main/java/io/netty/channel/EventLoopFactory.java b/transport/src/main/java/io/netty/channel/EventLoopFactory.java deleted file mode 100644 index f3ca41c54f..0000000000 --- a/transport/src/main/java/io/netty/channel/EventLoopFactory.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.netty.channel; - -import java.util.concurrent.ThreadFactory; - -public interface EventLoopFactory { - T newEventLoop(ThreadFactory threadFactory) throws Exception; -} diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java new file mode 100644 index 0000000000..c13fe46716 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java @@ -0,0 +1,209 @@ +package io.netty.channel; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class MultithreadEventExecutor implements EventExecutor { + + protected static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; + protected static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory(); + + private final EventExecutor[] children; + private final AtomicInteger childIndex = new AtomicInteger(); + private final Unsafe unsafe = new Unsafe() { + @Override + public EventExecutor nextChild() { + return children[Math.abs(childIndex.getAndIncrement() % children.length)]; + } + }; + + protected MultithreadEventExecutor(Object... args) { + this(DEFAULT_POOL_SIZE, args); + } + + protected MultithreadEventExecutor(int nThreads, Object... args) { + this(nThreads, DEFAULT_THREAD_FACTORY, args); + } + + protected MultithreadEventExecutor(int nThreads, ThreadFactory threadFactory, Object... args) { + if (nThreads <= 0) { + throw new IllegalArgumentException(String.format( + "nThreads: %d (expected: > 0)", nThreads)); + } + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + + children = new SingleThreadEventExecutor[nThreads]; + for (int i = 0; i < nThreads; i ++) { + boolean success = false; + try { + children[i] = newChild(threadFactory, args); + success = true; + } catch (Exception e) { + throw new EventLoopException("failed to create a child event loop", e); + } finally { + if (!success) { + for (int j = 0; j < i; j ++) { + children[j].shutdown(); + } + } + } + } + } + + protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; + + @Override + public EventExecutor parent() { + return null; + } + + @Override + public Unsafe unsafe() { + return unsafe; + } + + @Override + public void shutdown() { + for (EventExecutor l: children) { + l.shutdown(); + } + } + + @Override + public List shutdownNow() { + for (EventExecutor l: children) { + l.shutdownNow(); + } + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + for (EventExecutor l: children) { + if (!l.isShutdown()) { + return false; + } + } + return true; + } + + @Override + public boolean isTerminated() { + for (EventExecutor l: children) { + if (!l.isTerminated()) { + return false; + } + } + return true; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout); + loop: for (EventExecutor l: children) { + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + break loop; + } + if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } + } + return isTerminated(); + } + + @Override + public Future submit(Callable task) { + return currentEventLoop().submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return currentEventLoop().submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return currentEventLoop().submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return currentEventLoop().invokeAll(tasks); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return currentEventLoop().invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return currentEventLoop().invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + return currentEventLoop().invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + currentEventLoop().execute(command); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, + TimeUnit unit) { + return currentEventLoop().schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return currentEventLoop().schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public boolean inEventLoop() { + return SingleThreadEventExecutor.currentEventLoop() != null; + } + + private static EventExecutor currentEventLoop() { + EventExecutor loop = SingleThreadEventExecutor.currentEventLoop(); + if (loop == null) { + throw new IllegalStateException("not called from an event loop thread"); + } + return loop; + } +} diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java index ed910c9452..eae825b4f5 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java @@ -1,208 +1,38 @@ package io.netty.channel; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -public class MultithreadEventLoop implements EventLoop { +public abstract class MultithreadEventLoop extends MultithreadEventExecutor implements EventLoop { - protected static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; - protected static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory(); - private final EventLoop[] children; - private final AtomicInteger childIndex = new AtomicInteger(); - - public MultithreadEventLoop(EventLoopFactory loopFactory) { - this(loopFactory, DEFAULT_POOL_SIZE); + protected MultithreadEventLoop(int nThreads, Object... args) { + super(nThreads, args); } - public MultithreadEventLoop(EventLoopFactory loopFactory, int nThreads) { - this(loopFactory, nThreads, DEFAULT_THREAD_FACTORY); + protected MultithreadEventLoop(int nThreads, ThreadFactory threadFactory, + Object... args) { + super(nThreads, threadFactory, args); } - public MultithreadEventLoop(EventLoopFactory loopFactory, int nThreads, ThreadFactory threadFactory) { - if (loopFactory == null) { - throw new NullPointerException("loopFactory"); - } - if (nThreads <= 0) { - throw new IllegalArgumentException(String.format( - "nThreads: %d (expected: > 0)", nThreads)); - } - if (threadFactory == null) { - throw new NullPointerException("threadFactory"); - } - - children = new EventLoop[nThreads]; - for (int i = 0; i < nThreads; i ++) { - boolean success = false; - try { - children[i] = loopFactory.newEventLoop(threadFactory); - success = true; - } catch (Exception e) { - throw new EventLoopException("failed to create a child event loop", e); - } finally { - if (!success) { - for (int j = 0; j < i; j ++) { - children[j].shutdown(); - } - } - } - } + protected MultithreadEventLoop(Object... args) { + super(args); } @Override - public void shutdown() { - for (EventLoop l: children) { - l.shutdown(); - } - } + protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; @Override - public List shutdownNow() { - for (EventLoop l: children) { - l.shutdownNow(); - } - return Collections.emptyList(); - } - - @Override - public boolean isShutdown() { - for (EventLoop l: children) { - if (!l.isShutdown()) { - return false; - } - } - return true; - } - - @Override - public boolean isTerminated() { - for (EventLoop l: children) { - if (!l.isTerminated()) { - return false; - } - } - return true; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - long deadline = System.nanoTime() + unit.toNanos(timeout); - loop: for (EventLoop l: children) { - for (;;) { - long timeLeft = deadline - System.nanoTime(); - if (timeLeft <= 0) { - break loop; - } - if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { - break; - } - } - } - return isTerminated(); - } - - @Override - public Future submit(Callable task) { - return currentEventLoop().submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return currentEventLoop().submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return currentEventLoop().submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - return currentEventLoop().invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return currentEventLoop().invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - return currentEventLoop().invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, - long timeout, TimeUnit unit) throws InterruptedException, - ExecutionException, TimeoutException { - return currentEventLoop().invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - currentEventLoop().execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { - return currentEventLoop().schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return currentEventLoop().schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit); + public EventLoop parent() { + return (EventLoop) super.parent(); } @Override public ChannelFuture register(Channel channel) { - return nextEventLoop().register(channel); + return ((EventLoop) unsafe().nextChild()).register(channel); } @Override public ChannelFuture register(Channel channel, ChannelFuture future) { - return nextEventLoop().register(channel, future); - } - - @Override - public boolean inEventLoop() { - return SingleThreadEventLoop.CURRENT_EVENT_LOOP.get() != null; - } - - private EventLoop nextEventLoop() { - return children[Math.abs(childIndex.getAndIncrement() % children.length)]; - } - - private static SingleThreadEventLoop currentEventLoop() { - SingleThreadEventLoop loop = SingleThreadEventLoop.CURRENT_EVENT_LOOP.get(); - if (loop == null) { - throw new IllegalStateException("not called from an event loop thread"); - } - return loop; + return ((EventLoop) unsafe().nextChild()).register(channel, future); } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java new file mode 100644 index 0000000000..9f4dbdba59 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -0,0 +1,581 @@ +package io.netty.channel; + +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.QueueFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor { + + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); + + private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); + private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + private static final long START_TIME = System.nanoTime(); + private static final AtomicLong nextTaskId = new AtomicLong(); + + static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); + + public static SingleThreadEventExecutor currentEventLoop() { + return CURRENT_EVENT_LOOP.get(); + } + + private static long nanoTime() { + return System.nanoTime() - START_TIME; + } + + private static long deadlineNanos(long delay) { + return nanoTime() + delay; + } + + private final EventExecutor parent; + private final Unsafe unsafe = new Unsafe() { + @Override + public EventExecutor nextChild() { + return SingleThreadEventExecutor.this; + } + }; + + private final BlockingQueue taskQueue = QueueFactory.createQueue(); + private final Thread thread; + private final Object stateLock = new Object(); + private final Semaphore threadLock = new Semaphore(0); + // TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. + private final Queue> scheduledTasks = new DelayQueue>(); + private final Set shutdownHooks = new LinkedHashSet(); + /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ + private volatile int state; + private long lastCheckTimeNanos; + private long lastPurgeTimeNanos; + + protected SingleThreadEventExecutor(EventExecutor parent) { + this(parent, Executors.defaultThreadFactory()); + } + + protected SingleThreadEventExecutor(EventExecutor parent, ThreadFactory threadFactory) { + this.parent = parent; + thread = threadFactory.newThread(new Runnable() { + @Override + public void run() { + CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this); + try { + SingleThreadEventExecutor.this.run(); + } finally { + synchronized (stateLock) { + state = 3; + } + try { + cancelScheduledTasks(); + runShutdownHooks(); + cleanup(); + } finally { + threadLock.release(); + assert taskQueue.isEmpty(); + } + } + } + }); + } + + @Override + public EventExecutor parent() { + return parent; + } + + @Override + public Unsafe unsafe() { + return unsafe; + } + + protected void interruptThread() { + thread.interrupt(); + } + + protected Runnable pollTask() { + assert inEventLoop(); + + Runnable task = taskQueue.poll(); + if (task != null) { + return task; + } + + if (fetchScheduledTasks()) { + task = taskQueue.poll(); + return task; + } + + return null; + } + + protected Runnable takeTask() throws InterruptedException { + assert inEventLoop(); + + for (;;) { + Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); + if (task != null) { + return task; + } + fetchScheduledTasks(); + task = taskQueue.poll(); + if (task != null) { + return task; + } + } + } + + protected Runnable peekTask() { + assert inEventLoop(); + + Runnable task = taskQueue.peek(); + if (task != null) { + return task; + } + + if (fetchScheduledTasks()) { + task = taskQueue.peek(); + return task; + } + + return null; + } + + protected boolean hasTasks() { + assert inEventLoop(); + + boolean empty = taskQueue.isEmpty(); + if (!empty) { + return true; + } + + if (fetchScheduledTasks()) { + return !taskQueue.isEmpty(); + } + + return false; + } + + protected void addTask(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + if (isShutdown()) { + reject(); + } + taskQueue.add(task); + } + + protected boolean removeTask(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + return taskQueue.remove(task); + } + + protected void runAllTasks() { + for (;;) { + final Runnable task = pollTask(); + if (task == null) { + break; + } + + task.run(); + } + } + + protected abstract void run(); + + protected void cleanup() { + // Do nothing. Subclasses will override. + } + + protected abstract void wakeup(boolean inEventLoop); + + @Override + public boolean inEventLoop() { + return Thread.currentThread() == thread; + } + + public void addShutdownHook(final Runnable task) { + if (inEventLoop()) { + shutdownHooks.add(task); + } else { + execute(new Runnable() { + @Override + public void run() { + shutdownHooks.add(task); + } + }); + } + } + + public void removeShutdownHook(final Runnable task) { + if (inEventLoop()) { + shutdownHooks.remove(task); + } else { + execute(new Runnable() { + @Override + public void run() { + shutdownHooks.remove(task); + } + }); + } + } + + private void runShutdownHooks() { + // Note shutdown hooks can add / remove shutdown hooks. + while (!shutdownHooks.isEmpty()) { + List copy = new ArrayList(shutdownHooks); + shutdownHooks.clear(); + for (Runnable task: copy) { + try { + task.run(); + } catch (Throwable t) { + logger.warn("Shutdown hook raised an exception.", t); + } + } + } + } + + @Override + public void shutdown() { + boolean inEventLoop = inEventLoop(); + boolean wakeup = false; + if (inEventLoop) { + synchronized (stateLock) { + assert state == 1; + state = 2; + wakeup = true; + } + } else { + synchronized (stateLock) { + switch (state) { + case 0: + state = 3; + try { + cleanup(); + } finally { + threadLock.release(); + } + break; + case 1: + state = 2; + wakeup = true; + break; + } + } + } + + if (wakeup) { + wakeup(inEventLoop); + } + } + + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return state >= 2; + } + + @Override + public boolean isTerminated() { + return state == 3; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (inEventLoop()) { + throw new IllegalStateException("cannot await termination of the current thread"); + } + + if (threadLock.tryAcquire(timeout, unit)) { + threadLock.release(); + } + + return isTerminated(); + } + + @Override + public void execute(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + + if (inEventLoop()) { + addTask(task); + wakeup(true); + } else { + synchronized (stateLock) { + if (state == 0) { + state = 1; + thread.start(); + } + } + addTask(task); + if (isShutdown() && removeTask(task)) { + reject(); + } + wakeup(false); + } + } + + private static void reject() { + throw new RejectedExecutionException("event loop shut down"); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(command, null, deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + if (callable == null) { + throw new NullPointerException("callable"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(callable, deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (period <= 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: > 0)", period)); + } + + return schedule(new ScheduledFutureTask( + command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (delay <= 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: > 0)", delay)); + } + + return schedule(new ScheduledFutureTask( + command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + } + + private ScheduledFuture schedule(ScheduledFutureTask task) { + if (isShutdown()) { + reject(); + } + scheduledTasks.add(task); + if (isShutdown()) { + task.cancel(false); + } + + if (!inEventLoop()) { + synchronized (stateLock) { + if (state == 0) { + state = 1; + thread.start(); + } + } + } else { + fetchScheduledTasks(); + } + + return task; + } + + private boolean fetchScheduledTasks() { + if (scheduledTasks.isEmpty()) { + return false; + } + + long nanoTime = nanoTime(); + if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) { + for (Iterator> i = scheduledTasks.iterator(); i.hasNext();) { + ScheduledFutureTask task = i.next(); + if (task.isCancelled()) { + i.remove(); + } + } + } + + if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) { + boolean added = false; + for (;;) { + ScheduledFutureTask task = scheduledTasks.poll(); + if (task == null) { + break; + } + + if (!task.isCancelled()) { + if (isShutdown()) { + task.cancel(false); + } else { + taskQueue.add(task); + added = true; + } + } + } + return added; + } + + return false; + } + + private void cancelScheduledTasks() { + if (scheduledTasks.isEmpty()) { + return; + } + + for (ScheduledFutureTask task: scheduledTasks.toArray(new ScheduledFutureTask[scheduledTasks.size()])) { + task.cancel(false); + } + scheduledTasks.clear(); + } + + private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { + + private final long id = nextTaskId.getAndIncrement(); + private long deadlineNanos; + /** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ + private final long periodNanos; + + ScheduledFutureTask(Runnable runnable, V result, long nanoTime) { + super(runnable, result); + this.deadlineNanos = nanoTime; + this.periodNanos = 0; + } + + ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) { + super(runnable, result); + if (period == 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: != 0)", period)); + } + this.deadlineNanos = nanoTime; + this.periodNanos = period; + } + + ScheduledFutureTask(Callable callable, long nanoTime) { + super(callable); + this.deadlineNanos = nanoTime; + this.periodNanos = 0; + } + + public long deadlineNanos() { + return deadlineNanos; + } + + public long delayNanos() { + return Math.max(0, deadlineNanos() - nanoTime()); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this == o) { + return 0; + } + + ScheduledFutureTask that = (ScheduledFutureTask) o; + long d = deadlineNanos() - that.deadlineNanos(); + if (d < 0) { + return -1; + } else if (d > 0) { + return 1; + } else if (id < that.id) { + return -1; + } else if (id == that.id) { + throw new Error(); + } else { + return 1; + } + } + + @Override + public void run() { + if (periodNanos == 0) { + super.run(); + } else { + boolean reset = runAndReset(); + if (reset && !isShutdown()) { + long p = periodNanos; + if (p > 0) { + deadlineNanos += p; + } else { + deadlineNanos = nanoTime() - p; + } + + schedule(this); + } + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 5e6c9f22b3..0602d5f4e3 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -1,93 +1,20 @@ package io.netty.channel; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.QueueFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop { +public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(SingleThreadEventLoop.class); - - private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final long START_TIME = System.nanoTime(); - private static final AtomicLong nextTaskId = new AtomicLong(); - - static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); - - public static SingleThreadEventLoop currentEventLoop() { - return CURRENT_EVENT_LOOP.get(); + protected SingleThreadEventLoop(EventLoop parent) { + super(parent); } - private static long nanoTime() { - return System.nanoTime() - START_TIME; + protected SingleThreadEventLoop(EventLoop parent, ThreadFactory threadFactory) { + super(parent, threadFactory); } - private static long deadlineNanos(long delay) { - return nanoTime() + delay; - } - - // Fields for event loop - private final BlockingQueue taskQueue = QueueFactory.createQueue(); - private final Thread thread; - private final Object stateLock = new Object(); - private final Semaphore threadLock = new Semaphore(0); - // TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. - private final Queue> scheduledTasks = new DelayQueue>(); - private final Set shutdownHooks = new LinkedHashSet(); - /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ - private volatile int state; - private long lastCheckTimeNanos; - private long lastPurgeTimeNanos; - - protected SingleThreadEventLoop() { - this(Executors.defaultThreadFactory()); - } - - protected SingleThreadEventLoop(ThreadFactory threadFactory) { - thread = threadFactory.newThread(new Runnable() { - @Override - public void run() { - CURRENT_EVENT_LOOP.set(SingleThreadEventLoop.this); - try { - SingleThreadEventLoop.this.run(); - } finally { - synchronized (stateLock) { - state = 3; - } - try { - cancelScheduledTasks(); - runShutdownHooks(); - cleanup(); - } finally { - threadLock.release(); - assert taskQueue.isEmpty(); - } - } - } - }); + @Override + public EventLoop parent() { + return (EventLoop) super.parent(); } @Override @@ -112,475 +39,4 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl } return future; } - - protected void interruptThread() { - thread.interrupt(); - } - - protected Runnable pollTask() { - assert inEventLoop(); - - Runnable task = taskQueue.poll(); - if (task != null) { - return task; - } - - if (fetchScheduledTasks()) { - task = taskQueue.poll(); - return task; - } - - return null; - } - - protected Runnable takeTask() throws InterruptedException { - assert inEventLoop(); - - for (;;) { - Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); - if (task != null) { - return task; - } - fetchScheduledTasks(); - task = taskQueue.poll(); - if (task != null) { - return task; - } - } - } - - protected Runnable peekTask() { - assert inEventLoop(); - - Runnable task = taskQueue.peek(); - if (task != null) { - return task; - } - - if (fetchScheduledTasks()) { - task = taskQueue.peek(); - return task; - } - - return null; - } - - protected boolean hasTasks() { - assert inEventLoop(); - - boolean empty = taskQueue.isEmpty(); - if (!empty) { - return true; - } - - if (fetchScheduledTasks()) { - return !taskQueue.isEmpty(); - } - - return false; - } - - protected void addTask(Runnable task) { - if (task == null) { - throw new NullPointerException("task"); - } - if (isShutdown()) { - reject(); - } - taskQueue.add(task); - } - - protected boolean removeTask(Runnable task) { - if (task == null) { - throw new NullPointerException("task"); - } - return taskQueue.remove(task); - } - - protected void runAllTasks() { - for (;;) { - final Runnable task = pollTask(); - if (task == null) { - break; - } - - task.run(); - } - } - - protected abstract void run(); - - protected void cleanup() { - // Do nothing. Subclasses will override. - } - - protected abstract void wakeup(boolean inEventLoop); - - @Override - public boolean inEventLoop() { - return Thread.currentThread() == thread; - } - - public void addShutdownHook(final Runnable task) { - if (inEventLoop()) { - shutdownHooks.add(task); - } else { - execute(new Runnable() { - @Override - public void run() { - shutdownHooks.add(task); - } - }); - } - } - - public void removeShutdownHook(final Runnable task) { - if (inEventLoop()) { - shutdownHooks.remove(task); - } else { - execute(new Runnable() { - @Override - public void run() { - shutdownHooks.remove(task); - } - }); - } - } - - private void runShutdownHooks() { - // Note shutdown hooks can add / remove shutdown hooks. - while (!shutdownHooks.isEmpty()) { - List copy = new ArrayList(shutdownHooks); - shutdownHooks.clear(); - for (Runnable task: copy) { - try { - task.run(); - } catch (Throwable t) { - logger.warn("Shutdown hook raised an exception.", t); - } - } - } - } - - @Override - public void shutdown() { - boolean inEventLoop = inEventLoop(); - boolean wakeup = false; - if (inEventLoop) { - synchronized (stateLock) { - assert state == 1; - state = 2; - wakeup = true; - } - } else { - synchronized (stateLock) { - switch (state) { - case 0: - state = 3; - try { - cleanup(); - } finally { - threadLock.release(); - } - break; - case 1: - state = 2; - wakeup = true; - break; - } - } - } - - if (wakeup) { - wakeup(inEventLoop); - } - } - - @Override - public List shutdownNow() { - shutdown(); - return Collections.emptyList(); - } - - @Override - public boolean isShutdown() { - return state >= 2; - } - - @Override - public boolean isTerminated() { - return state == 3; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - if (unit == null) { - throw new NullPointerException("unit"); - } - - if (inEventLoop()) { - throw new IllegalStateException("cannot await termination of the current thread"); - } - - if (threadLock.tryAcquire(timeout, unit)) { - threadLock.release(); - } - - return isTerminated(); - } - - @Override - public void execute(Runnable task) { - if (task == null) { - throw new NullPointerException("task"); - } - - if (inEventLoop()) { - addTask(task); - wakeup(true); - } else { - synchronized (stateLock) { - if (state == 0) { - state = 1; - thread.start(); - } - } - addTask(task); - if (isShutdown() && removeTask(task)) { - reject(); - } - wakeup(false); - } - } - - private static void reject() { - throw new RejectedExecutionException("event loop shut down"); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(command, null, deadlineNanos(unit.toNanos(delay)))); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - if (callable == null) { - throw new NullPointerException("callable"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(callable, deadlineNanos(unit.toNanos(delay)))); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (period <= 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: > 0)", period)); - } - - return schedule(new ScheduledFutureTask( - command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (delay <= 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: > 0)", delay)); - } - - return schedule(new ScheduledFutureTask( - command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); - } - - private ScheduledFuture schedule(ScheduledFutureTask task) { - if (isShutdown()) { - reject(); - } - scheduledTasks.add(task); - if (isShutdown()) { - task.cancel(false); - } - - if (!inEventLoop()) { - synchronized (stateLock) { - if (state == 0) { - state = 1; - thread.start(); - } - } - } else { - fetchScheduledTasks(); - } - - return task; - } - - private boolean fetchScheduledTasks() { - if (scheduledTasks.isEmpty()) { - return false; - } - - long nanoTime = nanoTime(); - if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) { - for (Iterator> i = scheduledTasks.iterator(); i.hasNext();) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } - } - } - - if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) { - boolean added = false; - for (;;) { - ScheduledFutureTask task = scheduledTasks.poll(); - if (task == null) { - break; - } - - if (!task.isCancelled()) { - if (isShutdown()) { - task.cancel(false); - } else { - taskQueue.add(task); - added = true; - } - } - } - return added; - } - - return false; - } - - private void cancelScheduledTasks() { - if (scheduledTasks.isEmpty()) { - return; - } - - for (ScheduledFutureTask task: scheduledTasks.toArray(new ScheduledFutureTask[scheduledTasks.size()])) { - task.cancel(false); - } - scheduledTasks.clear(); - } - - private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { - - private final long id = nextTaskId.getAndIncrement(); - private long deadlineNanos; - /** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ - private final long periodNanos; - - ScheduledFutureTask(Runnable runnable, V result, long nanoTime) { - super(runnable, result); - this.deadlineNanos = nanoTime; - this.periodNanos = 0; - } - - ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) { - super(runnable, result); - if (period == 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: != 0)", period)); - } - this.deadlineNanos = nanoTime; - this.periodNanos = period; - } - - ScheduledFutureTask(Callable callable, long nanoTime) { - super(callable); - this.deadlineNanos = nanoTime; - this.periodNanos = 0; - } - - public long deadlineNanos() { - return deadlineNanos; - } - - public long delayNanos() { - return Math.max(0, deadlineNanos() - nanoTime()); - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (this == o) { - return 0; - } - - ScheduledFutureTask that = (ScheduledFutureTask) o; - long d = deadlineNanos() - that.deadlineNanos(); - if (d < 0) { - return -1; - } else if (d > 0) { - return 1; - } else if (id < that.id) { - return -1; - } else if (id == that.id) { - throw new Error(); - } else { - return 1; - } - } - - @Override - public void run() { - if (periodNanos == 0) { - super.run(); - } else { - boolean reset = runAndReset(); - if (reset && !isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; - } else { - deadlineNanos = nanoTime() - p; - } - - schedule(this); - } - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 8b47bee3d0..0880a38ce0 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -186,7 +186,7 @@ public class LocalChannel extends AbstractChannel { assert peer != null; Queue in = buf.messageBuffer(); - Queue out = peer.pipeline().inbound().messageBuffer(); + Queue out = peer.pipeline().inboundMessageBuffer(); for (;;) { Object msg = in.poll(); if (msg == null) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java index 83f165aa57..c08a69fcf2 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java @@ -15,14 +15,15 @@ */ package io.netty.channel.local; +import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import java.util.concurrent.ThreadFactory; final class LocalChildEventLoop extends SingleThreadEventLoop { - LocalChildEventLoop(ThreadFactory threadFactory) { - super(threadFactory); + LocalChildEventLoop(EventLoop parent, ThreadFactory threadFactory) { + super(parent, threadFactory); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java index eafa0f0051..fa6952f29a 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -1,26 +1,24 @@ package io.netty.channel.local; -import io.netty.channel.EventLoopFactory; +import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoop; import java.util.concurrent.ThreadFactory; public class LocalEventLoop extends MultithreadEventLoop { - public LocalEventLoop() { - this(DEFAULT_POOL_SIZE); - } + public LocalEventLoop() {} public LocalEventLoop(int nThreads) { - this(nThreads, DEFAULT_THREAD_FACTORY); + super(nThreads); } public LocalEventLoop(int nThreads, ThreadFactory threadFactory) { - super(new EventLoopFactory() { - @Override - public LocalChildEventLoop newEventLoop(ThreadFactory threadFactory) throws Exception { - return new LocalChildEventLoop(threadFactory); - } - }, nThreads, threadFactory); + super(nThreads, threadFactory); + } + + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new LocalChildEventLoop(this, threadFactory); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 88a66eea9f..a6b0a60a5e 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -119,7 +119,7 @@ public class LocalServerChannel extends AbstractServerChannel { private void serve0(final LocalChannel child) { if (eventLoop().inEventLoop()) { - pipeline().inbound().messageBuffer().add(child); + pipeline().inboundMessageBuffer().add(child); pipeline().fireInboundBufferUpdated(); } else { eventLoop().execute(new Runnable() { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 8b74ec9257..25ac0ac299 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -27,11 +27,10 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.inbound(); + final Queue msgBuf = pipeline.inboundMessageBuffer(); boolean closed = false; boolean read = false; try { - Queue msgBuf = buf.messageBuffer(); for (;;) { int localReadAmount = doReadMessages(msgBuf); if (localReadAmount > 0) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java index 353cdeb6d5..c577438166 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java @@ -28,11 +28,10 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.inbound(); + final ChannelBuffer byteBuf = pipeline.inboundByteBuffer(); boolean closed = false; boolean read = false; try { - ChannelBuffer byteBuf = buf.byteBuffer(); expandReadBuffer(byteBuf); for (;;) { int localReadAmount = doReadBytes(byteBuf); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java index 039979f28b..06950d74fc 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java @@ -60,8 +60,8 @@ final class NioChildEventLoop extends SingleThreadEventLoop { private int cancelledKeys; private boolean cleanedCancelledKeys; - NioChildEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(threadFactory); + NioChildEventLoop(NioEventLoop parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { + super(parent, threadFactory); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java index 925a2e58bf..d5f9f9fd3b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java @@ -1,6 +1,6 @@ package io.netty.channel.socket.nio; -import io.netty.channel.EventLoopFactory; +import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoop; import java.nio.channels.spi.SelectorProvider; @@ -8,25 +8,28 @@ import java.util.concurrent.ThreadFactory; public class NioEventLoop extends MultithreadEventLoop { - public NioEventLoop() { - this(DEFAULT_POOL_SIZE); - } + public NioEventLoop() {} public NioEventLoop(int nThreads) { - this(nThreads, DEFAULT_THREAD_FACTORY); + super(nThreads); } public NioEventLoop(int nThreads, ThreadFactory threadFactory) { - this(nThreads, threadFactory, SelectorProvider.provider()); + super(nThreads, threadFactory); } public NioEventLoop(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { - super(new EventLoopFactory() { - @Override - public NioChildEventLoop newEventLoop(ThreadFactory threadFactory) throws Exception { - return new NioChildEventLoop(threadFactory, selectorProvider); - } + super(nThreads, threadFactory, selectorProvider); + } - }, nThreads, threadFactory); + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + SelectorProvider selectorProvider; + if (args == null || args.length == 0 || args[0] == null) { + selectorProvider = SelectorProvider.provider(); + } else { + selectorProvider = (SelectorProvider) args[0]; + } + return new NioChildEventLoop(this, threadFactory, selectorProvider); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index 13763c9102..6ce7d41fc9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -25,11 +25,10 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.inbound(); + final Queue msgBuf = pipeline.inboundMessageBuffer(); boolean closed = false; boolean read = false; try { - Queue msgBuf = buf.messageBuffer(); int localReadAmount = doReadMessages(msgBuf); if (localReadAmount > 0) { read = true; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java index 4f151508da..411f087ab3 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -25,11 +25,10 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.inbound(); + final ChannelBuffer byteBuf = pipeline.inboundByteBuffer(); boolean closed = false; boolean read = false; try { - ChannelBuffer byteBuf = buf.byteBuffer(); expandReadBuffer(byteBuf); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount > 0) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java index c3e3c07a00..37e3ffa07f 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java @@ -8,12 +8,10 @@ import io.netty.channel.SingleThreadEventLoop; class OioChildEventLoop extends SingleThreadEventLoop { - private final OioEventLoop parent; private AbstractOioChannel ch; OioChildEventLoop(OioEventLoop parent) { - super(parent.threadFactory); - this.parent = parent; + super(parent, parent.threadFactory); } @Override @@ -71,6 +69,7 @@ class OioChildEventLoop extends SingleThreadEventLoop { private void deregister() { ch = null; + OioEventLoop parent = (OioEventLoop) parent(); parent.activeChildren.remove(this); parent.idleChildren.add(this); } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 75a46ac2b1..edd4bffcd7 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -4,8 +4,9 @@ package io.netty.channel.socket.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; -import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.SingleThreadEventExecutor; import io.netty.util.internal.QueueFactory; import java.util.Collection; @@ -31,6 +32,12 @@ public class OioEventLoop implements EventLoop { new ConcurrentHashMap()); final Queue idleChildren = QueueFactory.createQueue(); private final ChannelException tooManyChannels; + private final Unsafe unsafe = new Unsafe() { + @Override + public EventExecutor nextChild() { + throw new UnsupportedOperationException(); + } + }; public OioEventLoop() { this(0); @@ -56,6 +63,16 @@ public class OioEventLoop implements EventLoop { tooManyChannels.setStackTrace(new StackTraceElement[0]); } + @Override + public EventLoop parent() { + return null; + } + + @Override + public Unsafe unsafe() { + return unsafe; + } + @Override public void shutdown() { for (EventLoop l: activeChildren) { @@ -209,7 +226,7 @@ public class OioEventLoop implements EventLoop { throw new NullPointerException("channel"); } try { - return nextEventLoop().register(channel); + return nextChild().register(channel); } catch (Throwable t) { return channel.newFailedFuture(t); } @@ -221,7 +238,7 @@ public class OioEventLoop implements EventLoop { throw new NullPointerException("channel"); } try { - return nextEventLoop().register(channel, future); + return nextChild().register(channel, future); } catch (Throwable t) { return channel.newFailedFuture(t); } @@ -229,16 +246,16 @@ public class OioEventLoop implements EventLoop { @Override public boolean inEventLoop() { - return SingleThreadEventLoop.currentEventLoop() != null; + return SingleThreadEventExecutor.currentEventLoop() != null; } - private EventLoop nextEventLoop() { + private EventLoop nextChild() { OioChildEventLoop loop = idleChildren.poll(); if (loop == null) { if (maxChannels > 0 && activeChildren.size() >= maxChannels) { throw tooManyChannels; } - loop = new OioChildEventLoop(this); + loop = new OioChildEventLoop(OioEventLoop.this); } activeChildren.add(loop); return loop; @@ -246,7 +263,7 @@ public class OioEventLoop implements EventLoop { private static OioChildEventLoop currentEventLoop() { OioChildEventLoop loop = - (OioChildEventLoop) SingleThreadEventLoop.currentEventLoop(); + (OioChildEventLoop) SingleThreadEventExecutor.currentEventLoop(); if (loop == null) { throw new IllegalStateException("not called from an event loop thread"); } diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index a8a27b2bbb..90b4161c2d 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -246,6 +246,10 @@ public class SingleThreadEventLoopTest { final AtomicInteger cleanedUp = new AtomicInteger(); + SingleThreadEventLoopImpl() { + super(null); + } + @Override protected void run() { for (;;) { diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java index f5d149b7fc..af257a6ffd 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java @@ -70,7 +70,7 @@ public class LocalChannelRegistryTest { Channel cc = cb.connect().sync().channel(); // Send a message event up the pipeline. - cc.pipeline().inbound().messageBuffer().add("Hello, World"); + cc.pipeline().inboundMessageBuffer().add("Hello, World"); cc.pipeline().fireInboundBufferUpdated(); // Close the channel diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java new file mode 100644 index 0000000000..93367b88db --- /dev/null +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -0,0 +1,192 @@ +package io.netty.channel.local; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.channel.DefaultEventExecutor; +import io.netty.channel.EventExecutor; +import io.netty.channel.EventLoop; +import io.netty.util.internal.QueueFactory; + +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class LocalTransportThreadModelTest { + + private static ServerBootstrap sb; + private static LocalAddress ADDR; + + @BeforeClass + public static void init() { + // Configure a test server + sb = new ServerBootstrap(); + sb.eventLoop(new LocalEventLoop(), new LocalEventLoop()) + .channel(new LocalServerChannel()) + .localAddress(LocalAddress.ANY) + .childInitializer(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundMessageHandlerAdapter() { + @Override + public void messageReceived(ChannelInboundHandlerContext ctx, Object msg) { + // Discard + } + }); + } + }); + + ADDR = (LocalAddress) sb.bind().syncUninterruptibly().channel().localAddress(); + } + + @AfterClass + public static void destroy() { + sb.shutdown(); + } + + @Test + public void testSimple() throws Exception { + EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l")); + EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1")); + EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2")); + TestHandler h1 = new TestHandler(); + TestHandler h2 = new TestHandler(); + TestHandler h3 = new TestHandler(); + + Channel ch = new LocalChannel(); + ch.pipeline().addLast(h1); + ch.pipeline().addLast(e1, h2); + ch.pipeline().addLast(e2, h3); + + l.register(ch).sync(); + + ch.connect(ADDR).sync(); + + ch.pipeline().fireInboundBufferUpdated(); + ch.pipeline().context(h1).fireInboundBufferUpdated(); + ch.pipeline().context(h2).fireInboundBufferUpdated(); + ch.pipeline().context(h3).fireInboundBufferUpdated(); + ch.pipeline().flush(); + ch.pipeline().context(h3).flush(); + ch.pipeline().context(h2).flush(); + ch.pipeline().context(h1).flush().sync(); + + String currentName = Thread.currentThread().getName(); + + // Events should never be handled from the current thread. + Assert.assertFalse(h1.inboundThreadNames.contains(currentName)); + Assert.assertFalse(h2.inboundThreadNames.contains(currentName)); + Assert.assertFalse(h3.inboundThreadNames.contains(currentName)); + Assert.assertFalse(h1.outboundThreadNames.contains(currentName)); + Assert.assertFalse(h2.outboundThreadNames.contains(currentName)); + Assert.assertFalse(h3.outboundThreadNames.contains(currentName)); + + // Assert that events were handled by the correct executor. + for (String name: h1.inboundThreadNames) { + Assert.assertTrue(name.startsWith("l-")); + } + for (String name: h2.inboundThreadNames) { + Assert.assertTrue(name.startsWith("e1-")); + } + for (String name: h3.inboundThreadNames) { + Assert.assertTrue(name.startsWith("e2-")); + } + for (String name: h1.outboundThreadNames) { + Assert.assertTrue(name.startsWith("l-")); + } + for (String name: h2.outboundThreadNames) { + Assert.assertTrue(name.startsWith("e1-")); + } + for (String name: h3.outboundThreadNames) { + Assert.assertTrue(name.startsWith("e2-")); + } + + // Assert that the events for the same handler were handled by the same thread. + Set names = new HashSet(); + names.addAll(h1.inboundThreadNames); + names.addAll(h1.outboundThreadNames); + Assert.assertEquals(1, names.size()); + + names.clear(); + names.addAll(h2.inboundThreadNames); + names.addAll(h2.outboundThreadNames); + Assert.assertEquals(1, names.size()); + + names.clear(); + names.addAll(h3.inboundThreadNames); + names.addAll(h3.outboundThreadNames); + Assert.assertEquals(1, names.size()); + + // Count the number of events + Assert.assertEquals(1, h1.inboundThreadNames.size()); + Assert.assertEquals(2, h2.inboundThreadNames.size()); + Assert.assertEquals(3, h3.inboundThreadNames.size()); + Assert.assertEquals(3, h1.outboundThreadNames.size()); + Assert.assertEquals(2, h2.outboundThreadNames.size()); + Assert.assertEquals(1, h3.outboundThreadNames.size()); + } + + private static class TestHandler extends ChannelHandlerAdapter { + + private final Queue inboundThreadNames = QueueFactory.createQueue(); + private final Queue outboundThreadNames = QueueFactory.createQueue(); + + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + @Override + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + @Override + public void inboundBufferUpdated( + ChannelInboundHandlerContext ctx) throws Exception { + ctx.inbound().messageBuffer().clear(); + inboundThreadNames.add(Thread.currentThread().getName()); + ctx.fireInboundBufferUpdated(); + } + + @Override + public void flush(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + ctx.outbound().messageBuffer().clear(); + outboundThreadNames.add(Thread.currentThread().getName()); + ctx.flush(future); + } + } + + private static class PrefixThreadFactory implements ThreadFactory { + private final String prefix; + private final AtomicInteger id = new AtomicInteger(); + + public PrefixThreadFactory(String prefix) { + this.prefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(prefix + '-' + id.incrementAndGet()); + return t; + } + } +}