From 132af3a485015ff912bd567a47881814d2ce1828 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 6 Nov 2013 21:14:07 +0900 Subject: [PATCH] Introduce ChannelHandlerInvoker, dedeciated for invoking event handler methods, and move most handler invocation code in ChannelHandlerContext to the default ChannelHandlerInvoker implementation - Fixes #1912 - Add ChannelHandlerInvoker and its default implementation - Add pipeline manipulation methods that accept ChannelHandlerInvoker - Rename Channel(Inbound|Outbound)Invoker to Channel(Inbound|Outbound)Ops to avoid confusion - Remove the Javadoc references to the package-private interfaces --- .../codec/spdy/SpdyFrameDecoderTest.java | 2 +- .../concurrent/AbstractEventExecutor.java | 57 +- .../AbstractEventExecutorGroup.java | 5 +- .../util/concurrent/DefaultEventExecutor.java | 25 +- .../netty/util/concurrent/EventExecutor.java | 31 + .../util/concurrent/EventExecutorGroup.java | 19 +- .../util/concurrent/GlobalEventExecutor.java | 5 - .../concurrent/ImmediateEventExecutor.java | 8 +- .../MultithreadEventExecutorGroup.java | 26 +- .../concurrent/SingleThreadEventExecutor.java | 11 +- .../io/netty/example/localecho/LocalEcho.java | 4 +- .../socksproxy/SocksServerConnectHandler.java | 34 +- .../io/netty/example/uptime/UptimeClient.java | 10 +- .../transport/socket/SocketEchoTest.java | 8 +- .../transport/socket/SocketStartTlsTest.java | 10 +- .../channel/sctp/nio/NioSctpChannel.java | 2 - .../udt/nio/NioUdtAcceptorChannel.java | 1 - .../udt/nio/NioUdtByteRendezvousChannel.java | 2 +- .../nio/NioUdtMessageRendezvousChannel.java | 2 - .../netty/channel/udt/nio/NioUdtProvider.java | 2 +- .../nio/NioUdtByteAcceptorChannelTest.java | 2 +- .../nio/NioUdtByteConnectorChannelTest.java | 4 +- .../nio/NioUdtMessageAcceptorChannelTest.java | 2 +- .../NioUdtMessageConnectorChannelTest.java | 2 +- .../test/udt/nio/NioUdtProviderTest.java | 1 - .../io/netty/bootstrap/AbstractBootstrap.java | 6 +- .../io/netty/bootstrap/ServerBootstrap.java | 5 +- .../io/netty/channel/AbstractChannel.java | 8 +- .../io/netty/channel/AbstractEventLoop.java | 39 ++ .../netty/channel/AbstractEventLoopGroup.java | 27 + .../netty/channel/AbstractServerChannel.java | 2 +- .../main/java/io/netty/channel/Channel.java | 13 +- .../java/io/netty/channel/ChannelHandler.java | 31 +- .../netty/channel/ChannelHandlerContext.java | 14 +- .../netty/channel/ChannelHandlerInvoker.java | 149 +++++ .../channel/ChannelHandlerInvokerUtil.java | 207 ++++++ ...undInvoker.java => ChannelInboundOps.java} | 18 +- ...ndInvoker.java => ChannelOutboundOps.java} | 6 +- .../io/netty/channel/ChannelPipeline.java | 84 ++- .../netty/channel/ChannelPropertyAccess.java | 8 +- .../channel/DefaultChannelHandlerContext.java | 622 ++---------------- .../channel/DefaultChannelHandlerInvoker.java | 459 +++++++++++++ .../netty/channel/DefaultChannelPipeline.java | 105 ++- ...alEventLoop.java => DefaultEventLoop.java} | 29 +- .../netty/channel/DefaultEventLoopGroup.java | 56 ++ .../main/java/io/netty/channel/EventLoop.java | 12 +- .../java/io/netty/channel/EventLoopGroup.java | 7 +- .../channel/MultithreadEventLoopGroup.java | 6 +- .../java/io/netty/channel/ServerChannel.java | 1 - .../netty/channel/SingleThreadEventLoop.java | 16 +- .../ThreadPerChannelEventLoopGroup.java | 34 +- .../channel/embedded/EmbeddedChannel.java | 2 +- .../channel/embedded/EmbeddedEventLoop.java | 108 ++- .../channel/local/LocalEventLoopGroup.java | 21 +- .../nio/AbstractNioMessageChannel.java | 7 +- .../nio/AbstractNioMessageServerChannel.java | 5 +- .../io/netty/channel/nio/NioEventLoop.java | 2 +- .../netty/channel/nio/NioEventLoopGroup.java | 4 +- .../channel/oio/AbstractOioByteChannel.java | 3 - .../netty/channel/oio/AbstractOioChannel.java | 3 - .../oio/AbstractOioMessageChannel.java | 1 - .../oio/AbstractOioMessageServerChannel.java | 1 - .../netty/channel/oio/OioEventLoopGroup.java | 17 +- .../socket/nio/NioServerSocketChannel.java | 2 +- .../io/netty/bootstrap/BootstrapTest.java | 10 +- .../io/netty/channel/BaseChannelTest.java | 19 +- .../channel/DefaultChannelPipelineTest.java | 4 +- .../channel/SingleThreadEventLoopTest.java | 11 +- .../netty/channel/local/LocalChannelTest.java | 9 +- .../local/LocalTransportThreadModelTest.java | 23 +- .../local/LocalTransportThreadModelTest2.java | 5 +- .../local/LocalTransportThreadModelTest3.java | 30 +- 72 files changed, 1598 insertions(+), 928 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/AbstractEventLoop.java create mode 100644 transport/src/main/java/io/netty/channel/AbstractEventLoopGroup.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java rename transport/src/main/java/io/netty/channel/{ChannelInboundInvoker.java => ChannelInboundOps.java} (86%) rename transport/src/main/java/io/netty/channel/{ChannelOutboundInvoker.java => ChannelOutboundOps.java} (98%) create mode 100644 transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java rename transport/src/main/java/io/netty/channel/{local/LocalEventLoop.java => DefaultEventLoop.java} (56%) create mode 100644 transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java index 7c25570260..aec20912a5 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java @@ -51,7 +51,7 @@ public class SpdyFrameDecoderTest { testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1); } - private void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception { + private static void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception { List headerSizes = Arrays.asList(90, 900); for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0); diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 142220579e..99a17d414b 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -16,9 +16,8 @@ package io.netty.util.concurrent; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; @@ -29,24 +28,43 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { + static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; + static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; + + private final EventExecutorGroup parent; + + protected AbstractEventExecutor() { + this(null); + } + + protected AbstractEventExecutor(EventExecutorGroup parent) { + this.parent = parent; + } + + @Override + public EventExecutorGroup parent() { + return parent; + } + @Override public EventExecutor next() { return this; } + @Override + @SuppressWarnings("unchecked") + public Set children() { + return Collections.singleton((E) this); + } + @Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); } - @Override - public Iterator iterator() { - return new EventExecutorIterator(); - } - @Override public Future shutdownGracefully() { - return shutdownGracefully(2, 15, TimeUnit.SECONDS); + return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); } /** @@ -131,27 +149,4 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } - - private final class EventExecutorIterator implements Iterator { - private boolean nextCalled; - - @Override - public boolean hasNext() { - return !nextCalled; - } - - @Override - public EventExecutor next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - nextCalled = true; - return AbstractEventExecutor.this; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("read-only"); - } - } } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java index 61789d6c93..546258a995 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java @@ -23,12 +23,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.netty.util.concurrent.AbstractEventExecutor.*; + /** * Abstract base class for {@link EventExecutorGroup} implementations. */ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { - @Override public Future submit(Runnable task) { return next().submit(task); @@ -66,7 +67,7 @@ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { @Override public Future shutdownGracefully() { - return shutdownGracefully(2, 15, TimeUnit.SECONDS); + return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); } /** diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java index 3591b24e76..d053951ff8 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java @@ -16,15 +16,36 @@ package io.netty.util.concurrent; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; /** * Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a * serial fashion * */ -final class DefaultEventExecutor extends SingleThreadEventExecutor { +public final class DefaultEventExecutor extends SingleThreadEventExecutor { - DefaultEventExecutor(DefaultEventExecutorGroup parent, Executor executor) { + public DefaultEventExecutor() { + this((EventExecutorGroup) null); + } + + public DefaultEventExecutor(ThreadFactory threadFactory) { + this(null, threadFactory); + } + + public DefaultEventExecutor(Executor executor) { + this(null, executor); + } + + public DefaultEventExecutor(EventExecutorGroup parent) { + this(parent, new DefaultThreadFactory(DefaultEventExecutor.class)); + } + + public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { + super(parent, threadFactory, true); + } + + public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) { super(parent, executor, true); } diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java index 5f5c3729f1..4f8f94a664 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java @@ -15,6 +15,10 @@ */ package io.netty.util.concurrent; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + /** * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes * with some handy methods to see if a {@link Thread} is executed in a event loop. @@ -30,6 +34,12 @@ public interface EventExecutor extends EventExecutorGroup { @Override EventExecutor next(); + /** + * Returns an unmodifiable singleton set which contains itself. + */ + @Override + Set children(); + /** * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor}, */ @@ -69,4 +79,25 @@ public interface EventExecutor extends EventExecutorGroup { * every call of blocking methods will just return without blocking. */ Future newFailedFuture(Throwable cause); + + @Override + Future submit(Runnable task); + + @Override + Future submit(Runnable task, T result); + + @Override + Future submit(Callable task); + + @Override + ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + + @Override + ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); + + @Override + ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); + + @Override + ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); } diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index 30bd90d883..d2ba63389a 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -15,8 +15,8 @@ */ package io.netty.util.concurrent; -import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit; * to shut them down in a global fashion. * */ -public interface EventExecutorGroup extends ScheduledExecutorService, Iterable { +public interface EventExecutorGroup extends ScheduledExecutorService { /** - * Returns {@code true} if and only if this executor was started to be - * {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}. + * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} + * are being {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}. */ boolean isShuttingDown(); @@ -59,7 +59,8 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); /** - * Returns the {@link Future} which is notified when this executor has been terminated. + * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this + * {@link EventExecutorGroup} have been terminated. */ Future terminationFuture(); @@ -78,16 +79,14 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable shutdownNow(); /** - * Returns one of the {@link EventExecutor}s that belong to this group. + * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}. */ EventExecutor next(); /** - * Returns a read-only {@link Iterator} over all {@link EventExecutor}, which are handled by this - * {@link EventExecutorGroup} at the time of invoke this method. + * Returns the unmodifiable set of {@link EventExecutor}s managed by this {@link EventExecutorGroup}. */ - @Override - Iterator iterator(); + Set children(); @Override Future submit(Runnable task); diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index ca00e52a09..2941d82e48 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -64,11 +64,6 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { delayedTaskQueue.add(purgeTask); } - @Override - public EventExecutorGroup parent() { - return null; - } - /** * Take the next {@link Runnable} from the task queue and so will block if no task is currently present. * diff --git a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java index 0fae584403..db74c92391 100644 --- a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java @@ -21,18 +21,14 @@ import java.util.concurrent.TimeUnit; * {@link AbstractEventExecutor} which execute tasks in the callers thread. */ public final class ImmediateEventExecutor extends AbstractEventExecutor { + public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor(); private final Future terminationFuture = new FailedFuture( GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()); private ImmediateEventExecutor() { - // use static instance - } - - @Override - public EventExecutorGroup parent() { - return null; + // Singleton } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index ff951cb520..9c30d3acbb 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -16,8 +16,7 @@ package io.netty.util.concurrent; import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; @@ -31,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children; + private final Set readonlyChildren; private final AtomicInteger childIndex = new AtomicInteger(); private final AtomicInteger terminatedChildren = new AtomicInteger(); private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); @@ -62,7 +62,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } - children = new SingleThreadEventExecutor[nThreads]; + children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { @@ -104,6 +104,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } + + Set childrenSet = new LinkedHashSet(children.length); + Collections.addAll(childrenSet, children); + readonlyChildren = Collections.unmodifiableSet(childrenSet); } protected ThreadFactory newDefaultThreadFactory() { @@ -115,11 +119,6 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } - @Override - public Iterator iterator() { - return children().iterator(); - } - /** * Return the number of {@link EventExecutor} this implementation uses. This number is the maps * 1:1 to the threads it use. @@ -128,13 +127,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto return children.length; } - /** - * Return a safe-copy of all of the children of this group. - */ - protected Set children() { - Set children = Collections.newSetFromMap(new LinkedHashMap()); - Collections.addAll(children, this.children); - return children; + @Override + @SuppressWarnings("unchecked") + public final Set children() { + return (Set) readonlyChildren; } /** diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 33aa42c12a..aed8ad19b9 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -57,7 +57,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } }; - private final EventExecutorGroup parent; private final Queue taskQueue; final Queue> delayedTaskQueue = new PriorityQueue>(); @@ -98,14 +97,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread */ - protected SingleThreadEventExecutor( - EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { + protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { + super(parent); if (executor == null) { throw new NullPointerException("executor"); } - this.parent = parent; this.addTaskWakesUp = addTaskWakesUp; this.executor = executor; @@ -122,11 +120,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return new LinkedBlockingQueue(); } - @Override - public EventExecutorGroup parent() { - return parent; - } - /** * Interrupt the current running {@link Thread}. */ diff --git a/example/src/main/java/io/netty/example/localecho/LocalEcho.java b/example/src/main/java/io/netty/example/localecho/LocalEcho.java index cd707fcdc0..bc9aca12ff 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEcho.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEcho.java @@ -20,10 +20,10 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.logging.LogLevel; @@ -44,7 +44,7 @@ public class LocalEcho { // Address to bind on / connect to. final LocalAddress addr = new LocalAddress(port); - EventLoopGroup serverGroup = new LocalEventLoopGroup(); + EventLoopGroup serverGroup = new DefaultEventLoopGroup(); EventLoopGroup clientGroup = new NioEventLoopGroup(); // NIO event loops are also OK try { // Note that we can use any event loop to ensure certain local channels diff --git a/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java b/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java index c993738818..801e0faadc 100644 --- a/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java +++ b/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java @@ -68,24 +68,24 @@ public final class SocksServerConnectHandler extends SimpleChannelInboundHandler final Channel inboundChannel = ctx.channel(); b.group(inboundChannel.eventLoop()) - .channel(NioSocketChannel.class) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new DirectClientInitializer(promise)); - b.connect(request.host(), request.port()) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // Connection established use handler provided results - } else { - // Close the connection if the connection attempt has failed. - ctx.channel().writeAndFlush( - new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType())); - SocksServerUtils.closeOnFlush(ctx.channel()); - } + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new DirectClientInitializer(promise)); + + b.connect(request.host(), request.port()).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // Connection established use handler provided results + } else { + // Close the connection if the connection attempt has failed. + ctx.channel().writeAndFlush( + new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType())); + SocksServerUtils.closeOnFlush(ctx.channel()); } - }); + } + }); } @Override diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClient.java b/example/src/main/java/io/netty/example/uptime/UptimeClient.java index ffc943bdbe..97db6116d8 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClient.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClient.java @@ -18,8 +18,8 @@ package io.netty.example.uptime; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.SocketChannel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; @@ -62,10 +62,10 @@ public class UptimeClient { .channel(NioSocketChannel.class) .remoteAddress(host, port) .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0), handler); - } + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0), handler); + } }); return b; diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 1136e36110..3c9fdeb0ed 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -23,10 +23,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -42,7 +42,7 @@ public class SocketEchoTest extends AbstractSocketTest { private static final Random random = new Random(); static final byte[] data = new byte[1048576]; - private static EventExecutorGroup group; + private static EventLoopGroup group; static { random.nextBytes(data); @@ -50,7 +50,7 @@ public class SocketEchoTest extends AbstractSocketTest { @BeforeClass public static void createGroup() { - group = new DefaultEventExecutorGroup(2); + group = new DefaultEventLoopGroup(2); } @AfterClass diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 1153814f0d..00b38c04e3 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -21,6 +21,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; @@ -30,8 +32,6 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; import io.netty.testsuite.util.BogusSslContextFactory; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -46,11 +46,11 @@ import static org.junit.Assert.*; public class SocketStartTlsTest extends AbstractSocketTest { private static final LogLevel LOG_LEVEL = LogLevel.TRACE; - private static EventExecutorGroup executor; + private static EventLoopGroup executor; @BeforeClass public static void createExecutor() { - executor = new DefaultEventExecutorGroup(2); + executor = new DefaultEventLoopGroup(2); } @AfterClass @@ -64,7 +64,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { } public void testStartTls(ServerBootstrap sb, Bootstrap cb) throws Throwable { - final EventExecutorGroup executor = SocketStartTlsTest.executor; + final EventLoopGroup executor = SocketStartTlsTest.executor; final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine(); final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine(); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index f2ec013a01..e2046ed6db 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -19,7 +19,6 @@ import com.sun.nio.sctp.Association; import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.SctpChannel; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -29,7 +28,6 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.sctp.DefaultSctpChannelConfig; diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index ed1bdda61a..f6a3b9fd8f 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -17,7 +17,6 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.ServerSocketChannelUDT; - import io.netty.channel.ChannelException; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.EventLoop; diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java index a127dc8bf7..fada22e228 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java @@ -15,9 +15,9 @@ */ package io.netty.channel.udt.nio; -import io.netty.channel.EventLoop; import com.barchart.udt.TypeUDT; +import io.netty.channel.EventLoop; /** * Byte Channel Rendezvous for UDT Streams. diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java index 4344d57477..4bcf5420bf 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; - import io.netty.channel.EventLoop; import io.netty.channel.udt.UdtMessage; @@ -30,5 +29,4 @@ public class NioUdtMessageRendezvousChannel extends NioUdtMessageConnectorChanne public NioUdtMessageRendezvousChannel(EventLoop eventLoop) { super(eventLoop, NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM)); } - } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java index 104af23081..f49b32fe27 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java @@ -241,8 +241,8 @@ public abstract class NioUdtProvider { super(type, kind); } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) { switch (kind()) { case ACCEPTOR: diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java index 2b94f5a637..9b399661fc 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java @@ -31,6 +31,6 @@ public class NioUdtByteAcceptorChannelTest extends AbstractUdtTest { @Test public void metadata() throws Exception { EventLoop loop = new NioEventLoopGroup().next(); - assertEquals(false, new NioUdtByteAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect()); + assertFalse(new NioUdtByteAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java index 053274e4c4..f87d9feece 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java @@ -21,7 +21,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtByteConnectorChannel; import org.junit.Test; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class NioUdtByteConnectorChannelTest extends AbstractUdtTest { @@ -31,6 +31,6 @@ public class NioUdtByteConnectorChannelTest extends AbstractUdtTest { @Test public void metadata() throws Exception { EventLoop loop = new NioEventLoopGroup().next(); - assertEquals(false, new NioUdtByteConnectorChannel(loop).metadata().hasDisconnect()); + assertFalse(new NioUdtByteConnectorChannel(loop).metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java index 0a043a80e6..6216951bff 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java @@ -31,6 +31,6 @@ public class NioUdtMessageAcceptorChannelTest extends AbstractUdtTest { @Test public void metadata() throws Exception { EventLoop loop = new NioEventLoopGroup().next(); - assertEquals(false, new NioUdtMessageAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect()); + assertFalse(new NioUdtMessageAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java index a9b8dbdaf4..9a6bd68ab8 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java @@ -31,6 +31,6 @@ public class NioUdtMessageConnectorChannelTest extends AbstractUdtTest { @Test public void metadata() throws Exception { EventLoop loop = new NioEventLoopGroup().next(); - assertEquals(false, new NioUdtMessageConnectorChannel(loop).metadata().hasDisconnect()); + assertFalse(new NioUdtMessageConnectorChannel(loop).metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java index 68073f1b18..18b407e324 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java @@ -21,7 +21,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.UdtServerChannel; import io.netty.channel.udt.nio.NioUdtProvider; - import org.junit.Test; import static org.junit.Assert.*; diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 313f556f98..f1615afe82 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -22,9 +22,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.StringUtil; import java.net.InetAddress; @@ -169,14 +169,14 @@ public abstract class AbstractBootstrap, C ext /** * Returns a deep clone of this bootstrap which has the identical configuration. This method is useful when making * multiple {@link Channel}s with similar settings. Please note that this method does not clone the - * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource. + * {@link EventExecutorGroup} deeply but shallowly, making the group a shared resource. */ @Override @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException") public abstract B clone(); /** - * Create a new {@link Channel} and register it with an {@link EventLoop}. + * Create a new {@link Channel} and register it with an {@link EventExecutorGroup}. */ public ChannelFuture register() { validate(); diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index d12d558953..38b6be687f 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -29,6 +29,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -118,8 +119,8 @@ public final class ServerBootstrap extends AbstractBootstrap { +public interface Channel extends AttributeMap, ChannelOutboundOps, ChannelPropertyAccess, Comparable { /** * Returns the globally unique identifier of this {@link Channel}. @@ -168,14 +168,21 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the * following methods: *
    + *
  • {@link #invoker()}
  • *
  • {@link #localAddress()}
  • *
  • {@link #remoteAddress()}
  • *
  • {@link #closeForcibly()}
  • - *
  • {@link #register(EventLoop, ChannelPromise)}
  • + *
  • {@link #register(ChannelPromise)}
  • *
  • {@link #voidPromise()}
  • *
*/ interface Unsafe { + + /** + * Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user. + */ + ChannelHandlerInvoker invoker(); + /** * Return the {@link SocketAddress} to which is bound local or * {@code null} if none. @@ -189,7 +196,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr SocketAddress remoteAddress(); /** - * Register the {@link Channel} of the {@link ChannelPromise} with the {@link EventLoop} and notify + * Register the {@link Channel} of the {@link ChannelPromise} and notify * the {@link ChannelFuture} once the registration was complete. */ void register(ChannelPromise promise); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java index 2384292037..8fef023216 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandler.java @@ -26,25 +26,28 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Handles or intercepts a {@link ChannelInboundInvoker} or {@link ChannelOutboundInvoker} operation, and forwards it - * to the next handler in a {@link ChannelPipeline}. + * Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in + * its {@link ChannelPipeline}. * *

Sub-types

*

- * {@link ChannelHandler} itself does not provide many methods. To handle a - * a {@link ChannelInboundInvoker} or {@link ChannelOutboundInvoker} operation - * you need to implement its sub-interfaces. There are many different sub-interfaces - * which handles inbound and outbound operations. - * - * But the most useful for developers may be: + * {@link ChannelHandler} itself does not provide many methods, but you usually have to implement one of its subtypes: *

    - *
  • {@link ChannelInboundHandlerAdapter} handles and intercepts inbound operations
  • - *
  • {@link ChannelOutboundHandlerAdapter} handles and intercepts outbound operations
  • + *
  • {@link ChannelInboundHandler} to handle inbound I/O events, and
  • + *
  • {@link ChannelOutboundHandler} to handle outbound I/O operations.
  • *
- * - * You will also find more detailed explanation from the documentation of - * each sub-interface on how an event is interpreted when it goes upstream and - * downstream respectively. + *

+ *

+ * Alternatively, the following adapter classes are provided for your convenience: + *

    + *
  • {@link ChannelInboundHandlerAdapter} to handle inbound I/O events,
  • + *
  • {@link ChannelOutboundHandlerAdapter} to handle outbound I/O operations, and
  • + *
  • {@link ChannelHandlerAdapter} to handle both inbound and outbound events
  • + *
+ *

+ *

+ * For more information, please refer to the documentation of each subtype. + *

* *

The context object

*

diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 2d7ca803b1..937a35e940 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -30,9 +30,8 @@ import java.nio.channels.Channels; * *

Notify

* - * You can notify the closest handler in the - * same {@link ChannelPipeline} by calling one of the various methods which are listed in {@link ChannelInboundInvoker} - * and {@link ChannelOutboundInvoker}. Please refer to {@link ChannelPipeline} to understand how an event flows. + * You can notify the closest handler in the same {@link ChannelPipeline} by calling one of the various method. + * Please refer to {@link ChannelPipeline} to understand how an event flows. * *

Modifying a pipeline

* @@ -123,8 +122,7 @@ import java.nio.channels.Channels; * the operation in your application. */ public interface ChannelHandlerContext - extends AttributeMap, ChannelPropertyAccess, - ChannelInboundInvoker, ChannelOutboundInvoker { + extends AttributeMap, ChannelPropertyAccess, ChannelInboundOps, ChannelOutboundOps { /** * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}. @@ -132,9 +130,7 @@ public interface ChannelHandlerContext Channel channel(); /** - * The {@link EventExecutor} that is used to dispatch the events. This can also be used to directly - * submit tasks that get executed in the event loop. For more information please refer to the - * {@link EventExecutor} javadoc. + * Returns the {@link EventExecutor} which is used to execute an arbitrary task. */ EventExecutor executor(); @@ -153,7 +149,7 @@ public interface ChannelHandlerContext /** * Return {@code true} if the {@link ChannelHandler} which belongs to this {@link ChannelHandler} was removed * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the - * {@link EventLoop}. + * {@link EventExecutor}. */ boolean isRemoved(); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java new file mode 100644 index 0000000000..dc7bc56e5a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java @@ -0,0 +1,149 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.util.concurrent.EventExecutor; + +import java.net.SocketAddress; + +/** + * Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}. + * A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default + * implementation. + */ +public interface ChannelHandlerInvoker { + + /** + * Returns the {@link EventExecutor} which is used to execute an arbitrary task. + */ + EventExecutor executor(); + + /** + * Invokes {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelRegistered(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelActive(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelInactive(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause); + + /** + * Invokes {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is not for + * a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event); + + /** + * Invokes {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelRead(ChannelHandlerContext ctx, Object msg); + + /** + * Invokes {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelReadComplete(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}. This method is not for + * a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelWritabilityChanged(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise); + + /** + * Invokes + * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeConnect( + ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#read(ChannelHandlerContext)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeRead(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)} and + * {@link ChannelOutboundHandler#flush(ChannelHandlerContext)} sequentially. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeFlush(ChannelHandlerContext ctx); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java new file mode 100644 index 0000000000..a72775fcd4 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java @@ -0,0 +1,207 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import java.net.SocketAddress; + +import static io.netty.channel.DefaultChannelPipeline.*; + +/** + * A set of helper methods for easier implementation of custom {@link ChannelHandlerInvoker} implementation. + */ +public final class ChannelHandlerInvokerUtil { + + public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelActive(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelInactiveNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelInactive(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeExceptionCaughtNow(final ChannelHandlerContext ctx, final Throwable cause) { + try { + ctx.handler().exceptionCaught(ctx, cause); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler's " + + "exceptionCaught() method while handling the following exception:", cause); + } + } + } + + public static void invokeUserEventTriggeredNow(final ChannelHandlerContext ctx, final Object event) { + try { + ((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) { + try { + ((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelReadCompleteNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelReadComplete(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelWritabilityChangedNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelWritabilityChanged(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeBindNow( + final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + public static void invokeConnectNow( + final ChannelHandlerContext ctx, + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeDisconnectNow(final ChannelHandlerContext ctx, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeCloseNow(final ChannelHandlerContext ctx, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).close(ctx, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeReadNow(final ChannelHandlerContext ctx) { + try { + ((ChannelOutboundHandler) ctx.handler()).read(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeFlushNow(final ChannelHandlerContext ctx) { + try { + ((ChannelOutboundHandler) ctx.handler()).flush(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeWriteAndFlushNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteNow(ctx, msg, promise); + invokeFlushNow(ctx); + } + + private static void notifyHandlerException(ChannelHandlerContext ctx, Throwable cause) { + if (inExceptionCaught(cause)) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler " + + "while handling an exceptionCaught event", cause); + } + return; + } + + invokeExceptionCaughtNow(ctx, cause); + } + + private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { + // only try to fail the promise if its not a VoidChannelPromise, as + // the VoidChannelPromise would also fire the cause through the pipeline + if (promise instanceof VoidChannelPromise) { + return; + } + + if (!promise.tryFailure(cause)) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to fail the promise because it's done already: {}", promise, cause); + } + } + } + + private static boolean inExceptionCaught(Throwable cause) { + do { + StackTraceElement[] trace = cause.getStackTrace(); + if (trace != null) { + for (StackTraceElement t : trace) { + if (t == null) { + break; + } + if ("exceptionCaught".equals(t.getMethodName())) { + return true; + } + } + } + + cause = cause.getCause(); + } while (cause != null); + + return false; + } + + private ChannelHandlerInvokerUtil() { } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundOps.java similarity index 86% rename from transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java rename to transport/src/main/java/io/netty/channel/ChannelInboundOps.java index 0771762140..f6b1aebcd4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundOps.java @@ -19,7 +19,7 @@ package io.netty.channel; /** * Interface which is shared by others which need to fire inbound events */ -interface ChannelInboundInvoker { +interface ChannelInboundOps { /** * A {@link Channel} was registered to its {@link EventLoop}. @@ -28,7 +28,7 @@ interface ChannelInboundInvoker { * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelInboundInvoker fireChannelRegistered(); + ChannelInboundOps fireChannelRegistered(); /** * A {@link Channel} is active now, which means it is connected. @@ -37,7 +37,7 @@ interface ChannelInboundInvoker { * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelInboundInvoker fireChannelActive(); + ChannelInboundOps fireChannelActive(); /** * A {@link Channel} is inactive now, which means it is closed. @@ -46,7 +46,7 @@ interface ChannelInboundInvoker { * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelInboundInvoker fireChannelInactive(); + ChannelInboundOps fireChannelInactive(); /** * A {@link Channel} received an {@link Throwable} in one of its inbound operations. @@ -55,7 +55,7 @@ interface ChannelInboundInvoker { * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelInboundInvoker fireExceptionCaught(Throwable cause); + ChannelInboundOps fireExceptionCaught(Throwable cause); /** * A {@link Channel} received an user defined event. @@ -64,7 +64,7 @@ interface ChannelInboundInvoker { * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelInboundInvoker fireUserEventTriggered(Object event); + ChannelInboundOps fireUserEventTriggered(Object event); /** * A {@link Channel} received a message. @@ -73,13 +73,13 @@ interface ChannelInboundInvoker { * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelInboundInvoker fireChannelRead(Object msg); + ChannelInboundOps fireChannelRead(Object msg); - ChannelInboundInvoker fireChannelReadComplete(); + ChannelInboundOps fireChannelReadComplete(); /** * Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. */ - ChannelInboundInvoker fireChannelWritabilityChanged(); + ChannelInboundOps fireChannelWritabilityChanged(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundOps.java similarity index 98% rename from transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java rename to transport/src/main/java/io/netty/channel/ChannelOutboundOps.java index f4de88cde6..56ebd04f13 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundOps.java @@ -21,7 +21,7 @@ import java.net.SocketAddress; /** * Interface which is shared by others which need to execute outbound logic. */ -interface ChannelOutboundInvoker { +interface ChannelOutboundOps { /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation @@ -171,7 +171,7 @@ interface ChannelOutboundInvoker { * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ - ChannelOutboundInvoker read(); + ChannelOutboundOps read(); /** * Request to write a message via this ChannelOutboundInvoker through the {@link ChannelPipeline}. @@ -190,7 +190,7 @@ interface ChannelOutboundInvoker { /** * Request to flush all pending messages via this ChannelOutboundInvoker. */ - ChannelOutboundInvoker flush(); + ChannelOutboundOps flush(); /** * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}. diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 7090f5dced..dcdef66016 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -212,7 +212,7 @@ import java.util.NoSuchElementException; * after the exchange. */ public interface ChannelPipeline - extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { + extends ChannelInboundOps, ChannelOutboundOps, Iterable> { /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. @@ -242,6 +242,20 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} at the first position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param name the name of the handler to insert first + * @param handler the handler to insert first + * + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified name or handler is {@code null} + */ + ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler); + /** * Appends a {@link ChannelHandler} at the last position of this pipeline. * @@ -270,6 +284,20 @@ public interface ChannelPipeline */ ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); + /** + * Appends a {@link ChannelHandler} at the last position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param name the name of the handler to append + * @param handler the handler to append + * + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified name or handler is {@code null} + */ + ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler} before an existing handler of this * pipeline. @@ -306,6 +334,24 @@ public interface ChannelPipeline */ ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} before an existing handler of this + * pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param baseName the name of the existing handler + * @param name the name of the handler to insert before + * @param handler the handler to insert before + * + * @throws NoSuchElementException + * if there's no such entry with the specified {@code baseName} + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified baseName, name, or handler is {@code null} + */ + ChannelPipeline addBefore(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler} after an existing handler of this * pipeline. @@ -342,6 +388,24 @@ public interface ChannelPipeline */ ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} after an existing handler of this + * pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param baseName the name of the existing handler + * @param name the name of the handler to insert after + * @param handler the handler to insert after + * + * @throws NoSuchElementException + * if there's no such entry with the specified {@code baseName} + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified baseName, name, or handler is {@code null} + */ + ChannelPipeline addAfter(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler}s at the first position of this pipeline. * @@ -360,6 +424,15 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); + /** + * Inserts a {@link ChannelHandler}s at the first position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param handlers the handlers to insert first + * + */ + ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers); + /** * Inserts a {@link ChannelHandler}s at the last position of this pipeline. * @@ -378,6 +451,15 @@ public interface ChannelPipeline */ ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); + /** + * Inserts a {@link ChannelHandler}s at the last position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param handlers the handlers to insert last + * + */ + ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers); + /** * Removes the specified {@link ChannelHandler} from this pipeline. * diff --git a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java index 7daa5fb9fe..5ca6c01ece 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java +++ b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java @@ -17,7 +17,6 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; /** @@ -60,11 +59,8 @@ interface ChannelPropertyAccess { ChannelFuture newFailedFuture(Throwable cause); /** - * Return a special ChannelPromise which can be reused for different operations. - *

- * It's only supported to use - * it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}. - *

+ * Return a special ChannelPromise which can be reused for {@code write(..)} operations. Using it for other + * outbound operations will fail with undetermined consequences. *

* Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used * if you want to save an object allocation for every write operation. You will not be able to detect if the diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 2a99b1fa83..ed75cc37cd 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -15,13 +15,9 @@ */ package io.netty.channel; -import static io.netty.channel.DefaultChannelPipeline.logger; import io.netty.buffer.ByteBufAllocator; import io.netty.util.DefaultAttributeMap; -import io.netty.util.Recycler; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; -import io.netty.util.internal.StringUtil; import java.net.SocketAddress; @@ -38,17 +34,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. - final EventExecutor executor; + final ChannelHandlerInvoker invoker; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. - private Runnable invokeChannelReadCompleteTask; - private Runnable invokeReadTask; - private Runnable invokeFlushTask; - private Runnable invokeChannelWritableStateChangedTask; + Runnable invokeChannelReadCompleteTask; + Runnable invokeReadTask; + Runnable invokeFlushTask; + Runnable invokeChannelWritableStateChangedTask; - DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, - ChannelHandler handler) { + DefaultChannelHandlerContext( + DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) { if (name == null) { throw new NullPointerException("name"); @@ -62,17 +58,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements this.name = name; this.handler = handler; - if (group != null) { - // Pin one of the child executors once and remember it so that the same child executor - // is used to fire events for the same channel. - EventExecutor childExecutor = pipeline.childExecutors.get(group); - if (childExecutor == null) { - childExecutor = group.next(); - pipeline.childExecutors.put(group, childExecutor); - } - executor = childExecutor; + if (invoker == null) { + this.invoker = channel.unsafe().invoker(); } else { - executor = null; + this.invoker = invoker; } } @@ -118,11 +107,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public EventExecutor executor() { - if (executor == null) { - return channel().eventLoop(); - } else { - return executor; - } + return invoker.executor(); } @Override @@ -137,237 +122,60 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelHandlerContext fireChannelRegistered() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRegistered(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelRegistered(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeChannelRegistered(next); return this; } - private void invokeChannelRegistered() { - try { - ((ChannelInboundHandler) handler).channelRegistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelActive() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelActive(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelActive(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeChannelActive(next); return this; } - private void invokeChannelActive() { - try { - ((ChannelInboundHandler) handler).channelActive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelInactive() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelInactive(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelInactive(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeChannelInactive(next); return this; } - private void invokeChannelInactive() { - try { - ((ChannelInboundHandler) handler).channelInactive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override - public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - final DefaultChannelHandlerContext next = this.next; - - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeExceptionCaught(cause); - } else { - try { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeExceptionCaught(cause); - } - }); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to submit an exceptionCaught() event.", t); - logger.warn("The exceptionCaught() event that was failed to submit was:", cause); - } - } - } - + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + DefaultChannelHandlerContext next = this.next; + next.invoker.invokeExceptionCaught(next, cause); return this; } - private void invokeExceptionCaught(final Throwable cause) { - try { - handler.exceptionCaught(this, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler's " + - "exceptionCaught() method while handling the following exception:", cause); - } - } - } - @Override - public ChannelHandlerContext fireUserEventTriggered(final Object event) { - if (event == null) { - throw new NullPointerException("event"); - } - - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeUserEventTriggered(event); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeUserEventTriggered(event); - } - }); - } + public ChannelHandlerContext fireUserEventTriggered(Object event) { + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeUserEventTriggered(next, event); return this; } - private void invokeUserEventTriggered(Object event) { - try { - ((ChannelInboundHandler) handler).userEventTriggered(this, event); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override - public ChannelHandlerContext fireChannelRead(final Object msg) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRead(msg); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelRead(msg); - } - }); - } + public ChannelHandlerContext fireChannelRead(Object msg) { + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeChannelRead(next, msg); return this; } - private void invokeChannelRead(Object msg) { - try { - ((ChannelInboundHandler) handler).channelRead(this, msg); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelReadComplete() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelReadComplete(); - } else { - Runnable task = next.invokeChannelReadCompleteTask; - if (task == null) { - next.invokeChannelReadCompleteTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelReadComplete(); - } - }; - } - executor.execute(task); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeChannelReadComplete(next); return this; } - private void invokeChannelReadComplete() { - try { - ((ChannelInboundHandler) handler).channelReadComplete(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelWritabilityChanged(); - } else { - Runnable task = next.invokeChannelWritableStateChangedTask; - if (task == null) { - next.invokeChannelWritableStateChangedTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelWritabilityChanged(); - } - }; - } - executor.execute(task); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker.invokeChannelWritabilityChanged(next); return this; } - private void invokeChannelWritabilityChanged() { - try { - ((ChannelInboundHandler) handler).channelWritabilityChanged(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newPromise()); @@ -395,258 +203,72 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { - if (localAddress == null) { - throw new NullPointerException("localAddress"); - } - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeBind(localAddress, promise); - } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeBind(localAddress, promise); - } - }, promise); - } - + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeBind(next, localAddress, promise); return promise; } - private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return connect(remoteAddress, null, promise); } @Override - public ChannelFuture connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - - if (remoteAddress == null) { - throw new NullPointerException("remoteAddress"); - } - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeConnect(remoteAddress, localAddress, promise); - } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeConnect(remoteAddress, localAddress, promise); - } - }, promise); - } - + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeConnect(next, remoteAddress, localAddress, promise); return promise; } - private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override - public ChannelFuture disconnect(final ChannelPromise promise) { - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - // Translate disconnect to close if the channel has no notion of disconnect-reconnect. - // So far, UDP/IP is the only transport that has such behavior. - if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); - } else { - next.invokeDisconnect(promise); - } - } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); - } else { - next.invokeDisconnect(promise); - } - } - }, promise); + public ChannelFuture disconnect(ChannelPromise promise) { + if (!channel().metadata().hasDisconnect()) { + return close(promise); } + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeDisconnect(next, promise); return promise; } - private void invokeDisconnect(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override - public ChannelFuture close(final ChannelPromise promise) { - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeClose(promise); - } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeClose(promise); - } - }, promise); - } - + public ChannelFuture close(ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeClose(next, promise); return promise; } - private void invokeClose(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override public ChannelHandlerContext read() { - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeRead(); - } else { - Runnable task = next.invokeReadTask; - if (task == null) { - next.invokeReadTask = task = new Runnable() { - @Override - public void run() { - next.invokeRead(); - } - }; - } - executor.execute(task); - } - + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeRead(next); return this; } - private void invokeRead() { - try { - ((ChannelOutboundHandler) handler).read(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override - public ChannelFuture write(final Object msg, final ChannelPromise promise) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - validatePromise(promise, true); - - write(msg, false, promise); - + public ChannelFuture write(Object msg, ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeWrite(next, msg, promise); return promise; } - private void invokeWrite(Object msg, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).write(this, msg, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override public ChannelHandlerContext flush() { - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeFlush(); - } else { - Runnable task = next.invokeFlushTask; - if (task == null) { - next.invokeFlushTask = task = new Runnable() { - @Override - public void run() { - next.invokeFlush(); - } - }; - } - safeExecute(executor, task, channel.voidPromise()); - } - + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker.invokeFlush(next); return this; } - private void invokeFlush() { - try { - ((ChannelOutboundHandler) handler).flush(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - validatePromise(promise, true); - - write(msg, true, promise); - - return promise; - } - - private void write(Object msg, boolean flush, ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeWrite(msg, promise); - if (flush) { - next.invokeFlush(); - } - } else { - int size = channel.estimatorHandle().size(msg); - if (size > 0) { - ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(size); - } - } - safeExecute(executor, WriteTask.newInstance(next, msg, size, flush, promise), promise); - } + next.invoker.invokeWriteAndFlush(next, msg, promise); + return promise; } @Override @@ -654,53 +276,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return writeAndFlush(msg, newPromise()); } - private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { - // only try to fail the promise if its not a VoidChannelPromise, as - // the VoidChannelPromise would also fire the cause through the pipeline - if (promise instanceof VoidChannelPromise) { - return; - } - - if (!promise.tryFailure(cause)) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to fail the promise because it's done already: {}", promise, cause); - } - } - } - - private void notifyHandlerException(Throwable cause) { - if (inExceptionCaught(cause)) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler " + - "while handling an exceptionCaught event", cause); - } - return; - } - - invokeExceptionCaught(cause); - } - - private static boolean inExceptionCaught(Throwable cause) { - do { - StackTraceElement[] trace = cause.getStackTrace(); - if (trace != null) { - for (StackTraceElement t : trace) { - if (t == null) { - break; - } - if ("exceptionCaught".equals(t.getMethodName())) { - return true; - } - } - } - - cause = cause.getCause(); - } while (cause != null); - - return false; - } - @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); @@ -725,35 +300,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return new FailedChannelFuture(channel(), executor(), cause); } - private void validatePromise(ChannelPromise promise, boolean allowVoidPromise) { - if (promise == null) { - throw new NullPointerException("promise"); - } - - if (promise.isDone()) { - throw new IllegalArgumentException("promise already done: " + promise); - } - - if (promise.channel() != channel()) { - throw new IllegalArgumentException(String.format( - "promise.channel does not match: %s (expected: %s)", promise.channel(), channel())); - } - - if (promise.getClass() == DefaultChannelPromise.class) { - return; - } - - if (!allowVoidPromise && promise instanceof VoidChannelPromise) { - throw new IllegalArgumentException( - StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation"); - } - - if (promise instanceof AbstractChannel.CloseFuture) { - throw new IllegalArgumentException( - StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline"); - } - } - private DefaultChannelHandlerContext findContextInbound() { DefaultChannelHandlerContext ctx = this; do { @@ -783,68 +329,4 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public boolean isRemoved() { return removed; } - - private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise) { - try { - executor.execute(runnable); - } catch (Throwable cause) { - promise.setFailure(cause); - } - } - - static final class WriteTask implements Runnable { - private DefaultChannelHandlerContext ctx; - private Object msg; - private ChannelPromise promise; - private int size; - private boolean flush; - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected WriteTask newObject(Handle handle) { - return new WriteTask(handle); - } - }; - - private static WriteTask newInstance( - DefaultChannelHandlerContext ctx, Object msg, int size, boolean flush, ChannelPromise promise) { - WriteTask task = RECYCLER.get(); - task.ctx = ctx; - task.msg = msg; - task.promise = promise; - task.size = size; - task.flush = flush; - return task; - } - - private final Recycler.Handle handle; - - private WriteTask(Recycler.Handle handle) { - this.handle = handle; - } - - @Override - public void run() { - try { - if (size > 0) { - ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.decrementPendingOutboundBytes(size); - } - } - ctx.invokeWrite(msg, promise); - if (flush) { - ctx.invokeFlush(); - } - } finally { - // Set to null so the GC can collect them directly - ctx = null; - msg = null; - promise = null; - - RECYCLER.recycle(this, handle); - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java new file mode 100644 index 0000000000..5c729bf74b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java @@ -0,0 +1,459 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.StringUtil; + +import java.net.SocketAddress; + +import static io.netty.channel.ChannelHandlerInvokerUtil.*; +import static io.netty.channel.DefaultChannelPipeline.*; + +public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { + + private final EventExecutor executor; + + public DefaultChannelHandlerInvoker(EventExecutor executor) { + if (executor == null) { + throw new NullPointerException("executor"); + } + + this.executor = executor; + } + + @Override + public EventExecutor executor() { + return executor; + } + + @Override + public void invokeChannelRegistered(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelRegisteredNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelRegisteredNow(ctx); + } + }); + } + } + + @Override + public void invokeChannelActive(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelActiveNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelActiveNow(ctx); + } + }); + } + } + + @Override + public void invokeChannelInactive(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelInactiveNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelInactiveNow(ctx); + } + }); + } + } + + @Override + public void invokeExceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + if (cause == null) { + throw new NullPointerException("cause"); + } + + if (executor.inEventLoop()) { + invokeExceptionCaughtNow(ctx, cause); + } else { + try { + executor.execute(new Runnable() { + @Override + public void run() { + invokeExceptionCaughtNow(ctx, cause); + } + }); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to submit an exceptionCaught() event.", t); + logger.warn("The exceptionCaught() event that was failed to submit was:", cause); + } + } + } + } + + @Override + public void invokeUserEventTriggered(final ChannelHandlerContext ctx, final Object event) { + if (event == null) { + throw new NullPointerException("event"); + } + + if (executor.inEventLoop()) { + invokeUserEventTriggeredNow(ctx, event); + } else { + safeExecuteInbound(new Runnable() { + @Override + public void run() { + invokeUserEventTriggeredNow(ctx, event); + } + }, event); + } + } + + @Override + public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg == null) { + throw new NullPointerException("msg"); + } + + if (executor.inEventLoop()) { + invokeChannelReadNow(ctx, msg); + } else { + safeExecuteInbound(new Runnable() { + @Override + public void run() { + invokeChannelReadNow(ctx, msg); + } + }, msg); + } + } + + @Override + public void invokeChannelReadComplete(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelReadCompleteNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeChannelReadCompleteTask; + if (task == null) { + dctx.invokeChannelReadCompleteTask = task = new Runnable() { + @Override + public void run() { + invokeChannelReadCompleteNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeChannelWritabilityChanged(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelWritabilityChangedNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeChannelWritableStateChangedTask; + if (task == null) { + dctx.invokeChannelWritableStateChangedTask = task = new Runnable() { + @Override + public void run() { + invokeChannelWritabilityChangedNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeBind( + final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) { + if (localAddress == null) { + throw new NullPointerException("localAddress"); + } + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeBindNow(ctx, localAddress, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeBindNow(ctx, localAddress, promise); + } + }, promise); + } + } + + @Override + public void invokeConnect( + final ChannelHandlerContext ctx, + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (remoteAddress == null) { + throw new NullPointerException("remoteAddress"); + } + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeConnectNow(ctx, remoteAddress, localAddress, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeConnectNow(ctx, remoteAddress, localAddress, promise); + } + }, promise); + } + } + + @Override + public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) { + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeDisconnectNow(ctx, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeDisconnectNow(ctx, promise); + } + }, promise); + } + } + + @Override + public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) { + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeCloseNow(ctx, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeCloseNow(ctx, promise); + } + }, promise); + } + } + + @Override + public void invokeRead(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeReadNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeReadTask; + if (task == null) { + dctx.invokeReadTask = task = new Runnable() { + @Override + public void run() { + invokeReadNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == null) { + throw new NullPointerException("msg"); + } + + validatePromise(ctx, promise, true); + invokeWrite(ctx, msg, false, promise); + } + + private void invokeWrite(ChannelHandlerContext ctx, Object msg, boolean flush, ChannelPromise promise) { + + if (executor.inEventLoop()) { + invokeWriteNow(ctx, msg, promise); + if (flush) { + invokeFlushNow(ctx); + } + } else { + AbstractChannel channel = (AbstractChannel) ctx.channel(); + int size = channel.estimatorHandle().size(msg); + if (size > 0) { + ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + buffer.incrementPendingOutboundBytes(size); + } + } + safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, flush, promise), promise, msg); + } + } + + @Override + public void invokeFlush(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeFlushNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeFlushTask; + if (task == null) { + dctx.invokeFlushTask = task = new Runnable() { + @Override + public void run() { + invokeFlushNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == null) { + throw new NullPointerException("msg"); + } + + validatePromise(ctx, promise, true); + + invokeWrite(ctx, msg, true, promise); + } + + private static void validatePromise(ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) { + if (ctx == null) { + throw new NullPointerException("ctx"); + } + + if (promise == null) { + throw new NullPointerException("promise"); + } + + if (promise.isDone()) { + throw new IllegalArgumentException("promise already done: " + promise); + } + + if (promise.channel() != ctx.channel()) { + throw new IllegalArgumentException(String.format( + "promise.channel does not match: %s (expected: %s)", promise.channel(), ctx.channel())); + } + + if (promise.getClass() == DefaultChannelPromise.class) { + return; + } + + if (!allowVoidPromise && promise instanceof VoidChannelPromise) { + throw new IllegalArgumentException( + StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation"); + } + + if (promise instanceof AbstractChannel.CloseFuture) { + throw new IllegalArgumentException( + StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline"); + } + } + + private void safeExecuteInbound(Runnable task, Object msg) { + boolean success = false; + try { + executor.execute(task); + success = true; + } finally { + if (!success) { + ReferenceCountUtil.release(msg); + } + } + } + + private void safeExecuteOutbound(Runnable task, ChannelPromise promise) { + try { + executor.execute(task); + } catch (Throwable cause) { + promise.setFailure(cause); + } + } + private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) { + try { + executor.execute(task); + } catch (Throwable cause) { + try { + promise.setFailure(cause); + } finally { + ReferenceCountUtil.release(msg); + } + } + } + + static final class WriteTask implements Runnable { + private ChannelHandlerContext ctx; + private Object msg; + private ChannelPromise promise; + private int size; + private boolean flush; + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected WriteTask newObject(Handle handle) { + return new WriteTask(handle); + } + }; + + private static WriteTask newInstance( + ChannelHandlerContext ctx, Object msg, int size, boolean flush, ChannelPromise promise) { + WriteTask task = RECYCLER.get(); + task.ctx = ctx; + task.msg = msg; + task.promise = promise; + task.size = size; + task.flush = flush; + return task; + } + + private final Recycler.Handle handle; + + private WriteTask(Recycler.Handle handle) { + this.handle = handle; + } + + @Override + public void run() { + try { + if (size > 0) { + ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + buffer.decrementPendingOutboundBytes(size); + } + } + invokeWriteNow(ctx, msg, promise); + if (flush) { + invokeFlushNow(ctx); + } + } finally { + // Set to null so the GC can collect them directly + ctx = null; + msg = null; + promise = null; + + RECYCLER.recycle(this, handle); + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index dafe98bddd..ec8256a971 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -63,8 +63,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { private final Map name2ctx = new HashMap(4); - final Map childExecutors = - new IdentityHashMap(); + final Map childInvokers = + new IdentityHashMap(); public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { @@ -89,14 +89,22 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addFirst(String name, ChannelHandler handler) { - return addFirst(null, name, handler); + return addFirst((ChannelHandlerInvoker) null, name, handler); } @Override - public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { + public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { + return addFirst(findInvoker(group), name, handler); + } + + @Override + public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, invoker, name, handler); + addFirst0(name, newCtx); } @@ -119,15 +127,22 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addLast(String name, ChannelHandler handler) { - return addLast(null, name, handler); + return addLast((ChannelHandlerInvoker) null, name, handler); } @Override - public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { + public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { + return addLast(findInvoker(group), name, handler); + } + + @Override + public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, invoker, name, handler); + addLast0(name, newCtx); } @@ -150,16 +165,25 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { - return addBefore(null, baseName, name, handler); + return addBefore((ChannelHandlerInvoker) null, baseName, name, handler); + } + + @Override + public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { + return addBefore(findInvoker(group), baseName, name, handler); } @Override public ChannelPipeline addBefore( - EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { + ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { synchronized (this) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, invoker, name, handler); + addBefore0(name, ctx, newCtx); } return this; @@ -180,16 +204,24 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { - return addAfter(null, baseName, name, handler); + return addAfter((ChannelHandlerInvoker) null, baseName, name, handler); + } + + @Override + public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { + return addAfter(findInvoker(group), baseName, name, handler); } @Override public ChannelPipeline addAfter( - EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { + ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { synchronized (this) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, invoker, name, handler); addAfter0(name, ctx, newCtx); } @@ -213,11 +245,16 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addFirst(ChannelHandler... handlers) { - return addFirst(null, handlers); + return addFirst((ChannelHandlerInvoker) null, handlers); } @Override - public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { + public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) { + return addFirst(findInvoker(group), handlers); + } + + @Override + public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -234,7 +271,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { for (int i = size - 1; i >= 0; i --) { ChannelHandler h = handlers[i]; - addFirst(executor, generateName(h), h); + addFirst(invoker, generateName(h), h); } return this; @@ -242,11 +279,16 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addLast(ChannelHandler... handlers) { - return addLast(null, handlers); + return addLast((ChannelHandlerInvoker) null, handlers); } @Override - public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { + public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) { + return addLast(findInvoker(group), handlers); + } + + @Override + public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -255,12 +297,33 @@ final class DefaultChannelPipeline implements ChannelPipeline { if (h == null) { break; } - addLast(executor, generateName(h), h); + addLast(invoker, generateName(h), h); } return this; } + private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) { + if (group == null) { + return null; + } + + // Pin one of the child executors once and remember it so that the same child executor + // is used to fire events for the same channel. + ChannelHandlerInvoker invoker = childInvokers.get(group); + if (invoker == null) { + EventExecutor executor = group.next(); + if (executor instanceof EventLoop) { + invoker = ((EventLoop) executor).asInvoker(); + } else { + invoker = new DefaultChannelHandlerInvoker(executor); + } + childInvokers.put(group, invoker); + } + + return invoker; + } + private String generateName(ChannelHandler handler) { WeakHashMap, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)]; Class handlerType = handler.getClass(); @@ -396,7 +459,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler); + new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { replace0(ctx, newName, newCtx); diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/DefaultEventLoop.java similarity index 56% rename from transport/src/main/java/io/netty/channel/local/LocalEventLoop.java rename to transport/src/main/java/io/netty/channel/DefaultEventLoop.java index b9aa21976c..53d2e3b12b 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventLoop.java @@ -13,15 +13,36 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.channel.local; +package io.netty.channel; -import io.netty.channel.SingleThreadEventLoop; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; -final class LocalEventLoop extends SingleThreadEventLoop { +public class DefaultEventLoop extends SingleThreadEventLoop { - LocalEventLoop(LocalEventLoopGroup parent, Executor executor) { + public DefaultEventLoop() { + this((EventLoopGroup) null); + } + + public DefaultEventLoop(ThreadFactory threadFactory) { + this(null, threadFactory); + } + + public DefaultEventLoop(Executor executor) { + this(null, executor); + } + + public DefaultEventLoop(EventLoopGroup parent) { + this(parent, new DefaultThreadFactory(DefaultEventLoop.class)); + } + + public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { + super(parent, threadFactory, true); + } + + public DefaultEventLoop(EventLoopGroup parent, Executor executor) { super(parent, executor, true); } diff --git a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java new file mode 100644 index 0000000000..6e8ba13452 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java @@ -0,0 +1,56 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +/** + * {@link MultithreadEventLoopGroup} which must be used for the local transport. + */ +public class DefaultEventLoopGroup extends MultithreadEventLoopGroup { + + /** + * Create a new instance with the default number of threads. + */ + public DefaultEventLoopGroup() { + this(0); + } + + /** + * Create a new instance + * + * @param nThreads the number of threads to use + */ + public DefaultEventLoopGroup(int nThreads) { + this(nThreads, null); + } + + /** + * Create a new instance + * + * @param nThreads the number of threads to use + * @param threadFactory the {@link ThreadFactory} or {@code null} to use the default + */ + public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) { + super(nThreads, threadFactory); + } + + @Override + protected EventLoop newChild(Executor executor, Object... args) throws Exception { + return new DefaultEventLoop(this, executor); + } +} diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index 13a2a396f2..4566665ba3 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,6 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ + package io.netty.channel; import io.netty.util.concurrent.EventExecutor; @@ -27,4 +28,13 @@ import io.netty.util.concurrent.EventExecutor; public interface EventLoop extends EventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); + + @Override + EventLoop next(); + + /** + * Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to + * invoke event handler methods. + */ + ChannelHandlerInvoker asInvoker(); } diff --git a/transport/src/main/java/io/netty/channel/EventLoopGroup.java b/transport/src/main/java/io/netty/channel/EventLoopGroup.java index d32a43925f..e8fddf7780 100644 --- a/transport/src/main/java/io/netty/channel/EventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/EventLoopGroup.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,6 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ + package io.netty.channel; import io.netty.util.concurrent.EventExecutorGroup; @@ -20,12 +21,8 @@ import io.netty.util.concurrent.EventExecutorGroup; /** * Special {@link EventExecutorGroup} which allows to register {@link Channel}'s that get * processed for later selection during the event loop. - * */ public interface EventLoopGroup extends EventExecutorGroup { - /** - * Return the next {@link EventLoop} to use - */ @Override EventLoop next(); } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index 29a42123d2..86306aa1c0 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.MultithreadEventExecutorGroup; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -25,7 +26,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** - * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at + * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at * the same time. */ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { @@ -66,4 +67,7 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor public EventLoop next() { return (EventLoop) super.next(); } + + @Override + protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ServerChannel.java b/transport/src/main/java/io/netty/channel/ServerChannel.java index f19fc442b0..121e3d6eb5 100644 --- a/transport/src/main/java/io/netty/channel/ServerChannel.java +++ b/transport/src/main/java/io/netty/channel/ServerChannel.java @@ -22,6 +22,5 @@ import io.netty.channel.socket.ServerSocketChannel; * them. {@link ServerSocketChannel} is a good example. */ public interface ServerChannel extends Channel { - EventLoopGroup childEventLoopGroup(); } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 97a632b08a..9b8f41932d 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -15,28 +15,23 @@ */ package io.netty.channel; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.SingleThreadEventExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** - * Abstract base class for {@link EventLoop}'s that execute all its submitted tasks in a single thread. + * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread. * */ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - /** - * @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, boolean)} - */ + private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this); + protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { super(parent, threadFactory, addTaskWakesUp); } - /** - * @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, Executor, boolean)} - */ protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); } @@ -50,4 +45,9 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im public EventLoop next() { return (EventLoop) super.next(); } + + @Override + public ChannelHandlerInvoker asInvoker() { + return invoker; + } } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index 170ac4401e..cb5bc4a717 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -16,7 +16,6 @@ package io.netty.channel; -import io.netty.util.concurrent.AbstractEventExecutorGroup; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; @@ -26,10 +25,8 @@ import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ThreadPerTaskExecutor; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ReadOnlyIterator; import java.util.Collections; -import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -42,14 +39,14 @@ import java.util.concurrent.TimeUnit; /** * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}. */ -public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup { +public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { private final Object[] childArgs; private final int maxChannels; final Executor executor; - final Set activeChildren = - Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); - final Queue idleChildren = new ConcurrentLinkedQueue(); + final Set activeChildren = + Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); + final Queue idleChildren = new ConcurrentLinkedQueue(); private final ChannelException tooManyChannels; private volatile boolean shuttingDown; @@ -76,9 +73,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and - * {@link #register(Channel, ChannelPromise)} method. - * Use {@code 0} to use no limit + * {@link ChannelException}. Use {@code 0} to use no limit */ protected ThreadPerChannelEventLoopGroup(int maxChannels) { this(maxChannels, Executors.defaultThreadFactory()); @@ -89,9 +84,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and - * {@link #register(Channel, ChannelPromise)} method. - * Use {@code 0} to use no limit + * {@link ChannelException}. Use {@code 0} to use no limit * @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the * registered {@link Channel}s * @param args arguments which will passed to each {@link #newChild(Object...)} call. @@ -105,9 +98,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and - * {@link #register(Channel, ChannelPromise)} method. - * Use {@code 0} to use no limit + * {@link ChannelException}. Use {@code 0} to use no limit * @param executor the {@link Executor} used to create new {@link Thread} instances that handle the * registered {@link Channel}s * @param args arguments which will passed to each {@link #newChild(Object...)} call. @@ -135,15 +126,16 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i } /** - * Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}. + * Creates a new {@link EventLoop}. */ - protected ThreadPerChannelEventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) { + protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) { return new ThreadPerChannelEventLoop(this); } @Override - public Iterator iterator() { - return new ReadOnlyIterator(activeChildren.iterator()); + @SuppressWarnings("unchecked") + public Set children() { + return Collections.unmodifiableSet((Set) activeChildren); } @Override @@ -152,7 +144,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i throw new RejectedExecutionException("shutting down"); } - ThreadPerChannelEventLoop loop = idleChildren.poll(); + EventLoop loop = idleChildren.poll(); if (loop == null) { if (maxChannels > 0 && activeChildren.size() >= maxChannels) { throw tooManyChannels; diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 02046993d7..4c3b955441 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -235,7 +235,7 @@ public class EmbeddedChannel extends AbstractChannel { } /** - * Run all tasks that are pending in the {@link EventLoop} for this {@link Channel} + * Run all tasks that are pending in the {@link io.netty.channel.EventLoop} for this {@link Channel} */ public void runPendingTasks() { try { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 952f1164e1..f23adf3c3a 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -15,22 +15,28 @@ */ package io.netty.channel.embedded; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; +import io.netty.channel.AbstractEventLoop; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerInvoker; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.AbstractEventExecutor; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; -final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop { +import static io.netty.channel.ChannelHandlerInvokerUtil.*; + +final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandlerInvoker { private final Queue tasks = new ArrayDeque(2); + protected EmbeddedEventLoop() { + super(null); + } + @Override public void execute(Runnable command) { if (command == null) { @@ -82,9 +88,7 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop } @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - Thread.sleep(unit.toMillis(timeout)); + public boolean awaitTermination(long timeout, TimeUnit unit) { return false; } @@ -99,12 +103,94 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop } @Override - public EventLoop next() { + public ChannelHandlerInvoker asInvoker() { return this; } @Override - public EventLoopGroup parent() { + public EventExecutor executor() { return this; } + + @Override + public void invokeChannelRegistered(ChannelHandlerContext ctx) { + invokeChannelRegisteredNow(ctx); + } + + @Override + public void invokeChannelActive(ChannelHandlerContext ctx) { + invokeChannelActiveNow(ctx); + } + + @Override + public void invokeChannelInactive(ChannelHandlerContext ctx) { + invokeChannelInactiveNow(ctx); + } + + @Override + public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + invokeExceptionCaughtNow(ctx, cause); + } + + @Override + public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) { + invokeUserEventTriggeredNow(ctx, event); + } + + @Override + public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) { + invokeChannelReadNow(ctx, msg); + } + + @Override + public void invokeChannelReadComplete(ChannelHandlerContext ctx) { + invokeChannelReadCompleteNow(ctx); + } + + @Override + public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) { + invokeChannelWritabilityChangedNow(ctx); + } + + @Override + public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + invokeBindNow(ctx, localAddress, promise); + } + + @Override + public void invokeConnect( + ChannelHandlerContext ctx, + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + invokeConnectNow(ctx, remoteAddress, localAddress, promise); + } + + @Override + public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + invokeDisconnectNow(ctx, promise); + } + + @Override + public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) { + invokeCloseNow(ctx, promise); + } + + @Override + public void invokeRead(ChannelHandlerContext ctx) { + invokeReadNow(ctx); + } + + @Override + public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteNow(ctx, msg, promise); + } + + @Override + public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteAndFlushNow(ctx, msg, promise); + } + + @Override + public void invokeFlush(ChannelHandlerContext ctx) { + invokeFlushNow(ctx); + } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java index 2e0a56d413..2bd3ff611e 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java @@ -15,23 +15,20 @@ */ package io.netty.channel.local; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.util.concurrent.EventExecutor; +import io.netty.channel.DefaultEventLoopGroup; -import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** - * {@link MultithreadEventLoopGroup} which must be used for the local transport. + * @deprecated Use {@link DefaultEventLoopGroup} instead. */ -public class LocalEventLoopGroup extends MultithreadEventLoopGroup { +@Deprecated +public class LocalEventLoopGroup extends DefaultEventLoopGroup { /** * Create a new instance with the default number of threads. */ - public LocalEventLoopGroup() { - this(0); - } + public LocalEventLoopGroup() { } /** * Create a new instance @@ -39,7 +36,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup { * @param nThreads the number of threads to use */ public LocalEventLoopGroup(int nThreads) { - this(nThreads, null); + super(nThreads); } /** @@ -51,10 +48,4 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup { public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) { super(nThreads, threadFactory); } - - @Override - protected EventExecutor newChild( - Executor executor, Object... args) throws Exception { - return new LocalEventLoop(this, executor); - } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index ac31e89c46..b998658b1b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -33,11 +33,8 @@ import java.util.List; */ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { - /** - * @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)} - */ - protected AbstractNioMessageChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, - int readInterestOp) { + protected AbstractNioMessageChannel( + Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) { super(parent, eventLoop, ch, readInterestOp); } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java index 7a75308c67..3037501f12 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java @@ -26,8 +26,8 @@ public abstract class AbstractNioMessageServerChannel extends AbstractNioMessage private final EventLoopGroup childGroup; - protected AbstractNioMessageServerChannel(Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, - SelectableChannel ch, int readInterestOp) { + protected AbstractNioMessageServerChannel( + Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) { super(parent, eventLoop, ch, readInterestOp); this.childGroup = childGroup; } @@ -36,5 +36,4 @@ public abstract class AbstractNioMessageServerChannel extends AbstractNioMessage public EventLoopGroup childEventLoopGroup() { return childGroup; } - } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 337697f71b..decd79ace7 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -43,7 +43,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** - * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a + * {@link io.netty.channel.SingleThreadEventLoop} implementation which register the {@link Channel}'s to a * {@link Selector} and so does the multi-plexing of these in the event loop. * */ diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 805aca589b..584307816f 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -16,6 +16,7 @@ package io.netty.channel.nio; import io.netty.channel.Channel; +import io.netty.channel.EventLoop; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.util.concurrent.EventExecutor; @@ -92,8 +93,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild( - Executor executor, Object... args) throws Exception { + protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0]); } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index a9b9fe2eaa..86f312fbb8 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -36,9 +36,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { private volatile boolean inputShutdown; private static final ChannelMetadata METADATA = new ChannelMetadata(false); - /** - * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel) - */ protected AbstractOioByteChannel(Channel parent, EventLoop eventLoop) { super(parent, eventLoop); } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index d5b316e62b..08297d5a14 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -41,9 +41,6 @@ public abstract class AbstractOioChannel extends AbstractChannel { } }; - /** - * @see AbstractChannel#AbstractChannel(Channel) - */ protected AbstractOioChannel(Channel parent, EventLoop eventLoop) { super(parent, eventLoop); } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index 0686bda292..4898dff0a9 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -18,7 +18,6 @@ package io.netty.channel.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.util.ArrayList; diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java index baae0460f6..87df2595b3 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java @@ -33,5 +33,4 @@ public abstract class AbstractOioMessageServerChannel extends AbstractOioMessage public EventLoopGroup childEventLoopGroup() { return childGroup; } - } diff --git a/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java index e803f8b325..0e4c1fbc2a 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java @@ -18,17 +18,16 @@ package io.netty.channel.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import io.netty.channel.ThreadPerChannelEventLoopGroup; +import io.netty.util.concurrent.EventExecutorGroup; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** - * {@link EventLoopGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its + * {@link EventExecutorGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its * own {@link EventLoop} to not block others. */ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup { @@ -45,9 +44,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup { * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and - * {@link #register(Channel, ChannelPromise)} method. - * Use {@code 0} to use no limit + * {@link ChannelException}. Use {@code 0} to use no limit */ public OioEventLoopGroup(int maxChannels) { this(maxChannels, Executors.defaultThreadFactory()); @@ -58,9 +55,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup { * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and - * {@link #register(Channel, ChannelPromise)} method. - * Use {@code 0} to use no limit + * {@link ChannelException}. Use {@code 0} to use no limit * @param executor the {@link Executor} used to create new {@link Thread} instances that handle the * registered {@link Channel}s */ @@ -73,9 +68,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup { * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and - * {@link #register(Channel, ChannelPromise)} method. - * Use {@code 0} to use no limit + * {@link ChannelException}. Use {@code 0} to use no limit * @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the * registered {@link Channel}s */ diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index d007ec27a0..ce80e17d10 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -39,7 +39,7 @@ import java.util.List; * NIO selector based implementation to accept new connections. */ public class NioServerSocketChannel extends AbstractNioMessageServerChannel - implements io.netty.channel.socket.ServerSocketChannel { + implements io.netty.channel.socket.ServerSocketChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index efcadfb6a1..78ce3bbb20 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -19,10 +19,10 @@ package io.netty.bootstrap; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; import io.netty.util.concurrent.Future; import org.junit.Test; @@ -33,8 +33,8 @@ public class BootstrapTest { @Test(timeout = 10000) public void testBindDeadLock() throws Exception { - EventLoopGroup groupA = new LocalEventLoopGroup(1); - EventLoopGroup groupB = new LocalEventLoopGroup(1); + EventLoopGroup groupA = new DefaultEventLoopGroup(1); + EventLoopGroup groupB = new DefaultEventLoopGroup(1); try { ChannelInboundHandler dummyHandler = new DummyHandler(); @@ -81,8 +81,8 @@ public class BootstrapTest { @Test(timeout = 10000) public void testConnectDeadLock() throws Exception { - EventLoopGroup groupA = new LocalEventLoopGroup(1); - EventLoopGroup groupB = new LocalEventLoopGroup(1); + EventLoopGroup groupA = new DefaultEventLoopGroup(1); + EventLoopGroup groupB = new DefaultEventLoopGroup(1); try { ChannelInboundHandler dummyHandler = new DummyHandler(); diff --git a/transport/src/test/java/io/netty/channel/BaseChannelTest.java b/transport/src/test/java/io/netty/channel/BaseChannelTest.java index b3aaf3d1f4..75bd1d4203 100644 --- a/transport/src/test/java/io/netty/channel/BaseChannelTest.java +++ b/transport/src/test/java/io/netty/channel/BaseChannelTest.java @@ -16,27 +16,27 @@ package io.netty.channel; -import static org.junit.Assert.assertEquals; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import java.io.UnsupportedEncodingException; +import static org.junit.Assert.*; + class BaseChannelTest { private final LoggingHandler loggingHandler; BaseChannelTest() { - this.loggingHandler = new LoggingHandler(); + loggingHandler = new LoggingHandler(); } ServerBootstrap getLocalServerBootstrap() { - EventLoopGroup serverGroup = new LocalEventLoopGroup(); + EventLoopGroup serverGroup = new DefaultEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(serverGroup); sb.channel(LocalServerChannel.class); @@ -50,12 +50,12 @@ class BaseChannelTest { } Bootstrap getLocalClientBootstrap() { - EventLoopGroup clientGroup = new LocalEventLoopGroup(); + EventLoopGroup clientGroup = new DefaultEventLoopGroup(); Bootstrap cb = new Bootstrap(); cb.channel(LocalChannel.class); cb.group(clientGroup); - cb.handler(this.loggingHandler); + cb.handler(loggingHandler); return cb; } @@ -79,16 +79,15 @@ class BaseChannelTest { } void assertLog(String expected) { - String actual = this.loggingHandler.getLog(); + String actual = loggingHandler.getLog(); assertEquals(expected, actual); } void clearLog() { - this.loggingHandler.clear(); + loggingHandler.clear(); } void setInterest(LoggingHandler.Event... events) { - this.loggingHandler.setInterest(events); + loggingHandler.setInterest(events); } - } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 8586b578fe..6efee19e35 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -21,7 +21,6 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; @@ -30,7 +29,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Test; -import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -44,7 +42,7 @@ import static org.junit.Assert.*; public class DefaultChannelPipelineTest { - private static final EventLoopGroup group = new LocalEventLoopGroup(1); + private static final EventLoopGroup group = new DefaultEventLoopGroup(1); private Channel self; private Channel peer; diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index b69d1e6bbd..026a1da291 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -32,8 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; public class SingleThreadEventLoopTest { @@ -131,7 +130,7 @@ public class SingleThreadEventLoopTest { testScheduleTask(loopB); } - private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException { + private static void testScheduleTask(EventExecutor loopA) throws InterruptedException, ExecutionException { long startTime = System.nanoTime(); final AtomicLong endTime = new AtomicLong(); loopA.schedule(new Runnable() { @@ -153,7 +152,7 @@ public class SingleThreadEventLoopTest { testScheduleTaskAtFixedRate(loopB); } - private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws InterruptedException { + private static void testScheduleTaskAtFixedRate(EventExecutor loopA) throws InterruptedException { final Queue timestamps = new LinkedBlockingQueue(); ScheduledFuture f = loopA.scheduleAtFixedRate(new Runnable() { @Override @@ -193,7 +192,7 @@ public class SingleThreadEventLoopTest { testScheduleLaggyTaskAtFixedRate(loopB); } - private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws InterruptedException { + private static void testScheduleLaggyTaskAtFixedRate(EventExecutor loopA) throws InterruptedException { final Queue timestamps = new LinkedBlockingQueue(); ScheduledFuture f = loopA.scheduleAtFixedRate(new Runnable() { @Override @@ -243,7 +242,7 @@ public class SingleThreadEventLoopTest { testScheduleTaskWithFixedDelay(loopB); } - private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws InterruptedException { + private static void testScheduleTaskWithFixedDelay(EventExecutor loopA) throws InterruptedException { final Queue timestamps = new LinkedBlockingQueue(); ScheduledFuture f = loopA.scheduleWithFixedDelay(new Runnable() { @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index a3cee35160..e2988322d1 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -42,8 +43,8 @@ public class LocalChannelTest { @Test public void testLocalAddressReuse() throws Exception { for (int i = 0; i < 2; i ++) { - EventLoopGroup clientGroup = new LocalEventLoopGroup(); - EventLoopGroup serverGroup = new LocalEventLoopGroup(); + EventLoopGroup clientGroup = new DefaultEventLoopGroup(); + EventLoopGroup serverGroup = new DefaultEventLoopGroup(); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); @@ -96,8 +97,8 @@ public class LocalChannelTest { @Test public void testWriteFailsFastOnClosedChannel() throws Exception { - EventLoopGroup clientGroup = new LocalEventLoopGroup(); - EventLoopGroup serverGroup = new LocalEventLoopGroup(); + EventLoopGroup clientGroup = new DefaultEventLoopGroup(); + EventLoopGroup serverGroup = new DefaultEventLoopGroup(); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index a600a4ee55..8222832423 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -24,11 +24,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutorGroup; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -49,7 +48,7 @@ public class LocalTransportThreadModelTest { @BeforeClass public static void init() { // Configure a test server - group = new LocalEventLoopGroup(); + group = new DefaultEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) @@ -84,9 +83,9 @@ public class LocalTransportThreadModelTest { @Test(timeout = 5000) public void testStagedExecution() throws Throwable { - EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l")); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); + EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); + EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1")); + EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2")); ThreadNameAuditor h1 = new ThreadNameAuditor(); ThreadNameAuditor h2 = new ThreadNameAuditor(); ThreadNameAuditor h3 = new ThreadNameAuditor(); @@ -229,12 +228,12 @@ public class LocalTransportThreadModelTest { @Test(timeout = 30000) @Ignore public void testConcurrentMessageBufferAccess() throws Throwable { - EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l")); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); + EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); + EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1")); + EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2")); + EventLoopGroup e3 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e3")); + EventLoopGroup e4 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e4")); + EventLoopGroup e5 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e5")); try { final MessageForwarder1 h1 = new MessageForwarder1(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index 45218327aa..277ebb98ac 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.util.ReferenceCountUtil; import org.junit.Test; @@ -40,14 +41,14 @@ public class LocalTransportThreadModelTest2 { ServerBootstrap serverBootstrap = new ServerBootstrap(); LocalHander serverHandler = new LocalHander("SERVER"); serverBootstrap - .group(new LocalEventLoopGroup(), new LocalEventLoopGroup()) + .group(new DefaultEventLoopGroup(), new DefaultEventLoopGroup()) .channel(LocalServerChannel.class) .childHandler(serverHandler); Bootstrap clientBootstrap = new Bootstrap(); LocalHander clientHandler = new LocalHander("CLIENT"); clientBootstrap - .group(new LocalEventLoopGroup()) + .group(new DefaultEventLoopGroup()) .channel(LocalChannel.class) .remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 09380215a5..f914d9197c 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -23,11 +23,15 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutorGroup; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; import java.util.Deque; import java.util.LinkedList; @@ -36,12 +40,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - public class LocalTransportThreadModelTest3 { enum EventType { @@ -62,7 +60,7 @@ public class LocalTransportThreadModelTest3 { @BeforeClass public static void init() { // Configure a test server - group = new LocalEventLoopGroup(); + group = new DefaultEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) @@ -116,14 +114,14 @@ public class LocalTransportThreadModelTest3 { } private static void testConcurrentAddRemove(boolean inbound) throws Exception { - EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l")); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); + EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); + EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1")); + EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2")); + EventLoopGroup e3 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e3")); + EventLoopGroup e4 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e4")); + EventLoopGroup e5 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e5")); - final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5}; + final EventLoopGroup[] groups = { e1, e2, e3, e4, e5 }; try { Deque events = new ConcurrentLinkedDeque(); final EventForwarder h1 = new EventForwarder();