diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index eb6338dde6..ed3efbcb6d 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -195,6 +195,14 @@ public class LoggingHandler extends ChannelHandlerAdapter { ctx.fireChannelRegistered(); } + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + if (logger.isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "UNREGISTERED")); + } + ctx.fireChannelUnregistered(); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 4bd719d690..740c69753d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -21,7 +21,6 @@ import io.netty.bootstrap.ChannelFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.oio.OioEventLoopGroup; @@ -109,8 +108,8 @@ public class SocketTestPermutation { public Bootstrap newInstance() { return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory() { @Override - public Channel newChannel(EventLoop loop) { - return new NioDatagramChannel(loop, InternetProtocolFamily.IPv4); + public Channel newChannel() { + return new NioDatagramChannel(InternetProtocolFamily.IPv4); } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 6aa91ed038..35c8aecf00 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -32,12 +32,12 @@ abstract class AbstractEpollChannel extends AbstractChannel { volatile int fd; int id; - AbstractEpollChannel(EventLoop eventLoop, int fd, int flag) { - this(null, eventLoop, fd, flag, false); + AbstractEpollChannel(int fd, int flag) { + this(null, fd, flag, false); } - AbstractEpollChannel(Channel parent, EventLoop eventLoop, int fd, int flag, boolean active) { - super(parent, eventLoop); + AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) { + super(parent); this.fd = fd; readFlag = flag; flags |= flag; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index abe0daf550..d7cb9f1cb9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -51,8 +50,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements private volatile boolean connected; private final EpollDatagramChannelConfig config; - public EpollDatagramChannel(EventLoop loop) { - super(loop, Native.socketDgramFd(), Native.EPOLLIN); + public EpollDatagramChannel() { + super(Native.socketDgramFd(), Native.EPOLLIN); config = new EpollDatagramChannelConfig(this); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index 40d5cc5d86..3ed89eca28 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -19,7 +19,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.ServerSocketChannel; import java.net.InetSocketAddress; @@ -32,13 +31,11 @@ import java.net.SocketAddress; public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel { private final EpollServerSocketChannelConfig config; - private final EventLoopGroup childGroup; private volatile InetSocketAddress local; - public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - super(eventLoop, Native.socketStreamFd(), Native.EPOLLACCEPT); + public EpollServerSocketChannel() { + super(Native.socketStreamFd(), Native.EPOLLACCEPT); config = new EpollServerSocketChannelConfig(this); - this.childGroup = childGroup; } @Override @@ -81,11 +78,6 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme throw new UnsupportedOperationException(); } - @Override - public EventLoopGroup childEventLoopGroup() { - return childGroup; - } - final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { @Override @@ -109,8 +101,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } try { readPending = false; - pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, - childEventLoopGroup().next(), socketFd)); + pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); } catch (Throwable t) { // keep on reading as we use epoll ET and need to consume everything from the socket pipeline.fireChannelReadComplete(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index d8faae629b..7ff6c2bac1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -66,8 +66,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So private volatile boolean inputShutdown; private volatile boolean outputShutdown; - EpollSocketChannel(Channel parent, EventLoop eventLoop, int fd) { - super(parent, eventLoop, fd, Native.EPOLLIN, true); + EpollSocketChannel(Channel parent, int fd) { + super(parent, fd, Native.EPOLLIN, true); config = new EpollSocketChannelConfig(this); // Directly cache the remote and local addresses // See https://github.com/netty/netty/issues/2359 @@ -75,8 +75,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So local = Native.localAddress(fd); } - public EpollSocketChannel(EventLoop eventLoop) { - super(eventLoop, Native.socketStreamFd(), Native.EPOLLIN); + public EpollSocketChannel() { + super(Native.socketStreamFd(), Native.EPOLLIN); config = new EpollSocketChannelConfig(this); } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index 6af029b517..6d56c6b891 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -19,7 +19,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ChannelFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; @@ -100,8 +99,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation { public Bootstrap newInstance() { return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory() { @Override - public Channel newChannel(EventLoop loop) { - return new NioDatagramChannel(loop, InternetProtocolFamily.IPv4); + public Channel newChannel() { + return new NioDatagramChannel(InternetProtocolFamily.IPv4); } @Override diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java index 5908b0338f..8df3d10cc8 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java @@ -19,7 +19,6 @@ import gnu.io.CommPort; import gnu.io.CommPortIdentifier; import gnu.io.SerialPort; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.oio.OioByteStreamChannel; import java.net.SocketAddress; @@ -40,8 +39,8 @@ public class RxtxChannel extends OioByteStreamChannel { private RxtxDeviceAddress deviceAddress; private SerialPort serialPort; - public RxtxChannel(EventLoop eventLoop) { - super(null, eventLoop); + public RxtxChannel() { + super(null); config = new DefaultRxtxChannelConfig(this); } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index b6bdbb4e2a..714227e441 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -27,7 +27,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.sctp.DefaultSctpChannelConfig; @@ -82,15 +81,15 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett /** * Create a new instance */ - public NioSctpChannel(EventLoop eventLoop) { - this(eventLoop, newSctpChannel()); + public NioSctpChannel() { + this(newSctpChannel()); } /** * Create a new instance using {@link SctpChannel} */ - public NioSctpChannel(EventLoop eventLoop, SctpChannel sctpChannel) { - this(null, eventLoop, sctpChannel); + public NioSctpChannel(SctpChannel sctpChannel) { + this(null, sctpChannel); } /** @@ -100,8 +99,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett * or {@code null}. * @param sctpChannel the underlying {@link SctpChannel} */ - public NioSctpChannel(Channel parent, EventLoop eventLoop, SctpChannel sctpChannel) { - super(parent, eventLoop, sctpChannel, SelectionKey.OP_READ); + public NioSctpChannel(Channel parent, SctpChannel sctpChannel) { + super(parent, sctpChannel, SelectionKey.OP_READ); try { sctpChannel.configureBlocking(false); config = new NioSctpChannelConfig(this, sctpChannel); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 64e1690424..5a617326f5 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -22,9 +22,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.AbstractNioMessageServerChannel; +import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; import io.netty.channel.sctp.SctpServerChannelConfig; @@ -46,9 +44,8 @@ import java.util.Set; * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system, * to understand what you need to do to use it. Also this feature is only supported on Java 7+. */ -public class NioSctpServerChannel extends AbstractNioMessageServerChannel +public class NioSctpServerChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpServerChannel { - private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static SctpServerChannel newSocket() { @@ -65,9 +62,9 @@ public class NioSctpServerChannel extends AbstractNioMessageServerChannel /** * Create a new instance */ - public NioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT); - config = new NioSctpServerChannelConfig(this, javaChannel()); + public NioSctpServerChannel() { + super(null, newSocket(), SelectionKey.OP_ACCEPT); + config = new DefaultSctpServerChannelConfig(this, javaChannel()); } @Override @@ -143,7 +140,7 @@ public class NioSctpServerChannel extends AbstractNioMessageServerChannel if (ch == null) { return 0; } - buf.add(new NioSctpChannel(this, childEventLoopGroup().next(), ch)); + buf.add(new NioSctpChannel(this, ch)); return 1; } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 34c905e7d8..84dbe56612 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.sctp.DefaultSctpChannelConfig; @@ -88,8 +87,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel /** * Create a new instance with an new {@link SctpChannel}. */ - public OioSctpChannel(EventLoop eventLoop) { - this(eventLoop, openChannel()); + public OioSctpChannel() { + this(openChannel()); } /** @@ -97,8 +96,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel * * @param ch the {@link SctpChannel} which is used by this instance */ - public OioSctpChannel(EventLoop eventLoop, SctpChannel ch) { - this(null, eventLoop, ch); + public OioSctpChannel(SctpChannel ch) { + this(null, ch); } /** @@ -108,8 +107,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel * {@link} has no parent as it was created by your self. * @param ch the {@link SctpChannel} which is used by this instance */ - public OioSctpChannel(Channel parent, EventLoop eventLoop, SctpChannel ch) { - super(parent, eventLoop); + public OioSctpChannel(Channel parent, SctpChannel ch) { + super(parent); this.ch = ch; boolean success = false; try { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index 685949c72e..37914d13be 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -22,9 +22,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.oio.AbstractOioMessageServerChannel; +import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; import io.netty.channel.sctp.SctpServerChannelConfig; import io.netty.util.internal.logging.InternalLogger; @@ -49,7 +47,7 @@ import java.util.Set; * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system, * to understand what you need to do to use it. Also this feature is only supported on Java 7+. */ -public class OioSctpServerChannel extends AbstractOioMessageServerChannel +public class OioSctpServerChannel extends AbstractOioMessageChannel implements io.netty.channel.sctp.SctpServerChannel { private static final InternalLogger logger = @@ -72,8 +70,8 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel /** * Create a new instance with an new {@link SctpServerChannel} */ - public OioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - this(eventLoop, childGroup, newServerSocket()); + public OioSctpServerChannel() { + this(newServerSocket()); } /** @@ -81,8 +79,8 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel * * @param sch the {@link SctpServerChannel} which is used by this instance */ - public OioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup, SctpServerChannel sch) { - super(null, eventLoop, childGroup); + public OioSctpServerChannel(SctpServerChannel sch) { + super(null); if (sch == null) { throw new NullPointerException("sctp server channel"); } @@ -198,7 +196,7 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel if (key.isAcceptable()) { s = sch.accept(); if (s != null) { - buf.add(new OioSctpChannel(this, childEventLoopGroup().next(), s)); + buf.add(new OioSctpChannel(this, s)); acceptedChannels ++; } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index f6a3b9fd8f..b913a8a257 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -19,9 +19,7 @@ import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.ServerSocketChannelUDT; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.AbstractNioMessageServerChannel; +import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.udt.DefaultUdtServerChannelConfig; import io.netty.channel.udt.UdtServerChannel; import io.netty.channel.udt.UdtServerChannelConfig; @@ -36,16 +34,15 @@ import static java.nio.channels.SelectionKey.*; /** * Common base for Netty Byte/Message UDT Stream/Datagram acceptors. */ -public abstract class NioUdtAcceptorChannel extends AbstractNioMessageServerChannel implements UdtServerChannel { +public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel implements UdtServerChannel { protected static final InternalLogger logger = InternalLoggerFactory.getInstance(NioUdtAcceptorChannel.class); private final UdtServerChannelConfig config; - protected NioUdtAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup, - ServerSocketChannelUDT channelUDT) { - super(null, eventLoop, childGroup, channelUDT, OP_ACCEPT); + protected NioUdtAcceptorChannel(final ServerSocketChannelUDT channelUDT) { + super(null, channelUDT, OP_ACCEPT); try { channelUDT.configureBlocking(false); config = new DefaultUdtServerChannelConfig(this, channelUDT, true); @@ -61,8 +58,8 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageServerChan } } - protected NioUdtAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup, final TypeUDT type) { - this(eventLoop, childGroup, NioUdtProvider.newAcceptorChannelUDT(type)); + protected NioUdtAcceptorChannel(final TypeUDT type) { + this(NioUdtProvider.newAcceptorChannelUDT(type)); } @Override diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteAcceptorChannel.java index 5e73b9c1ee..52d2e30093 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteAcceptorChannel.java @@ -18,8 +18,6 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.SocketChannelUDT; import io.netty.channel.ChannelMetadata; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import java.util.List; @@ -30,8 +28,8 @@ public class NioUdtByteAcceptorChannel extends NioUdtAcceptorChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); - public NioUdtByteAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - super(eventLoop, childGroup, TypeUDT.STREAM); + public NioUdtByteAcceptorChannel() { + super(TypeUDT.STREAM); } @Override @@ -40,7 +38,7 @@ public class NioUdtByteAcceptorChannel extends NioUdtAcceptorChannel { if (channelUDT == null) { return 0; } else { - buf.add(new NioUdtByteConnectorChannel(this, childEventLoopGroup().next(), channelUDT)); + buf.add(new NioUdtByteConnectorChannel(this, channelUDT)); return 1; } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java index f56d204330..fe17d789b8 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java @@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; -import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.udt.DefaultUdtChannelConfig; @@ -47,12 +46,12 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement private final UdtChannelConfig config; - public NioUdtByteConnectorChannel(EventLoop eventLoop) { - this(eventLoop, TypeUDT.STREAM); + public NioUdtByteConnectorChannel() { + this(TypeUDT.STREAM); } - public NioUdtByteConnectorChannel(Channel parent, EventLoop eventLoop, SocketChannelUDT channelUDT) { - super(parent, eventLoop, channelUDT); + public NioUdtByteConnectorChannel(final Channel parent, final SocketChannelUDT channelUDT) { + super(parent, channelUDT); try { channelUDT.configureBlocking(false); switch (channelUDT.socketUDT().status()) { @@ -76,12 +75,12 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement } } - public NioUdtByteConnectorChannel(EventLoop eventLoop, final SocketChannelUDT channelUDT) { - this(null, eventLoop, channelUDT); + public NioUdtByteConnectorChannel(final SocketChannelUDT channelUDT) { + this(null, channelUDT); } - public NioUdtByteConnectorChannel(EventLoop eventLoop, final TypeUDT type) { - this(eventLoop, NioUdtProvider.newConnectorChannelUDT(type)); + public NioUdtByteConnectorChannel(final TypeUDT type) { + this(NioUdtProvider.newConnectorChannelUDT(type)); } @Override diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java index fada22e228..d0647dd9d3 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java @@ -15,17 +15,15 @@ */ package io.netty.channel.udt.nio; - import com.barchart.udt.TypeUDT; -import io.netty.channel.EventLoop; /** * Byte Channel Rendezvous for UDT Streams. */ public class NioUdtByteRendezvousChannel extends NioUdtByteConnectorChannel { - public NioUdtByteRendezvousChannel(EventLoop eventLoop) { - super(eventLoop, NioUdtProvider.newRendezvousChannelUDT(TypeUDT.STREAM)); + public NioUdtByteRendezvousChannel() { + super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.STREAM)); } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageAcceptorChannel.java index 25c7530c75..380afc5c5e 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageAcceptorChannel.java @@ -18,8 +18,6 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.SocketChannelUDT; import io.netty.channel.ChannelMetadata; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import java.util.List; @@ -30,8 +28,8 @@ public class NioUdtMessageAcceptorChannel extends NioUdtAcceptorChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); - public NioUdtMessageAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - super(eventLoop, childGroup, TypeUDT.DATAGRAM); + public NioUdtMessageAcceptorChannel() { + super(TypeUDT.DATAGRAM); } @Override @@ -40,7 +38,7 @@ public class NioUdtMessageAcceptorChannel extends NioUdtAcceptorChannel { if (channelUDT == null) { return 0; } else { - buf.add(new NioUdtMessageConnectorChannel(this, childEventLoopGroup().next(), channelUDT)); + buf.add(new NioUdtMessageConnectorChannel(this, channelUDT)); return 1; } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java index 53868b3dae..55cd180e4a 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java @@ -22,7 +22,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.EventLoop; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.udt.DefaultUdtChannelConfig; import io.netty.channel.udt.UdtChannel; @@ -51,12 +50,12 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp private final UdtChannelConfig config; - public NioUdtMessageConnectorChannel(EventLoop eventLoop) { - this(eventLoop, TypeUDT.DATAGRAM); + public NioUdtMessageConnectorChannel() { + this(TypeUDT.DATAGRAM); } - public NioUdtMessageConnectorChannel(final Channel parent, EventLoop eventLoop, final SocketChannelUDT channelUDT) { - super(parent, eventLoop, channelUDT, OP_READ); + public NioUdtMessageConnectorChannel(final Channel parent, final SocketChannelUDT channelUDT) { + super(parent, channelUDT, OP_READ); try { channelUDT.configureBlocking(false); switch (channelUDT.socketUDT().status()) { @@ -80,12 +79,12 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp } } - public NioUdtMessageConnectorChannel(EventLoop eventLoop, final SocketChannelUDT channelUDT) { - this(null, eventLoop, channelUDT); + public NioUdtMessageConnectorChannel(final SocketChannelUDT channelUDT) { + this(null, channelUDT); } - public NioUdtMessageConnectorChannel(EventLoop eventLoop, final TypeUDT type) { - this(eventLoop, NioUdtProvider.newConnectorChannelUDT(type)); + public NioUdtMessageConnectorChannel(final TypeUDT type) { + this(NioUdtProvider.newConnectorChannelUDT(type)); } @Override diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java index 4bcf5420bf..893db2b7f6 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageRendezvousChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; -import io.netty.channel.EventLoop; import io.netty.channel.udt.UdtMessage; /** @@ -24,9 +23,10 @@ import io.netty.channel.udt.UdtMessage; *

* Note: send/receive must use {@link UdtMessage} in the pipeline */ -public class NioUdtMessageRendezvousChannel extends NioUdtMessageConnectorChannel { +public class NioUdtMessageRendezvousChannel extends + NioUdtMessageConnectorChannel { - public NioUdtMessageRendezvousChannel(EventLoop eventLoop) { - super(eventLoop, NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM)); + public NioUdtMessageRendezvousChannel() { + super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM)); } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java index f49b32fe27..22eca7555c 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java @@ -24,13 +24,10 @@ import com.barchart.udt.nio.SelectorProviderUDT; import com.barchart.udt.nio.ServerSocketChannelUDT; import com.barchart.udt.nio.SocketChannelUDT; import io.netty.bootstrap.ChannelFactory; -import io.netty.bootstrap.ServerChannelFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.UdtServerChannel; +import io.netty.channel.udt.UdtChannel; import java.io.IOException; import java.nio.channels.spi.SelectorProvider; @@ -42,21 +39,21 @@ import java.nio.channels.spi.SelectorProvider; *

* Provides {@link SelectorProvider} for UDT channels. */ -public abstract class NioUdtProvider { +public final class NioUdtProvider implements ChannelFactory { /** * {@link ChannelFactory} for UDT Byte Acceptor. See {@link TypeUDT#STREAM} * and {@link KindUDT#ACCEPTOR}. */ - public static final ServerChannelFactory BYTE_ACCEPTOR = - new NioUdtServerChannelFactory(TypeUDT.STREAM, KindUDT.ACCEPTOR); + public static final ChannelFactory BYTE_ACCEPTOR = new NioUdtProvider( + TypeUDT.STREAM, KindUDT.ACCEPTOR); /** * {@link ChannelFactory} for UDT Byte Connector. See {@link TypeUDT#STREAM} * and {@link KindUDT#CONNECTOR}. */ - public static final ChannelFactory BYTE_CONNECTOR = - new NioUdtChannelFactory(TypeUDT.STREAM, KindUDT.CONNECTOR); + public static final ChannelFactory BYTE_CONNECTOR = new NioUdtProvider( + TypeUDT.STREAM, KindUDT.CONNECTOR); /** * {@link SelectorProvider} for UDT Byte channels. See @@ -68,22 +65,22 @@ public abstract class NioUdtProvider { * {@link ChannelFactory} for UDT Byte Rendezvous. See * {@link TypeUDT#STREAM} and {@link KindUDT#RENDEZVOUS}. */ - public static final ChannelFactory BYTE_RENDEZVOUS = - new NioUdtChannelFactory(TypeUDT.STREAM, KindUDT.RENDEZVOUS); + public static final ChannelFactory BYTE_RENDEZVOUS = new NioUdtProvider( + TypeUDT.STREAM, KindUDT.RENDEZVOUS); /** * {@link ChannelFactory} for UDT Message Acceptor. See * {@link TypeUDT#DATAGRAM} and {@link KindUDT#ACCEPTOR}. */ - public static final ServerChannelFactory MESSAGE_ACCEPTOR = - new NioUdtServerChannelFactory(TypeUDT.DATAGRAM, KindUDT.ACCEPTOR); + public static final ChannelFactory MESSAGE_ACCEPTOR = new NioUdtProvider( + TypeUDT.DATAGRAM, KindUDT.ACCEPTOR); /** * {@link ChannelFactory} for UDT Message Connector. See * {@link TypeUDT#DATAGRAM} and {@link KindUDT#CONNECTOR}. */ - public static final ChannelFactory MESSAGE_CONNECTOR = - new NioUdtChannelFactory(TypeUDT.DATAGRAM, KindUDT.CONNECTOR); + public static final ChannelFactory MESSAGE_CONNECTOR = new NioUdtProvider( + TypeUDT.DATAGRAM, KindUDT.CONNECTOR); /** * {@link SelectorProvider} for UDT Message channels. See @@ -95,8 +92,8 @@ public abstract class NioUdtProvider { * {@link ChannelFactory} for UDT Message Rendezvous. See * {@link TypeUDT#DATAGRAM} and {@link KindUDT#RENDEZVOUS}. */ - public static final ChannelFactory MESSAGE_RENDEZVOUS = - new NioUdtChannelFactory(TypeUDT.DATAGRAM, KindUDT.RENDEZVOUS); + public static final ChannelFactory MESSAGE_RENDEZVOUS = new NioUdtProvider( + TypeUDT.DATAGRAM, KindUDT.RENDEZVOUS); /** * Expose underlying {@link ChannelUDT} for debugging and monitoring. @@ -131,7 +128,8 @@ public abstract class NioUdtProvider { /** * Convenience factory for {@link KindUDT#ACCEPTOR} channels. */ - protected static ServerSocketChannelUDT newAcceptorChannelUDT(final TypeUDT type) { + protected static ServerSocketChannelUDT newAcceptorChannelUDT( + final TypeUDT type) { try { return SelectorProviderUDT.from(type).openServerSocketChannel(); } catch (final IOException e) { @@ -153,7 +151,8 @@ public abstract class NioUdtProvider { /** * Convenience factory for {@link KindUDT#RENDEZVOUS} channels. */ - protected static RendezvousChannelUDT newRendezvousChannelUDT(final TypeUDT type) { + protected static RendezvousChannelUDT newRendezvousChannelUDT( + final TypeUDT type) { try { return SelectorProviderUDT.from(type).openRendezvousChannel(); } catch (final IOException e) { @@ -195,70 +194,42 @@ public abstract class NioUdtProvider { } /** - * Produce new {@link UdtChannel} based on factory {@link #kind()} and {@link #type()} + * Produce new {@link UdtChannel} based on factory {@link #kind()} and + * {@link #type()} */ - private static final class NioUdtChannelFactory - extends NioUdtProvider implements ChannelFactory { - - private NioUdtChannelFactory(final TypeUDT type, final KindUDT kind) { - super(type, kind); - } - - @SuppressWarnings("unchecked") - @Override - public T newChannel(EventLoop eventLoop) { - switch (kind()) { + @SuppressWarnings("unchecked") + @Override + public T newChannel() { + switch (kind) { case ACCEPTOR: - throw new IllegalStateException("wrong kind: " + kind()); - case CONNECTOR: - switch (type()) { - case DATAGRAM: - return (T) new NioUdtMessageConnectorChannel(eventLoop); - case STREAM: - return (T) new NioUdtByteConnectorChannel(eventLoop); - default: - throw new IllegalStateException("wrong type: " + type()); - } - case RENDEZVOUS: - switch (type()) { - case DATAGRAM: - return (T) new NioUdtMessageRendezvousChannel(eventLoop); - case STREAM: - return (T) new NioUdtByteRendezvousChannel(eventLoop); - default: - throw new IllegalStateException("wrong type: " + type()); - } - default: - throw new IllegalStateException("wrong kind: " + kind()); - } - } - } - - private static final class NioUdtServerChannelFactory extends NioUdtProvider - implements ServerChannelFactory { - - private NioUdtServerChannelFactory(final TypeUDT type, final KindUDT kind) { - super(type, kind); - } - - @Override - @SuppressWarnings("unchecked") - public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - switch (kind()) { - case ACCEPTOR: - switch (type()) { - case DATAGRAM: - return (T) new NioUdtMessageAcceptorChannel(eventLoop, childGroup); - case STREAM: - return (T) new NioUdtByteAcceptorChannel(eventLoop, childGroup); - default: - throw new IllegalStateException("wrong type: " + type()); + switch (type) { + case DATAGRAM: + return (T) new NioUdtMessageAcceptorChannel(); + case STREAM: + return (T) new NioUdtByteAcceptorChannel(); + default: + throw new IllegalStateException("wrong type=" + type); } case CONNECTOR: + switch (type) { + case DATAGRAM: + return (T) new NioUdtMessageConnectorChannel(); + case STREAM: + return (T) new NioUdtByteConnectorChannel(); + default: + throw new IllegalStateException("wrong type=" + type); + } case RENDEZVOUS: + switch (type) { + case DATAGRAM: + return (T) new NioUdtMessageRendezvousChannel(); + case STREAM: + return (T) new NioUdtByteRendezvousChannel(); + default: + throw new IllegalStateException("wrong type=" + type); + } default: - throw new IllegalStateException("wrong kind: " + kind()); - } + throw new IllegalStateException("wrong kind=" + kind); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java index 9b399661fc..260187c5d1 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteAcceptorChannelTest.java @@ -16,8 +16,6 @@ package io.netty.test.udt.nio; -import io.netty.channel.EventLoop; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtByteAcceptorChannel; import org.junit.Test; @@ -30,7 +28,6 @@ public class NioUdtByteAcceptorChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - EventLoop loop = new NioEventLoopGroup().next(); - assertFalse(new NioUdtByteAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect()); + assertEquals(false, new NioUdtByteAcceptorChannel().metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java index f87d9feece..6a75828307 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteConnectorChannelTest.java @@ -16,8 +16,6 @@ package io.netty.test.udt.nio; -import io.netty.channel.EventLoop; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtByteConnectorChannel; import org.junit.Test; @@ -30,7 +28,6 @@ public class NioUdtByteConnectorChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - EventLoop loop = new NioEventLoopGroup().next(); - assertFalse(new NioUdtByteConnectorChannel(loop).metadata().hasDisconnect()); + assertEquals(false, new NioUdtByteConnectorChannel().metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java index 347772e6f8..8275ef4629 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java @@ -20,7 +20,6 @@ import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Meter; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtByteRendezvousChannel; import io.netty.channel.udt.nio.NioUdtProvider; @@ -45,8 +44,7 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - EventLoop loop = new NioEventLoopGroup().next(); - assertFalse(new NioUdtByteRendezvousChannel(loop).metadata().hasDisconnect()); + assertFalse(new NioUdtByteRendezvousChannel().metadata().hasDisconnect()); } /** diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java index 6216951bff..a3099a8be1 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageAcceptorChannelTest.java @@ -16,8 +16,6 @@ package io.netty.test.udt.nio; -import io.netty.channel.EventLoop; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtMessageAcceptorChannel; import org.junit.Test; @@ -30,7 +28,6 @@ public class NioUdtMessageAcceptorChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - EventLoop loop = new NioEventLoopGroup().next(); - assertFalse(new NioUdtMessageAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect()); + assertEquals(false, new NioUdtMessageAcceptorChannel().metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java index 9a6bd68ab8..8c2e2df588 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageConnectorChannelTest.java @@ -16,8 +16,6 @@ package io.netty.test.udt.nio; -import io.netty.channel.EventLoop; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtMessageConnectorChannel; import org.junit.Test; @@ -30,7 +28,6 @@ public class NioUdtMessageConnectorChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - EventLoop loop = new NioEventLoopGroup().next(); - assertFalse(new NioUdtMessageConnectorChannel(loop).metadata().hasDisconnect()); + assertEquals(false, new NioUdtMessageConnectorChannel().metadata().hasDisconnect()); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java index 34ffd92596..082517b7ab 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java @@ -20,7 +20,6 @@ import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Meter; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.nio.NioUdtMessageRendezvousChannel; import io.netty.channel.udt.nio.NioUdtProvider; @@ -45,8 +44,7 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - EventLoop loop = new NioEventLoopGroup().next(); - assertFalse(new NioUdtMessageRendezvousChannel(loop).metadata().hasDisconnect()); + assertFalse(new NioUdtMessageRendezvousChannel().metadata().hasDisconnect()); } /** diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java index 18b407e324..aae2f9e0a0 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtProviderTest.java @@ -16,9 +16,6 @@ package io.netty.test.udt.nio; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.udt.UdtServerChannel; import io.netty.channel.udt.nio.NioUdtProvider; import org.junit.Test; @@ -32,22 +29,18 @@ public class NioUdtProviderTest extends AbstractUdtTest { */ @Test public void provideFactory() { - - EventLoop loop = new NioEventLoopGroup().next(); - EventLoopGroup childGroup = new NioEventLoopGroup(); - // bytes - assertNotNull(NioUdtProvider.BYTE_ACCEPTOR.newChannel(loop, childGroup)); - assertNotNull(NioUdtProvider.BYTE_CONNECTOR.newChannel(loop)); - assertNotNull(NioUdtProvider.BYTE_RENDEZVOUS.newChannel(loop)); + assertNotNull(NioUdtProvider.BYTE_ACCEPTOR.newChannel()); + assertNotNull(NioUdtProvider.BYTE_CONNECTOR.newChannel()); + assertNotNull(NioUdtProvider.BYTE_RENDEZVOUS.newChannel()); // message - assertNotNull(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel(loop, childGroup)); - assertNotNull(NioUdtProvider.MESSAGE_CONNECTOR.newChannel(loop)); - assertNotNull(NioUdtProvider.MESSAGE_RENDEZVOUS.newChannel(loop)); + assertNotNull(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel()); + assertNotNull(NioUdtProvider.MESSAGE_CONNECTOR.newChannel()); + assertNotNull(NioUdtProvider.MESSAGE_RENDEZVOUS.newChannel()); // acceptor types - assertTrue(NioUdtProvider.BYTE_ACCEPTOR.newChannel(loop, childGroup) instanceof UdtServerChannel); - assertTrue(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel(loop, childGroup) instanceof UdtServerChannel); + assertTrue(NioUdtProvider.BYTE_ACCEPTOR.newChannel() instanceof UdtServerChannel); + assertTrue(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel() instanceof UdtServerChannel); } } diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 10fc05df27..233bac7c61 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -17,6 +17,7 @@ package io.netty.bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -24,7 +25,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.channel.EventLoopGroup; -import io.netty.channel.VoidChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.GlobalEventExecutor; @@ -46,6 +46,7 @@ import java.util.Map; public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { private volatile EventLoopGroup group; + private volatile ChannelFactory channelFactory; private volatile SocketAddress localAddress; private final Map, Object> options = new LinkedHashMap, Object>(); private final Map, Object> attrs = new LinkedHashMap, Object>(); @@ -57,6 +58,7 @@ public abstract class AbstractBootstrap, C ext AbstractBootstrap(AbstractBootstrap bootstrap) { group = bootstrap.group; + channelFactory = bootstrap.channelFactory; handler = bootstrap.handler; localAddress = bootstrap.localAddress; synchronized (bootstrap.options) { @@ -68,7 +70,8 @@ public abstract class AbstractBootstrap, C ext } /** - * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created {@link Channel} + * The {@link EventLoopGroup} which is used to handle all the events for the to-be-creates + * {@link Channel} */ @SuppressWarnings("unchecked") public B group(EventLoopGroup group) { @@ -82,6 +85,38 @@ public abstract class AbstractBootstrap, C ext return (B) this; } + /** + * The {@link Class} which is used to create {@link Channel} instances from. + * You either use this or {@link #channelFactory(ChannelFactory)} if your + * {@link Channel} implementation has no no-args constructor. + */ + public B channel(Class channelClass) { + if (channelClass == null) { + throw new NullPointerException("channelClass"); + } + return channelFactory(new BootstrapChannelFactory(channelClass)); + } + + /** + * {@link ChannelFactory} which is used to create {@link Channel} instances from + * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} + * is not working for you because of some more complex needs. If your {@link Channel} implementation + * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for + * simplify your code. + */ + @SuppressWarnings("unchecked") + public B channelFactory(ChannelFactory channelFactory) { + if (channelFactory == null) { + throw new NullPointerException("channelFactory"); + } + if (this.channelFactory != null) { + throw new IllegalStateException("channelFactory set already"); + } + + this.channelFactory = channelFactory; + return (B) this; + } + /** * The {@link SocketAddress} which is used to bind the local "end" to. * @@ -166,6 +201,9 @@ public abstract class AbstractBootstrap, C ext if (group == null) { throw new IllegalStateException("group not set"); } + if (channelFactory == null) { + throw new IllegalStateException("factory not set"); + } return (B) this; } @@ -255,16 +293,8 @@ public abstract class AbstractBootstrap, C ext return promise; } - abstract Channel createChannel(); - final ChannelFuture initAndRegister() { - Channel channel; - try { - channel = createChannel(); - } catch (Throwable t) { - return VoidChannel.INSTANCE.newFailedFuture(t); - } - + final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { @@ -272,8 +302,7 @@ public abstract class AbstractBootstrap, C ext return channel.newFailedFuture(t); } - ChannelPromise regFuture = channel.newPromise(); - channel.unsafe().register(regFuture); + ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); @@ -330,6 +359,10 @@ public abstract class AbstractBootstrap, C ext return localAddress; } + final ChannelFactory channelFactory() { + return channelFactory; + } + final ChannelHandler handler() { return handler; } @@ -359,6 +392,11 @@ public abstract class AbstractBootstrap, C ext buf.append(StringUtil.simpleClassName(group)); buf.append(", "); } + if (channelFactory != null) { + buf.append("channelFactory: "); + buf.append(channelFactory); + buf.append(", "); + } if (localAddress != null) { buf.append("localAddress: "); buf.append(localAddress); @@ -391,4 +429,26 @@ public abstract class AbstractBootstrap, C ext } return buf.toString(); } + + private static final class BootstrapChannelFactory implements ChannelFactory { + private final Class clazz; + + BootstrapChannelFactory(Class clazz) { + this.clazz = clazz; + } + + @Override + public T newChannel() { + try { + return clazz.newInstance(); + } catch (Throwable t) { + throw new ChannelException("Unable to create Channel from class " + clazz, t); + } + } + + @Override + public String toString() { + return clazz.getSimpleName() + ".class"; + } + } } diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 0e9c03de91..687d717fc9 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -16,20 +16,15 @@ package io.netty.bootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; -import io.netty.channel.ServerChannel; import io.netty.util.AttributeKey; -import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.lang.reflect.Constructor; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -47,58 +42,15 @@ public final class Bootstrap extends AbstractBootstrap { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class); - private volatile ChannelFactory channelFactory; - private volatile SocketAddress remoteAddress; public Bootstrap() { } private Bootstrap(Bootstrap bootstrap) { super(bootstrap); - channelFactory = bootstrap.channelFactory; remoteAddress = bootstrap.remoteAddress; } - /** - * The {@link Class} which is used to create {@link Channel} instances from. - * You either use this or {@link #channelFactory(ChannelFactory)} if your - * {@link Channel} implementation has no no-args constructor. - */ - public Bootstrap channel(Class channelClass) { - if (channelClass == null) { - throw new NullPointerException("channelClass"); - } - return channelFactory(new BootstrapChannelFactory(channelClass)); - } - - /** - * {@link ServerChannelFactory} which is used to create {@link ServerChannel} instances when calling - * {@link #bind()}. This method is usually only used if {@link #channel(Class)} is not working for you because of - * some more complex needs. If your {@link Channel} implementation has a no-args constructor, its highly recommend - * to just use {@link #channel(Class)} for simplify your code. - */ - public Bootstrap channelFactory(ChannelFactory channelFactory) { - if (channelFactory == null) { - throw new NullPointerException("channelFactory"); - } - if (this.channelFactory != null) { - throw new IllegalStateException("channelFactory set already"); - } - - this.channelFactory = channelFactory; - return this; - } - - ChannelFactory channelFactory() { - return channelFactory; - } - - @Override - Channel createChannel() { - EventLoop eventLoop = group().next(); - return channelFactory().newChannel(eventLoop); - } - /** * The {@link SocketAddress} to connect to once the {@link #connect()} method * is called. @@ -255,9 +207,6 @@ public final class Bootstrap extends AbstractBootstrap { if (handler() == null) { throw new IllegalStateException("handler not set"); } - if (channelFactory == null) { - throw new IllegalStateException("channel or channelFactory not set"); - } return this; } @@ -281,28 +230,4 @@ public final class Bootstrap extends AbstractBootstrap { return buf.toString(); } - - private static final class BootstrapChannelFactory implements ChannelFactory { - - private final Class clazz; - - BootstrapChannelFactory(Class clazz) { - this.clazz = clazz; - } - - @Override - public T newChannel(EventLoop eventLoop) { - try { - Constructor constructor = clazz.getConstructor(EventLoop.class); - return constructor.newInstance(eventLoop); - } catch (Throwable t) { - throw new ChannelException("Unable to create Channel from class " + clazz, t); - } - } - - @Override - public String toString() { - return StringUtil.simpleClassName(clazz) + ".class"; - } - } } diff --git a/transport/src/main/java/io/netty/bootstrap/ChannelFactory.java b/transport/src/main/java/io/netty/bootstrap/ChannelFactory.java index df332f886f..292eb6efa6 100644 --- a/transport/src/main/java/io/netty/bootstrap/ChannelFactory.java +++ b/transport/src/main/java/io/netty/bootstrap/ChannelFactory.java @@ -16,16 +16,14 @@ package io.netty.bootstrap; import io.netty.channel.Channel; -import io.netty.channel.EventLoop; /** * Factory that creates a new {@link Channel} on {@link Bootstrap#bind()}, {@link Bootstrap#connect()}, and * {@link ServerBootstrap#bind()}. */ public interface ChannelFactory { - /** * Creates a new channel. */ - T newChannel(EventLoop eventLoop); + T newChannel(); } diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index e59fd9738f..2b410a4a57 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -17,24 +17,22 @@ package io.netty.bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.AttributeKey; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.lang.reflect.Constructor; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; @@ -48,7 +46,6 @@ public final class ServerBootstrap extends AbstractBootstrap channelFactory; private final Map, Object> childOptions = new LinkedHashMap, Object>(); private final Map, Object> childAttrs = new LinkedHashMap, Object>(); private volatile EventLoopGroup childGroup; @@ -58,7 +55,6 @@ public final class ServerBootstrap extends AbstractBootstrap channelClass) { - if (channelClass == null) { - throw new NullPointerException("channelClass"); - } - return channelFactory(new ServerBootstrapChannelFactory(channelClass)); - } - - /** - * {@link ChannelFactory} which is used to create {@link Channel} instances from - * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} - * is not working for you because of some more complex needs. If your {@link Channel} implementation - * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for - * simplify your code. - */ - public ServerBootstrap channelFactory(ServerChannelFactory channelFactory) { - if (channelFactory == null) { - throw new NullPointerException("channelFactory"); - } - if (this.channelFactory != null) { - throw new IllegalStateException("channelFactory set already"); - } - - this.channelFactory = channelFactory; - return this; - } - - @Override - Channel createChannel() { - EventLoop eventLoop = group().next(); - return channelFactory().newChannel(eventLoop, childGroup); - } - - ServerChannelFactory channelFactory() { - return channelFactory; - } - /** * Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client). */ @@ -119,8 +74,8 @@ public final class ServerBootstrap extends AbstractBootstrap, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; @@ -225,8 +181,8 @@ public final class ServerBootstrap extends AbstractBootstrap() { @Override public void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions, - currentChildAttrs)); + ch.pipeline().addLast(new ServerBootstrapAcceptor( + currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } @@ -241,9 +197,6 @@ public final class ServerBootstrap extends AbstractBootstrap, Object>[] childOptions; private final Entry, Object>[] childAttrs; - ServerBootstrapAcceptor(ChannelHandler childHandler, Entry, Object>[] childOptions, - Entry, Object>[] childAttrs) { + @SuppressWarnings("unchecked") + ServerBootstrapAcceptor( + EventLoopGroup childGroup, ChannelHandler childHandler, + Entry, Object>[] childOptions, Entry, Object>[] childAttrs) { + this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; @@ -273,7 +230,7 @@ public final class ServerBootstrap extends AbstractBootstrap) e.getKey()).set(e.getValue()); } - child.unsafe().register(child.newPromise()); + try { + childGroup.register(child).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + forceClose(child, future.cause()); + } + } + }); + } catch (Throwable t) { + forceClose(child, t); + } + } + + private static void forceClose(Channel child, Throwable t) { + child.unsafe().closeForcibly(); + logger.warn("Failed to register an accepted channel: " + child, t); } @Override @@ -304,7 +277,7 @@ public final class ServerBootstrap extends AbstractBootstrap - implements ServerChannelFactory { - - private final Class clazz; - - ServerBootstrapChannelFactory(Class clazz) { - this.clazz = clazz; - } - - @Override - public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - try { - Constructor constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class); - return constructor.newInstance(eventLoop, childGroup); - } catch (Throwable t) { - throw new ChannelException("Unable to create Channel from class " + clazz, t); - } - } - - @Override - public String toString() { - return StringUtil.simpleClassName(clazz) + ".class"; - } - } } + diff --git a/transport/src/main/java/io/netty/bootstrap/ServerChannelFactory.java b/transport/src/main/java/io/netty/bootstrap/ServerChannelFactory.java deleted file mode 100644 index e92f128bcc..0000000000 --- a/transport/src/main/java/io/netty/bootstrap/ServerChannelFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.bootstrap; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; - -/** - * Factory that creates a new {@link Channel} on {@link Bootstrap#bind()}, {@link Bootstrap#connect()}, and - * {@link ServerBootstrap#bind()}. - */ -public interface ServerChannelFactory { - - /** - * Creates a new channel. - */ - T newChannel(EventLoop eventLoop, EventLoopGroup childGroup); -} diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index b8b2db8413..e49a78a770 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -59,7 +59,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile SocketAddress localAddress; private volatile SocketAddress remoteAddress; - private final EventLoop eventLoop; + private volatile EventLoop eventLoop; private volatile boolean registered; /** Cache for the string representation of this channel */ @@ -72,9 +72,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * @param parent * the parent of this channel. {@code null} if there's no parent. */ - protected AbstractChannel(Channel parent, EventLoop eventLoop) { + protected AbstractChannel(Channel parent) { this.parent = parent; - this.eventLoop = validate(eventLoop); unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); } @@ -107,6 +106,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public EventLoop eventLoop() { + EventLoop eventLoop = this.eventLoop; + if (eventLoop == null) { + throw new IllegalStateException("channel not registered to an event loop"); + } return eventLoop; } @@ -179,6 +182,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.close(); } + @Override + public ChannelFuture deregister() { + return pipeline.deregister(); + } + @Override public Channel flush() { pipeline.flush(); @@ -210,6 +218,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.close(promise); } + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return pipeline.deregister(promise); + } + @Override public Channel read() { pipeline.read(); @@ -375,7 +388,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public final ChannelHandlerInvoker invoker() { - return eventLoop.asInvoker(); + return eventLoop().asInvoker(); } @Override @@ -394,7 +407,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public final void register(final ChannelPromise promise) { + public final void register(EventLoop eventLoop, final ChannelPromise promise) { + if (eventLoop == null) { + throw new NullPointerException("eventLoop"); + } + if (isRegistered()) { + promise.setFailure(new IllegalStateException("registered to an event loop already")); + return; + } + if (!isCompatible(eventLoop)) { + promise.setFailure(new IllegalStateException("incompatible event loop type: " + + eventLoop.getClass().getName())); + return; + } + + AbstractChannel.this.eventLoop = eventLoop; + if (eventLoop.inEventLoop()) { register0(promise); } else { @@ -552,7 +580,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha }); } - deregister(); + deregister(voidPromise()); } } @@ -565,8 +593,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - private void deregister() { + @Override + public final void deregister(final ChannelPromise promise) { + if (!promise.setUncancellable()) { + return; + } + if (!registered) { + safeSetSuccess(promise); return; } @@ -577,6 +611,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } finally { if (registered) { registered = false; + invokeLater(new OneTimeTask() { + @Override + public void run() { + pipeline.fireChannelUnregistered(); + } + }); + safeSetSuccess(promise); + } else { + // Some transports like local and AIO does not allow the deregistration of + // an open channel. Their doDeregister() calls close(). Consequently, + // close() calls deregister() again - no need to fire channelUnregistered. + safeSetSuccess(promise); } } } @@ -718,16 +764,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - private EventLoop validate(EventLoop eventLoop) { - if (eventLoop == null) { - throw new IllegalStateException("null event loop"); - } - if (!isCompatible(eventLoop)) { - throw new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()); - } - return eventLoop; - } - /** * Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}. */ diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 5bde3d2454..7a2794a587 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -34,14 +34,11 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S private static final ChannelMetadata METADATA = new ChannelMetadata(false); - private final EventLoopGroup childGroup; - /** * Creates a new instance. */ - protected AbstractServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - super(null, eventLoop); - this.childGroup = childGroup; + protected AbstractServerChannel() { + super(null); } @Override @@ -74,11 +71,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S throw new UnsupportedOperationException(); } - @Override - public EventLoopGroup childEventLoopGroup() { - return childGroup; - } - private final class DefaultServerUnsafe extends AbstractUnsafe { @Override public void write(Object msg, ChannelPromise promise) { diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index af8913ea60..0e8e7c3de7 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -280,6 +280,19 @@ public interface Channel extends AttributeMap, Comparable { */ ChannelFuture close(); + /** + * Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the + * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of + * an error. + *

+ * This will result in having the + * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} + * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + * + */ + ChannelFuture deregister(); + /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. @@ -353,6 +366,20 @@ public interface Channel extends AttributeMap, Comparable { */ ChannelFuture close(ChannelPromise promise); + /** + * Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the + * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of + * an error. + * + * The given {@link ChannelPromise} will be notified. + *

+ * This will result in having the + * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} + * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + */ + ChannelFuture deregister(ChannelPromise promise); + /** * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an * {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was @@ -406,6 +433,7 @@ public interface Channel extends AttributeMap, Comparable { *

  • {@link #remoteAddress()}
  • *
  • {@link #closeForcibly()}
  • *
  • {@link #register(ChannelPromise)}
  • + *
  • {@link #deregister(ChannelPromise)}
  • *
  • {@link #voidPromise()}
  • * */ @@ -432,7 +460,7 @@ public interface Channel extends AttributeMap, Comparable { * Register the {@link Channel} of the {@link ChannelPromise} and notify * the {@link ChannelFuture} once the registration was complete. */ - void register(ChannelPromise promise); + void register(EventLoop eventLoop, ChannelPromise promise); /** * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify @@ -467,6 +495,12 @@ public interface Channel extends AttributeMap, Comparable { */ void closeForcibly(); + /** + * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the + * {@link ChannelPromise} once the operation was complete. + */ + void deregister(ChannelPromise promise); + /** * Schedules a read operation that fills the inbound buffer of the first {@link ChannelHandler} in the * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java index 5e6e73d0c2..db7d9e24b8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandler.java @@ -196,6 +196,11 @@ public interface ChannelHandler { */ void channelRegistered(ChannelHandlerContext ctx) throws Exception; + /** + * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop} + */ + void channelUnregistered(ChannelHandlerContext ctx) throws Exception; + /** * The {@link Channel} of the {@link ChannelHandlerContext} is now active */ @@ -276,6 +281,15 @@ public interface ChannelHandler { */ void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; + /** + * Called once a deregister operation is made from the current registered {@link EventLoop}. + * + * @param ctx the {@link ChannelHandlerContext} for which the close operation is made + * @param promise the {@link ChannelPromise} to notify once the operation completes + * @throws Exception thrown if an error accour + */ + void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; + /** * Intercepts {@link ChannelHandlerContext#read()}. */ diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index ff1bbc6cc4..0ebf2fb091 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -102,6 +102,18 @@ public class ChannelHandlerAdapter implements ChannelHandler { ctx.fireChannelRegistered(); } + /** + * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward + * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Skip + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelUnregistered(); + } + /** * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. @@ -224,6 +236,18 @@ public class ChannelHandlerAdapter implements ChannelHandler { ctx.close(promise); } + /** + * Calls {@link ChannelHandlerContext#deregister(ChannelPromise)} to forward + * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Skip + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.deregister(promise); + } + /** * Calls {@link ChannelHandlerContext#read()} to forward * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java index 475eae6468..60338a5b78 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java @@ -192,7 +192,10 @@ public class ChannelHandlerAppender extends ChannelHandlerAdapter { } else { name = e.name; } - pipeline.addAfter(ctx.invoker(), oldName, name, e.handler); + // Pass in direct the invoker to eliminate the possibility of an IllegalStateException + // if the Channel is not registered yet. + DefaultChannelHandlerContext context = (DefaultChannelHandlerContext) ctx; + pipeline.addAfter(context.invoker, oldName, name, e.handler); } } finally { if (selfRemoval) { diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 38fdb32505..2ce54c5d95 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -175,6 +175,15 @@ public interface ChannelHandlerContext extends AttributeMap { */ ChannelHandlerContext fireChannelRegistered(); + /** + * A {@link Channel} was unregistered from its {@link EventLoop}. + * + * This will result in having the {@link ChannelHandler#channelUnregistered(ChannelHandlerContext)} method + * called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + */ + ChannelHandlerContext fireChannelUnregistered(); + /** * A {@link Channel} is active now, which means it is connected. * @@ -295,6 +304,19 @@ public interface ChannelHandlerContext extends AttributeMap { */ ChannelFuture close(); + /** + * Request to deregister from the previous assigned {@link EventExecutor} and notify the + * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of + * an error. + *

    + * This will result in having the + * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} + * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + * + */ + ChannelFuture deregister(); + /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. @@ -368,6 +390,20 @@ public interface ChannelHandlerContext extends AttributeMap { */ ChannelFuture close(ChannelPromise promise); + /** + * Request to deregister from the previous assigned {@link EventExecutor} and notify the + * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of + * an error. + * + * The given {@link ChannelPromise} will be notified. + *

    + * This will result in having the + * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} + * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + */ + ChannelFuture deregister(ChannelPromise promise); + /** * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an * {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java index 145e0eed78..ef0bfd7b60 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java @@ -39,6 +39,13 @@ public interface ChannelHandlerInvoker { */ void invokeChannelRegistered(ChannelHandlerContext ctx); + /** + * Invokes {@link ChannelHandler#channelUnregistered(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelUnregistered(ChannelHandlerContext ctx); + /** * Invokes {@link ChannelHandler#channelActive(ChannelHandlerContext)}. This method is not for a user * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in @@ -118,6 +125,13 @@ public interface ChannelHandlerInvoker { */ void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise); + /** + * Invokes {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise); + /** * Invokes {@link ChannelHandler#read(ChannelHandlerContext)}. * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java index 2c5e7f51c1..c847da4f52 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java @@ -35,6 +35,14 @@ public final class ChannelHandlerInvokerUtil { } } + public static void invokeChannelUnregisteredNow(ChannelHandlerContext ctx) { + try { + ctx.handler().channelUnregistered(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) { try { ctx.handler().channelActive(ctx); @@ -129,6 +137,14 @@ public final class ChannelHandlerInvokerUtil { } } + public static void invokeDeregisterNow(final ChannelHandlerContext ctx, final ChannelPromise promise) { + try { + ctx.handler().deregister(ctx, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + public static void invokeReadNow(final ChannelHandlerContext ctx) { try { ctx.handler().read(ctx); diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 579829c3b2..bca9ac03d5 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -17,6 +17,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import java.net.ConnectException; @@ -661,6 +662,15 @@ public interface ChannelPipeline extends Iterable> */ ChannelPipeline fireChannelRegistered(); + /** + * A {@link Channel} was unregistered from its {@link EventLoop}. + * + * This will result in having the {@link ChannelHandler#channelUnregistered(ChannelHandlerContext)} method + * called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + */ + ChannelPipeline fireChannelUnregistered(); + /** * A {@link Channel} is active now, which means it is connected. * @@ -781,6 +791,19 @@ public interface ChannelPipeline extends Iterable> */ ChannelFuture close(); + /** + * Request to deregister the {@link Channel} from the previous assigned {@link EventExecutor} and notify the + * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of + * an error. + *

    + * This will result in having the + * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} + * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + * + */ + ChannelFuture deregister(); + /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. @@ -854,6 +877,20 @@ public interface ChannelPipeline extends Iterable> */ ChannelFuture close(ChannelPromise promise); + /** + * Request to deregister the {@link Channel} bound this {@link ChannelPipeline} from the previous assigned + * {@link EventExecutor} and notify the {@link ChannelFuture} once the operation completes, either because the + * operation was successful or because of an error. + * + * The given {@link ChannelPromise} will be notified. + *

    ChannelOutboundHandler + * This will result in having the + * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} + * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the + * {@link Channel}. + */ + ChannelFuture deregister(ChannelPromise promise); + /** * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an * {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index b7fa562a41..bdb459e95b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -38,19 +38,21 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou static final int MASK_HANDLER_REMOVED = 1 << 1; private static final int MASK_EXCEPTION_CAUGHT = 1 << 2; private static final int MASK_CHANNEL_REGISTERED = 1 << 3; - private static final int MASK_CHANNEL_ACTIVE = 1 << 4; - private static final int MASK_CHANNEL_INACTIVE = 1 << 5; - private static final int MASK_CHANNEL_READ = 1 << 6; - private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 7; - private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; - private static final int MASK_USER_EVENT_TRIGGERED = 1 << 9; - private static final int MASK_BIND = 1 << 10; - private static final int MASK_CONNECT = 1 << 11; - private static final int MASK_DISCONNECT = 1 << 12; - private static final int MASK_CLOSE = 1 << 13; - private static final int MASK_READ = 1 << 14; - private static final int MASK_WRITE = 1 << 15; - private static final int MASK_FLUSH = 1 << 16; + private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4; + private static final int MASK_CHANNEL_ACTIVE = 1 << 5; + private static final int MASK_CHANNEL_INACTIVE = 1 << 6; + private static final int MASK_CHANNEL_READ = 1 << 7; + private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8; + private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9; + private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10; + private static final int MASK_BIND = 1 << 11; + private static final int MASK_CONNECT = 1 << 12; + private static final int MASK_DISCONNECT = 1 << 13; + private static final int MASK_CLOSE = 1 << 14; + private static final int MASK_DEREGISTER = 1 << 15; + private static final int MASK_READ = 1 << 16; + private static final int MASK_WRITE = 1 << 17; + private static final int MASK_FLUSH = 1 << 18; /** * Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized @@ -111,6 +113,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou "channelRegistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { flags |= MASK_CHANNEL_REGISTERED; } + if (handlerType.getMethod( + "channelUnregistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { + flags |= MASK_CHANNEL_UNREGISTERED; + } if (handlerType.getMethod( "channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { flags |= MASK_CHANNEL_ACTIVE; @@ -153,6 +159,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou "close", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) { flags |= MASK_CLOSE; } + if (handlerType.getMethod( + "deregister", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) { + flags |= MASK_DEREGISTER; + } if (handlerType.getMethod( "read", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { flags |= MASK_READ; @@ -215,14 +225,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou this.pipeline = pipeline; this.name = name; this.handler = handler; + this.invoker = invoker; skipFlags = skipFlags(handler); - - if (invoker == null) { - this.invoker = channel.unsafe().invoker(); - } else { - this.invoker = invoker; - } } /** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */ @@ -267,11 +272,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou @Override public EventExecutor executor() { - return invoker.executor(); + return invoker().executor(); } @Override public ChannelHandlerInvoker invoker() { + if (invoker == null) { + return channel.unsafe().invoker(); + } return invoker; } @@ -293,35 +301,42 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou @Override public ChannelHandlerContext fireChannelRegistered() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED); - next.invoker.invokeChannelRegistered(next); + next.invoker().invokeChannelRegistered(next); + return this; + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED); + next.invoker().invokeChannelUnregistered(next); return this; } @Override public ChannelHandlerContext fireChannelActive() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE); - next.invoker.invokeChannelActive(next); + next.invoker().invokeChannelActive(next); return this; } @Override public ChannelHandlerContext fireChannelInactive() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE); - next.invoker.invokeChannelInactive(next); + next.invoker().invokeChannelInactive(next); return this; } @Override public ChannelHandlerContext fireExceptionCaught(Throwable cause) { DefaultChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT); - next.invoker.invokeExceptionCaught(next, cause); + next.invoker().invokeExceptionCaught(next, cause); return this; } @Override public ChannelHandlerContext fireUserEventTriggered(Object event) { DefaultChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED); - next.invoker.invokeUserEventTriggered(next, event); + next.invoker().invokeUserEventTriggered(next, event); return this; } @@ -329,21 +344,21 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou public ChannelHandlerContext fireChannelRead(Object msg) { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ); ReferenceCountUtil.touch(msg, next); - next.invoker.invokeChannelRead(next, msg); + next.invoker().invokeChannelRead(next, msg); return this; } @Override public ChannelHandlerContext fireChannelReadComplete() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE); - next.invoker.invokeChannelReadComplete(next); + next.invoker().invokeChannelReadComplete(next); return this; } @Override public ChannelHandlerContext fireChannelWritabilityChanged() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED); - next.invoker.invokeChannelWritabilityChanged(next); + next.invoker().invokeChannelWritabilityChanged(next); return this; } @@ -372,10 +387,15 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return close(newPromise()); } + @Override + public ChannelFuture deregister() { + return deregister(newPromise()); + } + @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND); - next.invoker.invokeBind(next, localAddress, promise); + next.invoker().invokeBind(next, localAddress, promise); return promise; } @@ -387,7 +407,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(MASK_CONNECT); - next.invoker.invokeConnect(next, remoteAddress, localAddress, promise); + next.invoker().invokeConnect(next, remoteAddress, localAddress, promise); return promise; } @@ -398,21 +418,28 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } DefaultChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT); - next.invoker.invokeDisconnect(next, promise); + next.invoker().invokeDisconnect(next, promise); return promise; } @Override public ChannelFuture close(ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(MASK_CLOSE); - next.invoker.invokeClose(next, promise); + next.invoker().invokeClose(next, promise); + return promise; + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER); + next.invoker().invokeDeregister(next, promise); return promise; } @Override public ChannelHandlerContext read() { DefaultChannelHandlerContext next = findContextOutbound(MASK_READ); - next.invoker.invokeRead(next); + next.invoker().invokeRead(next); return this; } @@ -425,14 +452,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou public ChannelFuture write(Object msg, ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(MASK_WRITE); ReferenceCountUtil.touch(msg, next); - next.invoker.invokeWrite(next, msg, promise); + next.invoker().invokeWrite(next, msg, promise); return promise; } @Override public ChannelHandlerContext flush() { DefaultChannelHandlerContext next = findContextOutbound(MASK_FLUSH); - next.invoker.invokeFlush(next); + next.invoker().invokeFlush(next); return this; } @@ -441,9 +468,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou DefaultChannelHandlerContext next; next = findContextOutbound(MASK_WRITE); ReferenceCountUtil.touch(msg, next); - next.invoker.invokeWrite(next, msg, promise); + next.invoker().invokeWrite(next, msg, promise); next = findContextOutbound(MASK_FLUSH); - next.invoker.invokeFlush(next); + next.invoker().invokeFlush(next); return promise; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java index 71703bd7e2..5f976e6810 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java @@ -57,6 +57,20 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { } } + @Override + public void invokeChannelUnregistered(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelUnregisteredNow(ctx); + } else { + executor.execute(new OneTimeTask() { + @Override + public void run() { + invokeChannelUnregisteredNow(ctx); + } + }); + } + } + @Override public void invokeChannelActive(final ChannelHandlerContext ctx) { if (executor.inEventLoop()) { @@ -269,6 +283,25 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { } } + @Override + public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) { + if (!validatePromise(ctx, promise, false)) { + // promise cancelled + return; + } + + if (executor.inEventLoop()) { + invokeDeregisterNow(ctx, promise); + } else { + safeExecuteOutbound(new OneTimeTask() { + @Override + public void run() { + invokeDeregisterNow(ctx, promise); + } + }, promise); + } + } + @Override public void invokeRead(final ChannelHandlerContext ctx) { if (executor.inEventLoop()) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 5dc6e4a797..9ca3812a71 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -550,6 +550,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { try { ctx.handler().handlerAdded(ctx); } catch (Throwable t) { + t.printStackTrace(); boolean removed = false; try { remove(ctx); @@ -802,6 +803,17 @@ final class DefaultChannelPipeline implements ChannelPipeline { return this; } + @Override + public ChannelPipeline fireChannelUnregistered() { + head.fireChannelUnregistered(); + + // Remove all handlers sequentially if channel is closed and unregistered. + if (!channel.isOpen()) { + teardownAll(); + } + return this; + } + /** * Removes all handlers from the pipeline one by one from tail (exclusive) to head (inclusive) to trigger * handlerRemoved(). Note that the tail handler is excluded because it's neither an outbound handler nor it @@ -825,7 +837,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline fireChannelInactive() { head.fireChannelInactive(); - teardownAll(); return this; } @@ -887,6 +898,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { return tail.close(); } + @Override + public ChannelFuture deregister() { + return tail.deregister(); + } + @Override public ChannelPipeline flush() { tail.flush(); @@ -918,6 +934,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { return tail.close(promise); } + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return tail.close(promise); + } + @Override public ChannelPipeline read() { tail.read(); @@ -983,6 +1004,9 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @@ -1050,6 +1074,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { unsafe.close(promise); } + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + unsafe.deregister(promise); + } + @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); diff --git a/transport/src/main/java/io/netty/channel/EventLoopGroup.java b/transport/src/main/java/io/netty/channel/EventLoopGroup.java index e8fddf7780..669e61a175 100644 --- a/transport/src/main/java/io/netty/channel/EventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/EventLoopGroup.java @@ -25,4 +25,16 @@ import io.netty.util.concurrent.EventExecutorGroup; public interface EventLoopGroup extends EventExecutorGroup { @Override EventLoop next(); + + /** + * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture} + * will get notified once the registration was complete. + */ + ChannelFuture register(Channel channel); + + /** + * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture} + * will get notified once the registration was complete and also will get returned. + */ + ChannelFuture register(Channel channel, ChannelPromise promise); } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index 86306aa1c0..29f5a9aa58 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -69,5 +69,12 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor } @Override - protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception; + public ChannelFuture register(Channel channel) { + return next().register(channel); + } + + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + return next().register(channel, promise); + } } diff --git a/transport/src/main/java/io/netty/channel/ServerChannel.java b/transport/src/main/java/io/netty/channel/ServerChannel.java index 121e3d6eb5..5e61a4992c 100644 --- a/transport/src/main/java/io/netty/channel/ServerChannel.java +++ b/transport/src/main/java/io/netty/channel/ServerChannel.java @@ -18,9 +18,8 @@ package io.netty.channel; import io.netty.channel.socket.ServerSocketChannel; /** - * A {@link Channel} that accepts an incoming connection attempt and creates its child {@link Channel}s by accepting - * them. {@link ServerSocketChannel} is a good example. + * A {@link Channel} that accepts an incoming connection attempt and creates + * its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is + * a good example. */ -public interface ServerChannel extends Channel { - EventLoopGroup childEventLoopGroup(); -} +public interface ServerChannel extends Channel { } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 9d08aff420..81cc3dbaef 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -51,13 +51,31 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return invoker; } + @Override + public ChannelFuture register(Channel channel) { + return register(channel, new DefaultChannelPromise(channel, this)); + } + + @Override + public ChannelFuture register(final Channel channel, final ChannelPromise promise) { + if (channel == null) { + throw new NullPointerException("channel"); + } + if (promise == null) { + throw new NullPointerException("promise"); + } + + channel.unsafe().register(this, promise); + return promise; + } + @Override protected boolean wakesUpForTask(Runnable task) { return !(task instanceof NonWakeupRunnable); } /** - * Marker interface for {@linkRunnable} that will not trigger an {@link #wakeup(boolean)} in all cases. + * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. */ interface NonWakeupRunnable extends Runnable { } } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java index 26c90d88a8..78397665d5 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java @@ -30,6 +30,21 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { this.parent = parent; } + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + return super.register(channel, promise).addListener(new ChannelFutureListener() { + @Override + @SuppressWarnings("unchecked") + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + ch = future.channel(); + } else { + deregister(); + } + } + }); + } + @Override protected void run() { for (;;) { diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index cb5bc4a717..bc87063892 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -16,6 +16,7 @@ package io.netty.channel; +import io.netty.util.concurrent.AbstractEventExecutorGroup; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; @@ -39,13 +40,14 @@ import java.util.concurrent.TimeUnit; /** * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}. */ -public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { +public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup { private final Object[] childArgs; private final int maxChannels; final Executor executor; final Set activeChildren = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); + private final Set readOnlyActiveChildren = Collections.unmodifiableSet(activeChildren); final Queue idleChildren = new ConcurrentLinkedQueue(); private final ChannelException tooManyChannels; @@ -73,7 +75,9 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException}. Use {@code 0} to use no limit + * {@link ChannelException}. on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit */ protected ThreadPerChannelEventLoopGroup(int maxChannels) { this(maxChannels, Executors.defaultThreadFactory()); @@ -84,7 +88,9 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException}. Use {@code 0} to use no limit + * {@link ChannelException} on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit * @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the * registered {@link Channel}s * @param args arguments which will passed to each {@link #newChild(Object...)} call. @@ -98,7 +104,9 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException}. Use {@code 0} to use no limit + * {@link ChannelException} on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit * @param executor the {@link Executor} used to create new {@link Thread} instances that handle the * registered {@link Channel}s * @param args arguments which will passed to each {@link #newChild(Object...)} call. @@ -126,34 +134,21 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { } /** - * Creates a new {@link EventLoop}. + * Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}. */ - protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) { + protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception { return new ThreadPerChannelEventLoop(this); } @Override @SuppressWarnings("unchecked") public Set children() { - return Collections.unmodifiableSet((Set) activeChildren); + return (Set) readOnlyActiveChildren; } @Override public EventLoop next() { - if (shuttingDown) { - throw new RejectedExecutionException("shutting down"); - } - - EventLoop loop = idleChildren.poll(); - if (loop == null) { - if (maxChannels > 0 && activeChildren.size() >= maxChannels) { - throw tooManyChannels; - } - loop = newChild(childArgs); - loop.terminationFuture().addListener(childTerminationListener); - } - activeChildren.add(loop); - return loop; + throw new UnsupportedOperationException(); } @Override @@ -271,4 +266,47 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup { } return isTerminated(); } + + @Override + public ChannelFuture register(Channel channel) { + if (channel == null) { + throw new NullPointerException("channel"); + } + try { + EventLoop l = nextChild(); + return l.register(channel, new DefaultChannelPromise(channel, l)); + } catch (Throwable t) { + return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t); + } + } + + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + if (channel == null) { + throw new NullPointerException("channel"); + } + try { + return nextChild().register(channel, promise); + } catch (Throwable t) { + promise.setFailure(t); + return promise; + } + } + + private EventLoop nextChild() throws Exception { + if (shuttingDown) { + throw new RejectedExecutionException("shutting down"); + } + + EventLoop loop = idleChildren.poll(); + if (loop == null) { + if (maxChannels > 0 && activeChildren.size() >= maxChannels) { + throw tooManyChannels; + } + loop = newChild(childArgs); + loop.terminationFuture().addListener(childTerminationListener); + } + activeChildren.add(loop); + return loop; + } } diff --git a/transport/src/main/java/io/netty/channel/VoidChannel.java b/transport/src/main/java/io/netty/channel/VoidChannel.java deleted file mode 100644 index fee1b343a1..0000000000 --- a/transport/src/main/java/io/netty/channel/VoidChannel.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.StringUtil; - -import java.net.SocketAddress; -import java.util.concurrent.TimeUnit; - -/** - * A {@link Channel} that represents a non-existing {@link Channel} which could not be instantiated successfully. - */ -public final class VoidChannel extends AbstractChannel { - - public static final VoidChannel INSTANCE = new VoidChannel(); - - private VoidChannel() { - super(null, new AbstractEventLoop(null) { - private final ChannelHandlerInvoker invoker = - new DefaultChannelHandlerInvoker(GlobalEventExecutor.INSTANCE); - - @Override - @Deprecated - public void shutdown() { - GlobalEventExecutor.INSTANCE.shutdown(); - } - - @Override - public ChannelHandlerInvoker asInvoker() { - return invoker; - } - - @Override - public boolean inEventLoop(Thread thread) { - return GlobalEventExecutor.INSTANCE.inEventLoop(thread); - } - - @Override - public boolean isShuttingDown() { - return GlobalEventExecutor.INSTANCE.isShuttingDown(); - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { - return GlobalEventExecutor.INSTANCE.shutdownGracefully(quietPeriod, timeout, unit); - } - - @Override - public Future terminationFuture() { - return GlobalEventExecutor.INSTANCE.terminationFuture(); - } - - @Override - public boolean isShutdown() { - return GlobalEventExecutor.INSTANCE.isShutdown(); - } - - @Override - public boolean isTerminated() { - return GlobalEventExecutor.INSTANCE.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return GlobalEventExecutor.INSTANCE.awaitTermination(timeout, unit); - } - - @Override - public void execute(Runnable command) { - GlobalEventExecutor.INSTANCE.execute(command); - } - }); - } - - @Override - protected AbstractUnsafe newUnsafe() { - return new AbstractUnsafe() { - @Override - public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - reject(); - } - }; - } - - @Override - protected boolean isCompatible(EventLoop loop) { - return true; - } - - @Override - protected SocketAddress localAddress0() { - return reject(); - } - - @Override - protected SocketAddress remoteAddress0() { - return reject(); - } - - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - reject(); - } - - @Override - protected void doDisconnect() throws Exception { - reject(); - } - - @Override - protected void doClose() throws Exception { - reject(); - } - - @Override - protected void doBeginRead() throws Exception { - reject(); - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - reject(); - } - - @Override - public ChannelConfig config() { - return reject(); - } - - @Override - public boolean isOpen() { - return reject(); - } - - @Override - public boolean isActive() { - return reject(); - } - - @Override - public ChannelMetadata metadata() { - return reject(); - } - - @Override - public String toString() { - return StringUtil.simpleClassName(this); - } - - private static T reject() { - throw new UnsupportedOperationException( - StringUtil.simpleClassName(VoidChannel.class) + - " is only for the representation of a non-existing " + - StringUtil.simpleClassName(Channel.class) + '.'); - } -} diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index ab2160d939..3e353fa401 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -54,7 +54,7 @@ public class EmbeddedChannel extends AbstractChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); - private final EmbeddedEventLoop loop; + private final EmbeddedEventLoop loop = new EmbeddedEventLoop(); private final ChannelConfig config = new DefaultChannelConfig(this); private final Queue inboundMessages = new ArrayDeque(); private final Queue outboundMessages = new ArrayDeque(); @@ -74,9 +74,7 @@ public class EmbeddedChannel extends AbstractChannel { * @param handlers the @link ChannelHandler}s which will be add in the {@link ChannelPipeline} */ public EmbeddedChannel(ChannelHandler... handlers) { - super(null, new EmbeddedEventLoop()); - - loop = (EmbeddedEventLoop) eventLoop(); + super(null); if (handlers == null) { throw new NullPointerException("handlers"); @@ -91,7 +89,7 @@ public class EmbeddedChannel extends AbstractChannel { } p.addLast(new LastInboundHandler()); - unsafe().register(newPromise()); + loop.register(this); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 7c5211dc53..2a6ff2c83c 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -16,6 +16,8 @@ package io.netty.channel.embedded; import io.netty.channel.AbstractEventLoop; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerInvoker; import io.netty.channel.ChannelPromise; @@ -92,6 +94,17 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle return false; } + @Override + public ChannelFuture register(Channel channel) { + return register(channel, channel.newPromise()); + } + + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + channel.unsafe().register(this, promise); + return promise; + } + @Override public boolean inEventLoop() { return true; @@ -117,6 +130,11 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle invokeChannelRegisteredNow(ctx); } + @Override + public void invokeChannelUnregistered(ChannelHandlerContext ctx) { + invokeChannelUnregisteredNow(ctx); + } + @Override public void invokeChannelActive(ChannelHandlerContext ctx) { invokeChannelActiveNow(ctx); @@ -174,6 +192,11 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle invokeCloseNow(ctx, promise); } + @Override + public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) { + invokeDeregisterNow(ctx, promise); + } + @Override public void invokeRead(ChannelHandlerContext ctx) { invokeReadNow(ctx); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 89112577a3..ccf44bd9b4 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -87,12 +87,12 @@ public class LocalChannel extends AbstractChannel { private volatile boolean readInProgress; private volatile boolean registerInProgress; - public LocalChannel(EventLoop eventLoop) { - super(null, eventLoop); + public LocalChannel() { + super(null); } - LocalChannel(LocalServerChannel parent, EventLoop eventLoop, LocalChannel peer) { - super(parent, eventLoop); + LocalChannel(LocalServerChannel parent, LocalChannel peer) { + super(parent); this.peer = peer; localAddress = parent.localAddress(); remoteAddress = peer.localAddress(); diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 27f5cc40c9..73647c8ac2 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.concurrent.SingleThreadEventExecutor; @@ -47,10 +46,6 @@ public class LocalServerChannel extends AbstractServerChannel { private volatile LocalAddress localAddress; private volatile boolean acceptInProgress; - public LocalServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - super(eventLoop, childGroup); - } - @Override public ChannelConfig config() { return config; @@ -136,7 +131,7 @@ public class LocalServerChannel extends AbstractServerChannel { } LocalChannel serve(final LocalChannel peer) { - final LocalChannel child = new LocalChannel(this, childEventLoopGroup().next(), peer); + final LocalChannel child = new LocalChannel(this, peer); if (eventLoop().inEventLoop()) { serve0(child); } else { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index da5effa18d..18ed477afe 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; @@ -44,8 +43,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} * @param ch the underlying {@link SelectableChannel} on which it operates */ - protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) { - super(parent, eventLoop, ch, SelectionKey.OP_READ); + protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { + super(parent, ch, SelectionKey.OP_READ); } @Override diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 43f975c6fc..c71467ce9e 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -65,8 +65,8 @@ public abstract class AbstractNioChannel extends AbstractChannel { * @param ch the underlying {@link SelectableChannel} on which it operates * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} */ - protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) { - super(parent, eventLoop); + protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { + super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 7e533a5ace..c9e537431a 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -19,7 +19,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; import java.io.IOException; @@ -33,9 +32,11 @@ import java.util.List; */ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { - protected AbstractNioMessageChannel( - Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) { - super(parent, eventLoop, ch, readInterestOp); + /** + * @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)} + */ + protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { + super(parent, ch, readInterestOp); } @Override @@ -169,5 +170,4 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { * @return {@code true} if and only if the message has been written */ protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception; - } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java deleted file mode 100644 index 3037501f12..0000000000 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageServerChannel.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.nio; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; - -import java.nio.channels.SelectableChannel; - -public abstract class AbstractNioMessageServerChannel extends AbstractNioMessageChannel implements ServerChannel { - - private final EventLoopGroup childGroup; - - protected AbstractNioMessageServerChannel( - Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) { - super(parent, eventLoop, ch, readInterestOp); - this.childGroup = childGroup; - } - - @Override - public EventLoopGroup childEventLoopGroup() { - return childGroup; - } -} diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 6fe62add0b..e4fd801c4a 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.util.internal.StringUtil; @@ -37,8 +36,11 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { private volatile boolean inputShutdown; private static final ChannelMetadata METADATA = new ChannelMetadata(false); - protected AbstractOioByteChannel(Channel parent, EventLoop eventLoop) { - super(parent, eventLoop); + /** + * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel) + */ + protected AbstractOioByteChannel(Channel parent) { + super(parent); } protected boolean isInputShutdown() { diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 541fc8790d..bd163b2799 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -46,8 +46,11 @@ public abstract class AbstractOioChannel extends AbstractChannel { } }; - protected AbstractOioChannel(Channel parent, EventLoop eventLoop) { - super(parent, eventLoop); + /** + * @see AbstractChannel#AbstractChannel(Channel) + */ + protected AbstractOioChannel(Channel parent) { + super(parent); } @Override diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index fe61ede482..60b70c09f9 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -18,8 +18,6 @@ package io.netty.channel.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -31,8 +29,8 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { private final List readBuf = new ArrayList(); - protected AbstractOioMessageChannel(Channel parent, EventLoop eventLoop) { - super(parent, eventLoop); + protected AbstractOioMessageChannel(Channel parent) { + super(parent); } @Override diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java deleted file mode 100644 index 87df2595b3..0000000000 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageServerChannel.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.oio; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; - -public abstract class AbstractOioMessageServerChannel extends AbstractOioMessageChannel implements ServerChannel { - - private final EventLoopGroup childGroup; - - protected AbstractOioMessageServerChannel(Channel parent, EventLoop eventLoop, EventLoopGroup childGroup) { - super(parent, eventLoop); - this.childGroup = childGroup; - } - - @Override - public EventLoopGroup childEventLoopGroup() { - return childGroup; - } -} diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java index 74fbec53c3..97f9c8171e 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java @@ -17,7 +17,6 @@ package io.netty.channel.oio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; import java.io.IOException; @@ -57,8 +56,8 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { * @param parent the parent {@link Channel} which was used to create this instance. This can be null if the * {@link} has no parent as it was created by your self. */ - protected OioByteStreamChannel(Channel parent, EventLoop eventLoop) { - super(parent, eventLoop); + protected OioByteStreamChannel(Channel parent) { + super(parent); } /** diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index d359778f5d..d78328e7f7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -106,40 +105,27 @@ public final class NioDatagramChannel /** * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}. */ - public NioDatagramChannel(EventLoop eventLoop) { - this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER)); - } - - /** - * Create a new instance using the given {@link SelectorProvider} - * which will use the Operation Systems default {@link InternetProtocolFamily}. - */ - public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider) { - this(eventLoop, newSocket(provider)); + public NioDatagramChannel() { + this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } /** * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend * on the Operation Systems default which will be chosen. */ - public NioDatagramChannel(EventLoop eventLoop, InternetProtocolFamily ipFamily) { - this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily)); + public NioDatagramChannel(InternetProtocolFamily ipFamily) { + this(DEFAULT_SELECTOR_PROVIDER, ipFamily); } - /** - * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}. - * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default - * which will be chosen. - */ - public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider, InternetProtocolFamily ipFamily) { - this(eventLoop, newSocket(provider, ipFamily)); + public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) { + this(newSocket(provider, ipFamily)); } /** * Create a new instance from the given {@link DatagramChannel}. */ - public NioDatagramChannel(EventLoop eventLoop, DatagramChannel socket) { - super(null, eventLoop, socket, SelectionKey.OP_READ); + public NioDatagramChannel(DatagramChannel socket) { + super(null, socket, SelectionKey.OP_READ); config = new NioDatagramChannelConfig(this, socket); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 34d1c34fb3..7b7e18bd2a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -18,9 +18,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.AbstractNioMessageServerChannel; +import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.util.internal.logging.InternalLogger; @@ -40,8 +38,8 @@ import java.util.List; * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses * NIO selector based implementation to accept new connections. */ -public class NioServerSocketChannel extends AbstractNioMessageServerChannel - implements io.netty.channel.socket.ServerSocketChannel { +public class NioServerSocketChannel extends AbstractNioMessageChannel + implements io.netty.channel.socket.ServerSocketChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); @@ -68,22 +66,12 @@ public class NioServerSocketChannel extends AbstractNioMessageServerChannel /** * Create a new instance */ - public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - this(eventLoop, childGroup, newSocket(DEFAULT_SELECTOR_PROVIDER)); + public NioServerSocketChannel() { + this(DEFAULT_SELECTOR_PROVIDER); } - /** - * Create a new instance using the given {@link SelectorProvider}. - */ - public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, SelectorProvider provider) { - this(eventLoop, childGroup, newSocket(provider)); - } - - /** - * Create a new instance using the given {@link ServerSocketChannel}. - */ - public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, ServerSocketChannel channel) { - super(null, eventLoop, childGroup, channel, SelectionKey.OP_ACCEPT); + public NioServerSocketChannel(SelectorProvider provider) { + super(null, newSocket(provider), SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } @@ -138,7 +126,7 @@ public class NioServerSocketChannel extends AbstractNioMessageServerChannel try { if (ch != null) { - buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); + buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index ba5c17e414..a6d2493033 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -66,32 +66,31 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty /** * Create a new instance */ - public NioSocketChannel(EventLoop eventLoop) { - this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER)); + public NioSocketChannel() { + this(DEFAULT_SELECTOR_PROVIDER); } /** * Create a new instance using the given {@link SelectorProvider}. */ - public NioSocketChannel(EventLoop eventLoop, SelectorProvider provider) { - this(eventLoop, newSocket(provider)); + public NioSocketChannel(SelectorProvider provider) { + this(newSocket(provider)); } /** * Create a new instance using the given {@link SocketChannel}. */ - public NioSocketChannel(EventLoop eventLoop, SocketChannel socket) { - this(null, eventLoop, socket); + public NioSocketChannel(SocketChannel socket) { + this(null, socket); } - /** * Create a new instance * * @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user * @param socket the {@link SocketChannel} which will be used */ - public NioSocketChannel(Channel parent, EventLoop eventLoop, SocketChannel socket) { - super(parent, eventLoop, socket); + public NioSocketChannel(Channel parent, SocketChannel socket) { + super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index b7b06483be..852396d049 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.socket.DatagramChannel; @@ -56,7 +55,8 @@ import java.util.Locale; * @see AddressedEnvelope * @see DatagramPacket */ -public final class OioDatagramChannel extends AbstractOioMessageChannel implements DatagramChannel { +public class OioDatagramChannel extends AbstractOioMessageChannel + implements DatagramChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class); @@ -79,8 +79,8 @@ public final class OioDatagramChannel extends AbstractOioMessageChannel implemen /** * Create a new instance with an new {@link MulticastSocket}. */ - public OioDatagramChannel(EventLoop eventLoop) { - this(eventLoop, newSocket()); + public OioDatagramChannel() { + this(newSocket()); } /** @@ -88,8 +88,8 @@ public final class OioDatagramChannel extends AbstractOioMessageChannel implemen * * @param socket the {@link MulticastSocket} which is used by this instance */ - public OioDatagramChannel(EventLoop eventLoop, MulticastSocket socket) { - super(null, eventLoop); + public OioDatagramChannel(MulticastSocket socket) { + super(null); boolean success = false; try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 208d8b3bab..72ce74c521 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -18,9 +18,7 @@ package io.netty.channel.socket.oio; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.oio.AbstractOioMessageServerChannel; +import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -40,7 +38,8 @@ import java.util.concurrent.locks.ReentrantLock; * * This implementation use Old-Blocking-IO. */ -public class OioServerSocketChannel extends AbstractOioMessageServerChannel implements ServerSocketChannel { +public class OioServerSocketChannel extends AbstractOioMessageChannel + implements ServerSocketChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioServerSocketChannel.class); @@ -62,8 +61,8 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl /** * Create a new instance with an new {@link Socket} */ - public OioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) { - this(eventLoop, childGroup, newServerSocket()); + public OioServerSocketChannel() { + this(newServerSocket()); } /** @@ -71,8 +70,8 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl * * @param socket the {@link ServerSocket} which is used by this instance */ - public OioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, ServerSocket socket) { - super(null, eventLoop, childGroup); + public OioServerSocketChannel(ServerSocket socket) { + super(null); if (socket == null) { throw new NullPointerException("socket"); } @@ -155,7 +154,7 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl Socket s = socket.accept(); try { if (s != null) { - buf.add(new OioSocketChannel(this, childEventLoopGroup().next(), s)); + buf.add(new OioSocketChannel(this, s)); return 1; } } catch (Throwable t) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index b066541be1..dc48bc4e4b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -49,8 +49,8 @@ public class OioSocketChannel extends OioByteStreamChannel /** * Create a new instance with an new {@link Socket} */ - public OioSocketChannel(EventLoop eventLoop) { - this(eventLoop, new Socket()); + public OioSocketChannel() { + this(new Socket()); } /** @@ -58,8 +58,8 @@ public class OioSocketChannel extends OioByteStreamChannel * * @param socket the {@link Socket} which is used by this instance */ - public OioSocketChannel(EventLoop eventLoop, Socket socket) { - this(null, eventLoop, socket); + public OioSocketChannel(Socket socket) { + this(null, socket); } /** @@ -69,8 +69,8 @@ public class OioSocketChannel extends OioByteStreamChannel * {@link} has no parent as it was created by your self. * @param socket the {@link Socket} which is used by this instance */ - public OioSocketChannel(Channel parent, EventLoop eventLoop, Socket socket) { - super(parent, eventLoop); + public OioSocketChannel(Channel parent, Socket socket) { + super(parent); this.socket = socket; config = new DefaultOioSocketChannelConfig(this, socket); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 891b5b11f9..c783a8d79c 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -137,7 +137,7 @@ public class DefaultChannelPipelineTest { @Test public void testRemoveChannelHandler() { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); ChannelHandler handler1 = newHandler(); ChannelHandler handler2 = newHandler(); @@ -160,7 +160,7 @@ public class DefaultChannelPipelineTest { @Test public void testReplaceChannelHandler() { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); ChannelHandler handler1 = newHandler(); pipeline.addLast("handler1", handler1); @@ -185,7 +185,7 @@ public class DefaultChannelPipelineTest { @Test public void testChannelHandlerContextNavigation() { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); final int HANDLER_ARRAY_LEN = 5; ChannelHandler[] firstHandlers = newHandlers(HANDLER_ARRAY_LEN); @@ -200,12 +200,11 @@ public class DefaultChannelPipelineTest { @Test public void testFireChannelRegistered() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - Channel ch = new LocalChannel(group.next()); - ChannelPipeline pipeline = ch.pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); pipeline.addLast(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new ChannelHandlerAdapter() { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { latch.countDown(); @@ -213,13 +212,13 @@ public class DefaultChannelPipelineTest { }); } }); - ch.unsafe().register(ch.newPromise()); + group.register(pipeline.channel()); assertTrue(latch.await(2, TimeUnit.SECONDS)); } @Test public void testPipelineOperation() { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); final int handlerNum = 5; ChannelHandler[] handlers1 = newHandlers(handlerNum); @@ -247,7 +246,7 @@ public class DefaultChannelPipelineTest { @Test public void testChannelHandlerContextOrder() { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); pipeline.addFirst("1", newHandler()); pipeline.addLast("10", newHandler()); @@ -444,7 +443,8 @@ public class DefaultChannelPipelineTest { // Tests for https://github.com/netty/netty/issues/2349 @Test public void testCancelBind() throws Exception { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); ChannelPromise promise = pipeline.channel().newPromise(); assertTrue(promise.cancel(false)); @@ -454,7 +454,8 @@ public class DefaultChannelPipelineTest { @Test public void testCancelConnect() throws Exception { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); ChannelPromise promise = pipeline.channel().newPromise(); assertTrue(promise.cancel(false)); @@ -464,7 +465,8 @@ public class DefaultChannelPipelineTest { @Test public void testCancelDisconnect() throws Exception { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); ChannelPromise promise = pipeline.channel().newPromise(); assertTrue(promise.cancel(false)); @@ -474,7 +476,8 @@ public class DefaultChannelPipelineTest { @Test public void testCancelClose() throws Exception { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); ChannelPromise promise = pipeline.channel().newPromise(); assertTrue(promise.cancel(false)); @@ -482,9 +485,21 @@ public class DefaultChannelPipelineTest { assertTrue(future.isCancelled()); } + @Test + public void testCancelDeregister() throws Exception { + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); + + ChannelPromise promise = pipeline.channel().newPromise(); + assertTrue(promise.cancel(false)); + ChannelFuture future = pipeline.deregister(promise); + assertTrue(future.isCancelled()); + } + @Test public void testCancelWrite() throws Exception { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); ChannelPromise promise = pipeline.channel().newPromise(); assertTrue(promise.cancel(false)); @@ -497,7 +512,8 @@ public class DefaultChannelPipelineTest { @Test public void testCancelWriteAndFlush() throws Exception { - ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline(); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + group.register(pipeline.channel()); ChannelPromise promise = pipeline.channel().newPromise(); assertTrue(promise.cancel(false)); diff --git a/transport/src/test/java/io/netty/channel/LoggingHandler.java b/transport/src/test/java/io/netty/channel/LoggingHandler.java index 5ebc042cca..b1c2147df2 100644 --- a/transport/src/test/java/io/netty/channel/LoggingHandler.java +++ b/transport/src/test/java/io/netty/channel/LoggingHandler.java @@ -22,8 +22,8 @@ import java.util.EnumSet; final class LoggingHandler implements ChannelHandler { enum Event { - WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, READ, WRITABILITY, HANDLER_ADDED, - HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, ACTIVE, INACTIVE, USER + WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, DEREGISTER, READ, WRITABILITY, HANDLER_ADDED, + HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, UNREGISTERED, ACTIVE, INACTIVE, USER } private StringBuilder log = new StringBuilder(); @@ -68,6 +68,12 @@ final class LoggingHandler implements ChannelHandler { ctx.close(promise); } + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + log(Event.DEREGISTER); + ctx.deregister(promise); + } + @Override public void read(ChannelHandlerContext ctx) throws Exception { log(Event.READ); @@ -101,6 +107,12 @@ final class LoggingHandler implements ChannelHandler { ctx.fireChannelRegistered(); } + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + log(Event.UNREGISTERED); + ctx.fireChannelUnregistered(); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log(Event.ACTIVE); diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 11456e2296..cd57d9ee6e 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -339,13 +339,11 @@ public class SingleThreadEventLoopTest { } try { - Channel channel = new LocalChannel(loopA); - ChannelPromise f = channel.newPromise(); - channel.unsafe().register(f); + ChannelFuture f = loopA.register(new LocalChannel()); f.awaitUninterruptibly(); assertFalse(f.isSuccess()); assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class))); - assertFalse(channel.isOpen()); + assertFalse(f.channel().isOpen()); } finally { for (Appender a: appenders) { root.addAppender(a); @@ -358,7 +356,7 @@ public class SingleThreadEventLoopTest { public void testRegistrationAfterShutdown2() throws Exception { loopA.shutdown(); final CountDownLatch latch = new CountDownLatch(1); - Channel ch = new LocalChannel(loopA); + Channel ch = new LocalChannel(); ChannelPromise promise = ch.newPromise(); promise.addListener(new ChannelFutureListener() { @Override @@ -377,10 +375,10 @@ public class SingleThreadEventLoopTest { } try { - ch.unsafe().register(promise); - promise.awaitUninterruptibly(); - assertFalse(promise.isSuccess()); - assertThat(promise.cause(), is(instanceOf(RejectedExecutionException.class))); + ChannelFuture f = loopA.register(ch, promise); + f.awaitUninterruptibly(); + assertFalse(f.isSuccess()); + assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class))); // Ensure the listener was notified. assertFalse(latch.await(1, TimeUnit.SECONDS)); diff --git a/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java b/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java index 2f607bf09c..718b7bfc5b 100644 --- a/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java +++ b/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java @@ -82,7 +82,7 @@ public class ThreadPerChannelEventLoopGroupTest { ChannelGroup channelGroup = new DefaultChannelGroup(testExecutor); while (taskCount-- > 0) { Channel channel = new EmbeddedChannel(NOOP_HANDLER); - channel.unsafe().register(new DefaultChannelPromise(channel, testExecutor)); + loopGroup.register(channel, new DefaultChannelPromise(channel, testExecutor)); channelGroup.add(channel); } channelGroup.close().sync(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 890e7ca493..a55de13e4f 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -89,7 +89,7 @@ public class LocalTransportThreadModelTest { ThreadNameAuditor h2 = new ThreadNameAuditor(); ThreadNameAuditor h3 = new ThreadNameAuditor(true); - Channel ch = new LocalChannel(l.next()); + Channel ch = new LocalChannel(); // With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'. ch.pipeline().addLast(h1); // h2 will be always invoked by EventExecutor 'e1'. @@ -97,9 +97,7 @@ public class LocalTransportThreadModelTest { // h3 will be always invoked by EventExecutor 'e2'. ch.pipeline().addLast(e2, h3); - ChannelPromise promise = ch.newPromise(); - ch.unsafe().register(promise); - promise.sync().channel().connect(localAddr).sync(); + l.register(ch).sync().channel().connect(localAddr).sync(); // Fire inbound events from all possible starting points. ch.pipeline().fireChannelRead("1"); @@ -242,7 +240,7 @@ public class LocalTransportThreadModelTest { final MessageForwarder2 h5 = new MessageForwarder2(); final MessageDiscarder h6 = new MessageDiscarder(); - final Channel ch = new LocalChannel(l.next()); + final Channel ch = new LocalChannel(); // inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null // outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null @@ -253,9 +251,7 @@ public class LocalTransportThreadModelTest { .addLast(e4, h5) .addLast(e5, h6); - ChannelPromise promise = ch.newPromise(); - ch.unsafe().register(promise); - promise.sync().channel().connect(localAddr).sync(); + l.register(ch).sync().channel().connect(localAddr).sync(); final int ROUNDS = 1024; final int ELEMS_PER_ROUNDS = 8192; diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 6fb6b36903..ffc0b56ab1 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -130,7 +130,7 @@ public class LocalTransportThreadModelTest3 { final EventForwarder h5 = new EventForwarder(); final EventRecorder h6 = new EventRecorder(events, inbound); - final Channel ch = new LocalChannel(l.next()); + final Channel ch = new LocalChannel(); if (!inbound) { ch.config().setAutoRead(false); } @@ -141,9 +141,7 @@ public class LocalTransportThreadModelTest3 { .addLast(e1, h5) .addLast(e1, "recorder", h6); - ChannelPromise promise = ch.newPromise(); - ch.unsafe().register(promise); - promise.sync().channel().connect(localAddr).sync(); + l.register(ch).sync().channel().connect(localAddr).sync(); final LinkedList expectedEvents = events(inbound, 8192);