Resurrect channel deregistration and constructor changes

Motivation:

Due to the complexity of handling deregistration and re-registration of
a channel, we previously decided to remove the deregister() operation
completely to simplify our code.  However, we realized that it shouldn't
be that complicated to implement it during our discussion about making
I/O scheduling more flexible and more customizable [1], and thus the
removal of deregistration and re-registration is unnecessary now.

Modification:

- Revert commit c149f4bcc0
- Revert commit e743a27e75
- Make some additional adjustments

Result:

- deregister(), fireChannelUnregistered(), and channelRegistered() were
  added back..
- Channel constructors do not require an EventLoop anymore.

[1] https://github.com/netty/netty/issues/2250
This commit is contained in:
Norman Maurer 2014-04-22 14:38:10 +02:00 committed by Trustin Lee
parent 797d6d94a4
commit 48f2e705d9
77 changed files with 891 additions and 921 deletions

View File

@ -195,6 +195,14 @@ public class LoggingHandler extends ChannelHandlerAdapter {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "UNREGISTERED"));
}
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (logger.isEnabled(internalLevel)) {

View File

@ -21,7 +21,6 @@ import io.netty.bootstrap.ChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
@ -109,8 +108,8 @@ public class SocketTestPermutation {
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel(EventLoop loop) {
return new NioDatagramChannel(loop, InternetProtocolFamily.IPv4);
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override

View File

@ -32,12 +32,12 @@ abstract class AbstractEpollChannel extends AbstractChannel {
volatile int fd;
int id;
AbstractEpollChannel(EventLoop eventLoop, int fd, int flag) {
this(null, eventLoop, fd, flag, false);
AbstractEpollChannel(int fd, int flag) {
this(null, fd, flag, false);
}
AbstractEpollChannel(Channel parent, EventLoop eventLoop, int fd, int flag, boolean active) {
super(parent, eventLoop);
AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
super(parent);
this.fd = fd;
readFlag = flag;
flags |= flag;

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
@ -51,8 +50,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
private volatile boolean connected;
private final EpollDatagramChannelConfig config;
public EpollDatagramChannel(EventLoop loop) {
super(loop, Native.socketDgramFd(), Native.EPOLLIN);
public EpollDatagramChannel() {
super(Native.socketDgramFd(), Native.EPOLLIN);
config = new EpollDatagramChannelConfig(this);
}

View File

@ -19,7 +19,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import java.net.InetSocketAddress;
@ -32,13 +31,11 @@ import java.net.SocketAddress;
public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel {
private final EpollServerSocketChannelConfig config;
private final EventLoopGroup childGroup;
private volatile InetSocketAddress local;
public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(eventLoop, Native.socketStreamFd(), Native.EPOLLACCEPT);
public EpollServerSocketChannel() {
super(Native.socketStreamFd(), Native.EPOLLACCEPT);
config = new EpollServerSocketChannelConfig(this);
this.childGroup = childGroup;
}
@Override
@ -81,11 +78,6 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
throw new UnsupportedOperationException();
}
@Override
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
@Override
@ -109,8 +101,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
}
try {
readPending = false;
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this,
childEventLoopGroup().next(), socketFd));
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();

View File

@ -66,8 +66,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
EpollSocketChannel(Channel parent, EventLoop eventLoop, int fd) {
super(parent, eventLoop, fd, Native.EPOLLIN, true);
EpollSocketChannel(Channel parent, int fd) {
super(parent, fd, Native.EPOLLIN, true);
config = new EpollSocketChannelConfig(this);
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
@ -75,8 +75,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
local = Native.localAddress(fd);
}
public EpollSocketChannel(EventLoop eventLoop) {
super(eventLoop, Native.socketStreamFd(), Native.EPOLLIN);
public EpollSocketChannel() {
super(Native.socketStreamFd(), Native.EPOLLIN);
config = new EpollSocketChannelConfig(this);
}

View File

@ -19,7 +19,6 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
@ -100,8 +99,8 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel(EventLoop loop) {
return new NioDatagramChannel(loop, InternetProtocolFamily.IPv4);
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override

View File

@ -19,7 +19,6 @@ import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.oio.OioByteStreamChannel;
import java.net.SocketAddress;
@ -40,8 +39,8 @@ public class RxtxChannel extends OioByteStreamChannel {
private RxtxDeviceAddress deviceAddress;
private SerialPort serialPort;
public RxtxChannel(EventLoop eventLoop) {
super(null, eventLoop);
public RxtxChannel() {
super(null);
config = new DefaultRxtxChannelConfig(this);
}

View File

@ -27,7 +27,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.sctp.DefaultSctpChannelConfig;
@ -82,15 +81,15 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
/**
* Create a new instance
*/
public NioSctpChannel(EventLoop eventLoop) {
this(eventLoop, newSctpChannel());
public NioSctpChannel() {
this(newSctpChannel());
}
/**
* Create a new instance using {@link SctpChannel}
*/
public NioSctpChannel(EventLoop eventLoop, SctpChannel sctpChannel) {
this(null, eventLoop, sctpChannel);
public NioSctpChannel(SctpChannel sctpChannel) {
this(null, sctpChannel);
}
/**
@ -100,8 +99,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
* or {@code null}.
* @param sctpChannel the underlying {@link SctpChannel}
*/
public NioSctpChannel(Channel parent, EventLoop eventLoop, SctpChannel sctpChannel) {
super(parent, eventLoop, sctpChannel, SelectionKey.OP_READ);
public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
super(parent, sctpChannel, SelectionKey.OP_READ);
try {
sctpChannel.configureBlocking(false);
config = new NioSctpChannelConfig(this, sctpChannel);

View File

@ -22,9 +22,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.AbstractNioMessageServerChannel;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
import io.netty.channel.sctp.SctpServerChannelConfig;
@ -46,9 +44,8 @@ import java.util.Set;
* Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
* to understand what you need to do to use it. Also this feature is only supported on Java 7+.
*/
public class NioSctpServerChannel extends AbstractNioMessageServerChannel
public class NioSctpServerChannel extends AbstractNioMessageChannel
implements io.netty.channel.sctp.SctpServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static SctpServerChannel newSocket() {
@ -65,9 +62,9 @@ public class NioSctpServerChannel extends AbstractNioMessageServerChannel
/**
* Create a new instance
*/
public NioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
config = new NioSctpServerChannelConfig(this, javaChannel());
public NioSctpServerChannel() {
super(null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultSctpServerChannelConfig(this, javaChannel());
}
@Override
@ -143,7 +140,7 @@ public class NioSctpServerChannel extends AbstractNioMessageServerChannel
if (ch == null) {
return 0;
}
buf.add(new NioSctpChannel(this, childEventLoopGroup().next(), ch));
buf.add(new NioSctpChannel(this, ch));
return 1;
}

View File

@ -26,7 +26,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.sctp.DefaultSctpChannelConfig;
@ -88,8 +87,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel
/**
* Create a new instance with an new {@link SctpChannel}.
*/
public OioSctpChannel(EventLoop eventLoop) {
this(eventLoop, openChannel());
public OioSctpChannel() {
this(openChannel());
}
/**
@ -97,8 +96,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel
*
* @param ch the {@link SctpChannel} which is used by this instance
*/
public OioSctpChannel(EventLoop eventLoop, SctpChannel ch) {
this(null, eventLoop, ch);
public OioSctpChannel(SctpChannel ch) {
this(null, ch);
}
/**
@ -108,8 +107,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel
* {@link} has no parent as it was created by your self.
* @param ch the {@link SctpChannel} which is used by this instance
*/
public OioSctpChannel(Channel parent, EventLoop eventLoop, SctpChannel ch) {
super(parent, eventLoop);
public OioSctpChannel(Channel parent, SctpChannel ch) {
super(parent);
this.ch = ch;
boolean success = false;
try {

View File

@ -22,9 +22,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.AbstractOioMessageServerChannel;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
import io.netty.channel.sctp.SctpServerChannelConfig;
import io.netty.util.internal.logging.InternalLogger;
@ -49,7 +47,7 @@ import java.util.Set;
* Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
* to understand what you need to do to use it. Also this feature is only supported on Java 7+.
*/
public class OioSctpServerChannel extends AbstractOioMessageServerChannel
public class OioSctpServerChannel extends AbstractOioMessageChannel
implements io.netty.channel.sctp.SctpServerChannel {
private static final InternalLogger logger =
@ -72,8 +70,8 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel
/**
* Create a new instance with an new {@link SctpServerChannel}
*/
public OioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
this(eventLoop, childGroup, newServerSocket());
public OioSctpServerChannel() {
this(newServerSocket());
}
/**
@ -81,8 +79,8 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel
*
* @param sch the {@link SctpServerChannel} which is used by this instance
*/
public OioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup, SctpServerChannel sch) {
super(null, eventLoop, childGroup);
public OioSctpServerChannel(SctpServerChannel sch) {
super(null);
if (sch == null) {
throw new NullPointerException("sctp server channel");
}
@ -198,7 +196,7 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel
if (key.isAcceptable()) {
s = sch.accept();
if (s != null) {
buf.add(new OioSctpChannel(this, childEventLoopGroup().next(), s));
buf.add(new OioSctpChannel(this, s));
acceptedChannels ++;
}
}

View File

@ -19,9 +19,7 @@ import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.ServerSocketChannelUDT;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.AbstractNioMessageServerChannel;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.udt.DefaultUdtServerChannelConfig;
import io.netty.channel.udt.UdtServerChannel;
import io.netty.channel.udt.UdtServerChannelConfig;
@ -36,16 +34,15 @@ import static java.nio.channels.SelectionKey.*;
/**
* Common base for Netty Byte/Message UDT Stream/Datagram acceptors.
*/
public abstract class NioUdtAcceptorChannel extends AbstractNioMessageServerChannel implements UdtServerChannel {
public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel implements UdtServerChannel {
protected static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioUdtAcceptorChannel.class);
private final UdtServerChannelConfig config;
protected NioUdtAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup,
ServerSocketChannelUDT channelUDT) {
super(null, eventLoop, childGroup, channelUDT, OP_ACCEPT);
protected NioUdtAcceptorChannel(final ServerSocketChannelUDT channelUDT) {
super(null, channelUDT, OP_ACCEPT);
try {
channelUDT.configureBlocking(false);
config = new DefaultUdtServerChannelConfig(this, channelUDT, true);
@ -61,8 +58,8 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageServerChan
}
}
protected NioUdtAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup, final TypeUDT type) {
this(eventLoop, childGroup, NioUdtProvider.newAcceptorChannelUDT(type));
protected NioUdtAcceptorChannel(final TypeUDT type) {
this(NioUdtProvider.newAcceptorChannelUDT(type));
}
@Override

View File

@ -18,8 +18,6 @@ package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.SocketChannelUDT;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.List;
@ -30,8 +28,8 @@ public class NioUdtByteAcceptorChannel extends NioUdtAcceptorChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
public NioUdtByteAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(eventLoop, childGroup, TypeUDT.STREAM);
public NioUdtByteAcceptorChannel() {
super(TypeUDT.STREAM);
}
@Override
@ -40,7 +38,7 @@ public class NioUdtByteAcceptorChannel extends NioUdtAcceptorChannel {
if (channelUDT == null) {
return 0;
} else {
buf.add(new NioUdtByteConnectorChannel(this, childEventLoopGroup().next(), channelUDT));
buf.add(new NioUdtByteConnectorChannel(this, channelUDT));
return 1;
}
}

View File

@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.udt.DefaultUdtChannelConfig;
@ -47,12 +46,12 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement
private final UdtChannelConfig config;
public NioUdtByteConnectorChannel(EventLoop eventLoop) {
this(eventLoop, TypeUDT.STREAM);
public NioUdtByteConnectorChannel() {
this(TypeUDT.STREAM);
}
public NioUdtByteConnectorChannel(Channel parent, EventLoop eventLoop, SocketChannelUDT channelUDT) {
super(parent, eventLoop, channelUDT);
public NioUdtByteConnectorChannel(final Channel parent, final SocketChannelUDT channelUDT) {
super(parent, channelUDT);
try {
channelUDT.configureBlocking(false);
switch (channelUDT.socketUDT().status()) {
@ -76,12 +75,12 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement
}
}
public NioUdtByteConnectorChannel(EventLoop eventLoop, final SocketChannelUDT channelUDT) {
this(null, eventLoop, channelUDT);
public NioUdtByteConnectorChannel(final SocketChannelUDT channelUDT) {
this(null, channelUDT);
}
public NioUdtByteConnectorChannel(EventLoop eventLoop, final TypeUDT type) {
this(eventLoop, NioUdtProvider.newConnectorChannelUDT(type));
public NioUdtByteConnectorChannel(final TypeUDT type) {
this(NioUdtProvider.newConnectorChannelUDT(type));
}
@Override

View File

@ -15,17 +15,15 @@
*/
package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT;
import io.netty.channel.EventLoop;
/**
* Byte Channel Rendezvous for UDT Streams.
*/
public class NioUdtByteRendezvousChannel extends NioUdtByteConnectorChannel {
public NioUdtByteRendezvousChannel(EventLoop eventLoop) {
super(eventLoop, NioUdtProvider.newRendezvousChannelUDT(TypeUDT.STREAM));
public NioUdtByteRendezvousChannel() {
super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.STREAM));
}
}

View File

@ -18,8 +18,6 @@ package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.SocketChannelUDT;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.List;
@ -30,8 +28,8 @@ public class NioUdtMessageAcceptorChannel extends NioUdtAcceptorChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
public NioUdtMessageAcceptorChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(eventLoop, childGroup, TypeUDT.DATAGRAM);
public NioUdtMessageAcceptorChannel() {
super(TypeUDT.DATAGRAM);
}
@Override
@ -40,7 +38,7 @@ public class NioUdtMessageAcceptorChannel extends NioUdtAcceptorChannel {
if (channelUDT == null) {
return 0;
} else {
buf.add(new NioUdtMessageConnectorChannel(this, childEventLoopGroup().next(), channelUDT));
buf.add(new NioUdtMessageConnectorChannel(this, channelUDT));
return 1;
}
}

View File

@ -22,7 +22,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.udt.DefaultUdtChannelConfig;
import io.netty.channel.udt.UdtChannel;
@ -51,12 +50,12 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
private final UdtChannelConfig config;
public NioUdtMessageConnectorChannel(EventLoop eventLoop) {
this(eventLoop, TypeUDT.DATAGRAM);
public NioUdtMessageConnectorChannel() {
this(TypeUDT.DATAGRAM);
}
public NioUdtMessageConnectorChannel(final Channel parent, EventLoop eventLoop, final SocketChannelUDT channelUDT) {
super(parent, eventLoop, channelUDT, OP_READ);
public NioUdtMessageConnectorChannel(final Channel parent, final SocketChannelUDT channelUDT) {
super(parent, channelUDT, OP_READ);
try {
channelUDT.configureBlocking(false);
switch (channelUDT.socketUDT().status()) {
@ -80,12 +79,12 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
}
}
public NioUdtMessageConnectorChannel(EventLoop eventLoop, final SocketChannelUDT channelUDT) {
this(null, eventLoop, channelUDT);
public NioUdtMessageConnectorChannel(final SocketChannelUDT channelUDT) {
this(null, channelUDT);
}
public NioUdtMessageConnectorChannel(EventLoop eventLoop, final TypeUDT type) {
this(eventLoop, NioUdtProvider.newConnectorChannelUDT(type));
public NioUdtMessageConnectorChannel(final TypeUDT type) {
this(NioUdtProvider.newConnectorChannelUDT(type));
}
@Override

View File

@ -16,7 +16,6 @@
package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT;
import io.netty.channel.EventLoop;
import io.netty.channel.udt.UdtMessage;
/**
@ -24,9 +23,10 @@ import io.netty.channel.udt.UdtMessage;
* <p>
* Note: send/receive must use {@link UdtMessage} in the pipeline
*/
public class NioUdtMessageRendezvousChannel extends NioUdtMessageConnectorChannel {
public class NioUdtMessageRendezvousChannel extends
NioUdtMessageConnectorChannel {
public NioUdtMessageRendezvousChannel(EventLoop eventLoop) {
super(eventLoop, NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM));
public NioUdtMessageRendezvousChannel() {
super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM));
}
}

View File

@ -24,13 +24,10 @@ import com.barchart.udt.nio.SelectorProviderUDT;
import com.barchart.udt.nio.ServerSocketChannelUDT;
import com.barchart.udt.nio.SocketChannelUDT;
import io.netty.bootstrap.ChannelFactory;
import io.netty.bootstrap.ServerChannelFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.UdtServerChannel;
import io.netty.channel.udt.UdtChannel;
import java.io.IOException;
import java.nio.channels.spi.SelectorProvider;
@ -42,21 +39,21 @@ import java.nio.channels.spi.SelectorProvider;
* <p>
* Provides {@link SelectorProvider} for UDT channels.
*/
public abstract class NioUdtProvider {
public final class NioUdtProvider<T extends UdtChannel> implements ChannelFactory<T> {
/**
* {@link ChannelFactory} for UDT Byte Acceptor. See {@link TypeUDT#STREAM}
* and {@link KindUDT#ACCEPTOR}.
*/
public static final ServerChannelFactory<UdtServerChannel> BYTE_ACCEPTOR =
new NioUdtServerChannelFactory<UdtServerChannel>(TypeUDT.STREAM, KindUDT.ACCEPTOR);
public static final ChannelFactory<UdtServerChannel> BYTE_ACCEPTOR = new NioUdtProvider<UdtServerChannel>(
TypeUDT.STREAM, KindUDT.ACCEPTOR);
/**
* {@link ChannelFactory} for UDT Byte Connector. See {@link TypeUDT#STREAM}
* and {@link KindUDT#CONNECTOR}.
*/
public static final ChannelFactory<UdtChannel> BYTE_CONNECTOR =
new NioUdtChannelFactory<UdtChannel>(TypeUDT.STREAM, KindUDT.CONNECTOR);
public static final ChannelFactory<UdtChannel> BYTE_CONNECTOR = new NioUdtProvider<UdtChannel>(
TypeUDT.STREAM, KindUDT.CONNECTOR);
/**
* {@link SelectorProvider} for UDT Byte channels. See
@ -68,22 +65,22 @@ public abstract class NioUdtProvider {
* {@link ChannelFactory} for UDT Byte Rendezvous. See
* {@link TypeUDT#STREAM} and {@link KindUDT#RENDEZVOUS}.
*/
public static final ChannelFactory<UdtChannel> BYTE_RENDEZVOUS =
new NioUdtChannelFactory<UdtChannel>(TypeUDT.STREAM, KindUDT.RENDEZVOUS);
public static final ChannelFactory<UdtChannel> BYTE_RENDEZVOUS = new NioUdtProvider<UdtChannel>(
TypeUDT.STREAM, KindUDT.RENDEZVOUS);
/**
* {@link ChannelFactory} for UDT Message Acceptor. See
* {@link TypeUDT#DATAGRAM} and {@link KindUDT#ACCEPTOR}.
*/
public static final ServerChannelFactory<UdtServerChannel> MESSAGE_ACCEPTOR =
new NioUdtServerChannelFactory<UdtServerChannel>(TypeUDT.DATAGRAM, KindUDT.ACCEPTOR);
public static final ChannelFactory<UdtServerChannel> MESSAGE_ACCEPTOR = new NioUdtProvider<UdtServerChannel>(
TypeUDT.DATAGRAM, KindUDT.ACCEPTOR);
/**
* {@link ChannelFactory} for UDT Message Connector. See
* {@link TypeUDT#DATAGRAM} and {@link KindUDT#CONNECTOR}.
*/
public static final ChannelFactory<UdtChannel> MESSAGE_CONNECTOR =
new NioUdtChannelFactory<UdtChannel>(TypeUDT.DATAGRAM, KindUDT.CONNECTOR);
public static final ChannelFactory<UdtChannel> MESSAGE_CONNECTOR = new NioUdtProvider<UdtChannel>(
TypeUDT.DATAGRAM, KindUDT.CONNECTOR);
/**
* {@link SelectorProvider} for UDT Message channels. See
@ -95,8 +92,8 @@ public abstract class NioUdtProvider {
* {@link ChannelFactory} for UDT Message Rendezvous. See
* {@link TypeUDT#DATAGRAM} and {@link KindUDT#RENDEZVOUS}.
*/
public static final ChannelFactory<UdtChannel> MESSAGE_RENDEZVOUS =
new NioUdtChannelFactory<UdtChannel>(TypeUDT.DATAGRAM, KindUDT.RENDEZVOUS);
public static final ChannelFactory<UdtChannel> MESSAGE_RENDEZVOUS = new NioUdtProvider<UdtChannel>(
TypeUDT.DATAGRAM, KindUDT.RENDEZVOUS);
/**
* Expose underlying {@link ChannelUDT} for debugging and monitoring.
@ -131,7 +128,8 @@ public abstract class NioUdtProvider {
/**
* Convenience factory for {@link KindUDT#ACCEPTOR} channels.
*/
protected static ServerSocketChannelUDT newAcceptorChannelUDT(final TypeUDT type) {
protected static ServerSocketChannelUDT newAcceptorChannelUDT(
final TypeUDT type) {
try {
return SelectorProviderUDT.from(type).openServerSocketChannel();
} catch (final IOException e) {
@ -153,7 +151,8 @@ public abstract class NioUdtProvider {
/**
* Convenience factory for {@link KindUDT#RENDEZVOUS} channels.
*/
protected static RendezvousChannelUDT newRendezvousChannelUDT(final TypeUDT type) {
protected static RendezvousChannelUDT newRendezvousChannelUDT(
final TypeUDT type) {
try {
return SelectorProviderUDT.from(type).openRendezvousChannel();
} catch (final IOException e) {
@ -195,70 +194,42 @@ public abstract class NioUdtProvider {
}
/**
* Produce new {@link UdtChannel} based on factory {@link #kind()} and {@link #type()}
* Produce new {@link UdtChannel} based on factory {@link #kind()} and
* {@link #type()}
*/
private static final class NioUdtChannelFactory<T extends UdtChannel>
extends NioUdtProvider implements ChannelFactory<T> {
private NioUdtChannelFactory(final TypeUDT type, final KindUDT kind) {
super(type, kind);
}
@SuppressWarnings("unchecked")
@Override
public T newChannel(EventLoop eventLoop) {
switch (kind()) {
public T newChannel() {
switch (kind) {
case ACCEPTOR:
throw new IllegalStateException("wrong kind: " + kind());
case CONNECTOR:
switch (type()) {
switch (type) {
case DATAGRAM:
return (T) new NioUdtMessageConnectorChannel(eventLoop);
return (T) new NioUdtMessageAcceptorChannel();
case STREAM:
return (T) new NioUdtByteConnectorChannel(eventLoop);
return (T) new NioUdtByteAcceptorChannel();
default:
throw new IllegalStateException("wrong type: " + type());
}
case RENDEZVOUS:
switch (type()) {
case DATAGRAM:
return (T) new NioUdtMessageRendezvousChannel(eventLoop);
case STREAM:
return (T) new NioUdtByteRendezvousChannel(eventLoop);
default:
throw new IllegalStateException("wrong type: " + type());
}
default:
throw new IllegalStateException("wrong kind: " + kind());
}
}
}
private static final class NioUdtServerChannelFactory<T extends UdtServerChannel> extends NioUdtProvider
implements ServerChannelFactory<T> {
private NioUdtServerChannelFactory(final TypeUDT type, final KindUDT kind) {
super(type, kind);
}
@Override
@SuppressWarnings("unchecked")
public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
switch (kind()) {
case ACCEPTOR:
switch (type()) {
case DATAGRAM:
return (T) new NioUdtMessageAcceptorChannel(eventLoop, childGroup);
case STREAM:
return (T) new NioUdtByteAcceptorChannel(eventLoop, childGroup);
default:
throw new IllegalStateException("wrong type: " + type());
throw new IllegalStateException("wrong type=" + type);
}
case CONNECTOR:
case RENDEZVOUS:
switch (type) {
case DATAGRAM:
return (T) new NioUdtMessageConnectorChannel();
case STREAM:
return (T) new NioUdtByteConnectorChannel();
default:
throw new IllegalStateException("wrong kind: " + kind());
throw new IllegalStateException("wrong type=" + type);
}
case RENDEZVOUS:
switch (type) {
case DATAGRAM:
return (T) new NioUdtMessageRendezvousChannel();
case STREAM:
return (T) new NioUdtByteRendezvousChannel();
default:
throw new IllegalStateException("wrong type=" + type);
}
default:
throw new IllegalStateException("wrong kind=" + kind);
}
}

View File

@ -16,8 +16,6 @@
package io.netty.test.udt.nio;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtByteAcceptorChannel;
import org.junit.Test;
@ -30,7 +28,6 @@ public class NioUdtByteAcceptorChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertFalse(new NioUdtByteAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect());
assertEquals(false, new NioUdtByteAcceptorChannel().metadata().hasDisconnect());
}
}

View File

@ -16,8 +16,6 @@
package io.netty.test.udt.nio;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtByteConnectorChannel;
import org.junit.Test;
@ -30,7 +28,6 @@ public class NioUdtByteConnectorChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertFalse(new NioUdtByteConnectorChannel(loop).metadata().hasDisconnect());
assertEquals(false, new NioUdtByteConnectorChannel().metadata().hasDisconnect());
}
}

View File

@ -20,7 +20,6 @@ import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtByteRendezvousChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
@ -45,8 +44,7 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertFalse(new NioUdtByteRendezvousChannel(loop).metadata().hasDisconnect());
assertFalse(new NioUdtByteRendezvousChannel().metadata().hasDisconnect());
}
/**

View File

@ -16,8 +16,6 @@
package io.netty.test.udt.nio;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtMessageAcceptorChannel;
import org.junit.Test;
@ -30,7 +28,6 @@ public class NioUdtMessageAcceptorChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertFalse(new NioUdtMessageAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect());
assertEquals(false, new NioUdtMessageAcceptorChannel().metadata().hasDisconnect());
}
}

View File

@ -16,8 +16,6 @@
package io.netty.test.udt.nio;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtMessageConnectorChannel;
import org.junit.Test;
@ -30,7 +28,6 @@ public class NioUdtMessageConnectorChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertFalse(new NioUdtMessageConnectorChannel(loop).metadata().hasDisconnect());
assertEquals(false, new NioUdtMessageConnectorChannel().metadata().hasDisconnect());
}
}

View File

@ -20,7 +20,6 @@ import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtMessageRendezvousChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
@ -45,8 +44,7 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertFalse(new NioUdtMessageRendezvousChannel(loop).metadata().hasDisconnect());
assertFalse(new NioUdtMessageRendezvousChannel().metadata().hasDisconnect());
}
/**

View File

@ -16,9 +16,6 @@
package io.netty.test.udt.nio;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.UdtServerChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import org.junit.Test;
@ -32,22 +29,18 @@ public class NioUdtProviderTest extends AbstractUdtTest {
*/
@Test
public void provideFactory() {
EventLoop loop = new NioEventLoopGroup().next();
EventLoopGroup childGroup = new NioEventLoopGroup();
// bytes
assertNotNull(NioUdtProvider.BYTE_ACCEPTOR.newChannel(loop, childGroup));
assertNotNull(NioUdtProvider.BYTE_CONNECTOR.newChannel(loop));
assertNotNull(NioUdtProvider.BYTE_RENDEZVOUS.newChannel(loop));
assertNotNull(NioUdtProvider.BYTE_ACCEPTOR.newChannel());
assertNotNull(NioUdtProvider.BYTE_CONNECTOR.newChannel());
assertNotNull(NioUdtProvider.BYTE_RENDEZVOUS.newChannel());
// message
assertNotNull(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel(loop, childGroup));
assertNotNull(NioUdtProvider.MESSAGE_CONNECTOR.newChannel(loop));
assertNotNull(NioUdtProvider.MESSAGE_RENDEZVOUS.newChannel(loop));
assertNotNull(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel());
assertNotNull(NioUdtProvider.MESSAGE_CONNECTOR.newChannel());
assertNotNull(NioUdtProvider.MESSAGE_RENDEZVOUS.newChannel());
// acceptor types
assertTrue(NioUdtProvider.BYTE_ACCEPTOR.newChannel(loop, childGroup) instanceof UdtServerChannel);
assertTrue(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel(loop, childGroup) instanceof UdtServerChannel);
assertTrue(NioUdtProvider.BYTE_ACCEPTOR.newChannel() instanceof UdtServerChannel);
assertTrue(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel() instanceof UdtServerChannel);
}
}

View File

@ -17,6 +17,7 @@
package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@ -24,7 +25,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.VoidChannel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -46,6 +46,7 @@ import java.util.Map;
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
@ -57,6 +58,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
group = bootstrap.group;
channelFactory = bootstrap.channelFactory;
handler = bootstrap.handler;
localAddress = bootstrap.localAddress;
synchronized (bootstrap.options) {
@ -68,7 +70,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
}
/**
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created {@link Channel}
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-creates
* {@link Channel}
*/
@SuppressWarnings("unchecked")
public B group(EventLoopGroup group) {
@ -82,6 +85,38 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return (B) this;
}
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
/**
* {@link ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
*/
@SuppressWarnings("unchecked")
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
/**
* The {@link SocketAddress} which is used to bind the local "end" to.
*
@ -166,6 +201,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("factory not set");
}
return (B) this;
}
@ -255,16 +293,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return promise;
}
abstract Channel createChannel();
final ChannelFuture initAndRegister() {
Channel channel;
try {
channel = createChannel();
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
@ -272,8 +302,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return channel.newFailedFuture(t);
}
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
@ -330,6 +359,10 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return localAddress;
}
final ChannelFactory<? extends C> channelFactory() {
return channelFactory;
}
final ChannelHandler handler() {
return handler;
}
@ -359,6 +392,11 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
buf.append(StringUtil.simpleClassName(group));
buf.append(", ");
}
if (channelFactory != null) {
buf.append("channelFactory: ");
buf.append(channelFactory);
buf.append(", ");
}
if (localAddress != null) {
buf.append("localAddress: ");
buf.append(localAddress);
@ -391,4 +429,26 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
}
return buf.toString();
}
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return clazz.getSimpleName() + ".class";
}
}
}

View File

@ -16,20 +16,15 @@
package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.util.AttributeKey;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -47,58 +42,15 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
private volatile ChannelFactory<? extends Channel> channelFactory;
private volatile SocketAddress remoteAddress;
public Bootstrap() { }
private Bootstrap(Bootstrap bootstrap) {
super(bootstrap);
channelFactory = bootstrap.channelFactory;
remoteAddress = bootstrap.remoteAddress;
}
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public Bootstrap channel(Class<? extends Channel> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<Channel>(channelClass));
}
/**
* {@link ServerChannelFactory} which is used to create {@link ServerChannel} instances when calling
* {@link #bind()}. This method is usually only used if {@link #channel(Class)} is not working for you because of
* some more complex needs. If your {@link Channel} implementation has a no-args constructor, its highly recommend
* to just use {@link #channel(Class)} for simplify your code.
*/
public Bootstrap channelFactory(ChannelFactory<? extends Channel> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return this;
}
ChannelFactory<? extends Channel> channelFactory() {
return channelFactory;
}
@Override
Channel createChannel() {
EventLoop eventLoop = group().next();
return channelFactory().newChannel(eventLoop);
}
/**
* The {@link SocketAddress} to connect to once the {@link #connect()} method
* is called.
@ -255,9 +207,6 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
if (handler() == null) {
throw new IllegalStateException("handler not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return this;
}
@ -281,28 +230,4 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
return buf.toString();
}
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel(EventLoop eventLoop) {
try {
Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class);
return constructor.newInstance(eventLoop);
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
}

View File

@ -16,16 +16,14 @@
package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
/**
* Factory that creates a new {@link Channel} on {@link Bootstrap#bind()}, {@link Bootstrap#connect()}, and
* {@link ServerBootstrap#bind()}.
*/
public interface ChannelFactory<T extends Channel> {
/**
* Creates a new channel.
*/
T newChannel(EventLoop eventLoop);
T newChannel();
}

View File

@ -17,24 +17,22 @@ package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.reflect.Constructor;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -48,7 +46,6 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private volatile ServerChannelFactory<? extends ServerChannel> channelFactory;
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile EventLoopGroup childGroup;
@ -58,7 +55,6 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
channelFactory = bootstrap.channelFactory;
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
@ -69,47 +65,6 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
}
}
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(ServerChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public ServerBootstrap channel(Class<? extends ServerChannel> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ServerBootstrapChannelFactory<ServerChannel>(channelClass));
}
/**
* {@link ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
*/
public ServerBootstrap channelFactory(ServerChannelFactory<? extends ServerChannel> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return this;
}
@Override
Channel createChannel() {
EventLoop eventLoop = group().next();
return channelFactory().newChannel(eventLoop, childGroup);
}
ServerChannelFactory<? extends ServerChannel> channelFactory() {
return channelFactory;
}
/**
* Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
*/
@ -119,8 +74,8 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
}
/**
* Set the {@link EventExecutorGroup} for the parent (acceptor) and the child (client). These
* {@link EventExecutorGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
@ -212,6 +167,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
@ -225,8 +181,8 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
currentChildAttrs));
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
@ -241,9 +197,6 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
logger.warn("childGroup is not set. Using parentGroup instead.");
childGroup = group();
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return this;
}
@ -259,12 +212,16 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
private static class ServerBootstrapAcceptor extends ChannelHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
ServerBootstrapAcceptor(ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions,
Entry<AttributeKey<?>, Object>[] childAttrs) {
@SuppressWarnings("unchecked")
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
@ -273,7 +230,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
@ -291,7 +248,23 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
child.unsafe().register(child.newPromise());
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
@Override
@ -358,29 +331,5 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
return buf.toString();
}
private static final class ServerBootstrapChannelFactory<T extends ServerChannel>
implements ServerChannelFactory<T> {
private final Class<? extends T> clazz;
ServerBootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
try {
Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
return constructor.newInstance(eventLoop, childGroup);
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
/**
* Factory that creates a new {@link Channel} on {@link Bootstrap#bind()}, {@link Bootstrap#connect()}, and
* {@link ServerBootstrap#bind()}.
*/
public interface ServerChannelFactory<T extends ServerChannel> {
/**
* Creates a new channel.
*/
T newChannel(EventLoop eventLoop, EventLoopGroup childGroup);
}

View File

@ -59,7 +59,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private final EventLoop eventLoop;
private volatile EventLoop eventLoop;
private volatile boolean registered;
/** Cache for the string representation of this channel */
@ -72,9 +72,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent, EventLoop eventLoop) {
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.eventLoop = validate(eventLoop);
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
@ -107,6 +106,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public EventLoop eventLoop() {
EventLoop eventLoop = this.eventLoop;
if (eventLoop == null) {
throw new IllegalStateException("channel not registered to an event loop");
}
return eventLoop;
}
@ -179,6 +182,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
@ -210,6 +218,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(promise);
}
@Override
public Channel read() {
pipeline.read();
@ -375,7 +388,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop.asInvoker();
return eventLoop().asInvoker();
}
@Override
@ -394,7 +407,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public final void register(final ChannelPromise promise) {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " +
eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
@ -552,7 +580,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
});
}
deregister();
deregister(voidPromise());
}
}
@ -565,8 +593,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private void deregister() {
@Override
public final void deregister(final ChannelPromise promise) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
@ -577,6 +611,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} finally {
if (registered) {
registered = false;
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
}
});
safeSetSuccess(promise);
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
safeSetSuccess(promise);
}
}
}
@ -718,16 +764,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private EventLoop validate(EventLoop eventLoop) {
if (eventLoop == null) {
throw new IllegalStateException("null event loop");
}
if (!isCompatible(eventLoop)) {
throw new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName());
}
return eventLoop;
}
/**
* Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}.
*/

View File

@ -34,14 +34,11 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final EventLoopGroup childGroup;
/**
* Creates a new instance.
*/
protected AbstractServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(null, eventLoop);
this.childGroup = childGroup;
protected AbstractServerChannel() {
super(null);
}
@Override
@ -74,11 +71,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException();
}
@Override
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
private final class DefaultServerUnsafe extends AbstractUnsafe {
@Override
public void write(Object msg, ChannelPromise promise) {

View File

@ -280,6 +280,19 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
*/
ChannelFuture close();
/**
* Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*
*/
ChannelFuture deregister();
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
@ -353,6 +366,20 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
*/
ChannelFuture close(ChannelPromise promise);
/**
* Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture deregister(ChannelPromise promise);
/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was
@ -406,6 +433,7 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li>
* <li>{@link #register(ChannelPromise)}</li>
* <li>{@link #deregister(ChannelPromise)}</li>
* <li>{@link #voidPromise()}</li>
* </ul>
*/
@ -432,7 +460,7 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(ChannelPromise promise);
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
@ -467,6 +495,12 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
void deregister(ChannelPromise promise);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.

View File

@ -196,6 +196,11 @@ public interface ChannelHandler {
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
@ -276,6 +281,15 @@ public interface ChannelHandler {
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/

View File

@ -102,6 +102,18 @@ public class ChannelHandlerAdapter implements ChannelHandler {
ctx.fireChannelRegistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
@ -224,6 +236,18 @@ public class ChannelHandlerAdapter implements ChannelHandler {
ctx.close(promise);
}
/**
* Calls {@link ChannelHandlerContext#deregister(ChannelPromise)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
/**
* Calls {@link ChannelHandlerContext#read()} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.

View File

@ -192,7 +192,10 @@ public class ChannelHandlerAppender extends ChannelHandlerAdapter {
} else {
name = e.name;
}
pipeline.addAfter(ctx.invoker(), oldName, name, e.handler);
// Pass in direct the invoker to eliminate the possibility of an IllegalStateException
// if the Channel is not registered yet.
DefaultChannelHandlerContext context = (DefaultChannelHandlerContext) ctx;
pipeline.addAfter(context.invoker, oldName, name, e.handler);
}
} finally {
if (selfRemoval) {

View File

@ -175,6 +175,15 @@ public interface ChannelHandlerContext extends AttributeMap {
*/
ChannelHandlerContext fireChannelRegistered();
/**
* A {@link Channel} was unregistered from its {@link EventLoop}.
*
* This will result in having the {@link ChannelHandler#channelUnregistered(ChannelHandlerContext)} method
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelHandlerContext fireChannelUnregistered();
/**
* A {@link Channel} is active now, which means it is connected.
*
@ -295,6 +304,19 @@ public interface ChannelHandlerContext extends AttributeMap {
*/
ChannelFuture close();
/**
* Request to deregister from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*
*/
ChannelFuture deregister();
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
@ -368,6 +390,20 @@ public interface ChannelHandlerContext extends AttributeMap {
*/
ChannelFuture close(ChannelPromise promise);
/**
* Request to deregister from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture deregister(ChannelPromise promise);
/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was

View File

@ -39,6 +39,13 @@ public interface ChannelHandlerInvoker {
*/
void invokeChannelRegistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelHandler#channelUnregistered(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelUnregistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelHandler#channelActive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
@ -118,6 +125,13 @@ public interface ChannelHandlerInvoker {
*/
void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelHandler#read(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.

View File

@ -35,6 +35,14 @@ public final class ChannelHandlerInvokerUtil {
}
}
public static void invokeChannelUnregisteredNow(ChannelHandlerContext ctx) {
try {
ctx.handler().channelUnregistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
try {
ctx.handler().channelActive(ctx);
@ -129,6 +137,14 @@ public final class ChannelHandlerInvokerUtil {
}
}
public static void invokeDeregisterNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
ctx.handler().deregister(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeReadNow(final ChannelHandlerContext ctx) {
try {
ctx.handler().read(ctx);

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.ConnectException;
@ -661,6 +662,15 @@ public interface ChannelPipeline extends Iterable<Entry<String, ChannelHandler>>
*/
ChannelPipeline fireChannelRegistered();
/**
* A {@link Channel} was unregistered from its {@link EventLoop}.
*
* This will result in having the {@link ChannelHandler#channelUnregistered(ChannelHandlerContext)} method
* called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelPipeline fireChannelUnregistered();
/**
* A {@link Channel} is active now, which means it is connected.
*
@ -781,6 +791,19 @@ public interface ChannelPipeline extends Iterable<Entry<String, ChannelHandler>>
*/
ChannelFuture close();
/**
* Request to deregister the {@link Channel} from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*
*/
ChannelFuture deregister();
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
@ -854,6 +877,20 @@ public interface ChannelPipeline extends Iterable<Entry<String, ChannelHandler>>
*/
ChannelFuture close(ChannelPromise promise);
/**
* Request to deregister the {@link Channel} bound this {@link ChannelPipeline} from the previous assigned
* {@link EventExecutor} and notify the {@link ChannelFuture} once the operation completes, either because the
* operation was successful or because of an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>ChannelOutboundHandler
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture deregister(ChannelPromise promise);
/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)} event if data was

View File

@ -38,19 +38,21 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
static final int MASK_HANDLER_REMOVED = 1 << 1;
private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
private static final int MASK_CHANNEL_ACTIVE = 1 << 4;
private static final int MASK_CHANNEL_INACTIVE = 1 << 5;
private static final int MASK_CHANNEL_READ = 1 << 6;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 7;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 9;
private static final int MASK_BIND = 1 << 10;
private static final int MASK_CONNECT = 1 << 11;
private static final int MASK_DISCONNECT = 1 << 12;
private static final int MASK_CLOSE = 1 << 13;
private static final int MASK_READ = 1 << 14;
private static final int MASK_WRITE = 1 << 15;
private static final int MASK_FLUSH = 1 << 16;
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
private static final int MASK_CHANNEL_READ = 1 << 7;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;
private static final int MASK_BIND = 1 << 11;
private static final int MASK_CONNECT = 1 << 12;
private static final int MASK_DISCONNECT = 1 << 13;
private static final int MASK_CLOSE = 1 << 14;
private static final int MASK_DEREGISTER = 1 << 15;
private static final int MASK_READ = 1 << 16;
private static final int MASK_WRITE = 1 << 17;
private static final int MASK_FLUSH = 1 << 18;
/**
* Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized
@ -111,6 +113,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
"channelRegistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_REGISTERED;
}
if (handlerType.getMethod(
"channelUnregistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_UNREGISTERED;
}
if (handlerType.getMethod(
"channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_ACTIVE;
@ -153,6 +159,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
"close", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CLOSE;
}
if (handlerType.getMethod(
"deregister", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_DEREGISTER;
}
if (handlerType.getMethod(
"read", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_READ;
@ -215,14 +225,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
this.pipeline = pipeline;
this.name = name;
this.handler = handler;
this.invoker = invoker;
skipFlags = skipFlags(handler);
if (invoker == null) {
this.invoker = channel.unsafe().invoker();
} else {
this.invoker = invoker;
}
}
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
@ -267,11 +272,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public EventExecutor executor() {
return invoker.executor();
return invoker().executor();
}
@Override
public ChannelHandlerInvoker invoker() {
if (invoker == null) {
return channel.unsafe().invoker();
}
return invoker;
}
@ -293,35 +301,42 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
next.invoker.invokeChannelRegistered(next);
next.invoker().invokeChannelRegistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
next.invoker().invokeChannelUnregistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
next.invoker.invokeChannelActive(next);
next.invoker().invokeChannelActive(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
next.invoker.invokeChannelInactive(next);
next.invoker().invokeChannelInactive(next);
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
next.invoker.invokeExceptionCaught(next, cause);
next.invoker().invokeExceptionCaught(next, cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
DefaultChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
next.invoker.invokeUserEventTriggered(next, event);
next.invoker().invokeUserEventTriggered(next, event);
return this;
}
@ -329,21 +344,21 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
public ChannelHandlerContext fireChannelRead(Object msg) {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
ReferenceCountUtil.touch(msg, next);
next.invoker.invokeChannelRead(next, msg);
next.invoker().invokeChannelRead(next, msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
next.invoker.invokeChannelReadComplete(next);
next.invoker().invokeChannelReadComplete(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
next.invoker.invokeChannelWritabilityChanged(next);
next.invoker().invokeChannelWritabilityChanged(next);
return this;
}
@ -372,10 +387,15 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return close(newPromise());
}
@Override
public ChannelFuture deregister() {
return deregister(newPromise());
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);
next.invoker.invokeBind(next, localAddress, promise);
next.invoker().invokeBind(next, localAddress, promise);
return promise;
}
@ -387,7 +407,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
next.invoker.invokeConnect(next, remoteAddress, localAddress, promise);
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
@ -398,21 +418,28 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
}
DefaultChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
next.invoker.invokeDisconnect(next, promise);
next.invoker().invokeDisconnect(next, promise);
return promise;
}
@Override
public ChannelFuture close(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
next.invoker.invokeClose(next, promise);
next.invoker().invokeClose(next, promise);
return promise;
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
next.invoker().invokeDeregister(next, promise);
return promise;
}
@Override
public ChannelHandlerContext read() {
DefaultChannelHandlerContext next = findContextOutbound(MASK_READ);
next.invoker.invokeRead(next);
next.invoker().invokeRead(next);
return this;
}
@ -425,14 +452,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
public ChannelFuture write(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_WRITE);
ReferenceCountUtil.touch(msg, next);
next.invoker.invokeWrite(next, msg, promise);
next.invoker().invokeWrite(next, msg, promise);
return promise;
}
@Override
public ChannelHandlerContext flush() {
DefaultChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
next.invoker.invokeFlush(next);
next.invoker().invokeFlush(next);
return this;
}
@ -441,9 +468,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
DefaultChannelHandlerContext next;
next = findContextOutbound(MASK_WRITE);
ReferenceCountUtil.touch(msg, next);
next.invoker.invokeWrite(next, msg, promise);
next.invoker().invokeWrite(next, msg, promise);
next = findContextOutbound(MASK_FLUSH);
next.invoker.invokeFlush(next);
next.invoker().invokeFlush(next);
return promise;
}

View File

@ -57,6 +57,20 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
}
}
@Override
public void invokeChannelUnregistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelUnregisteredNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelUnregisteredNow(ctx);
}
});
}
}
@Override
public void invokeChannelActive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
@ -269,6 +283,25 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
}
}
@Override
public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (!validatePromise(ctx, promise, false)) {
// promise cancelled
return;
}
if (executor.inEventLoop()) {
invokeDeregisterNow(ctx, promise);
} else {
safeExecuteOutbound(new OneTimeTask() {
@Override
public void run() {
invokeDeregisterNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeRead(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {

View File

@ -550,6 +550,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
try {
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
t.printStackTrace();
boolean removed = false;
try {
remove(ctx);
@ -802,6 +803,17 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
@Override
public ChannelPipeline fireChannelUnregistered() {
head.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
teardownAll();
}
return this;
}
/**
* Removes all handlers from the pipeline one by one from tail (exclusive) to head (inclusive) to trigger
* handlerRemoved(). Note that the tail handler is excluded because it's neither an outbound handler nor it
@ -825,7 +837,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline fireChannelInactive() {
head.fireChannelInactive();
teardownAll();
return this;
}
@ -887,6 +898,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return tail.close();
}
@Override
public ChannelFuture deregister() {
return tail.deregister();
}
@Override
public ChannelPipeline flush() {
tail.flush();
@ -918,6 +934,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return tail.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return tail.close(promise);
}
@Override
public ChannelPipeline read() {
tail.read();
@ -983,6 +1004,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@ -1050,6 +1074,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();

View File

@ -25,4 +25,16 @@ import io.netty.util.concurrent.EventExecutorGroup;
public interface EventLoopGroup extends EventExecutorGroup {
@Override
EventLoop next();
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
*/
ChannelFuture register(Channel channel, ChannelPromise promise);
}

View File

@ -69,5 +69,12 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
}
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}

View File

@ -18,9 +18,8 @@ package io.netty.channel;
import io.netty.channel.socket.ServerSocketChannel;
/**
* A {@link Channel} that accepts an incoming connection attempt and creates its child {@link Channel}s by accepting
* them. {@link ServerSocketChannel} is a good example.
* A {@link Channel} that accepts an incoming connection attempt and creates
* its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is
* a good example.
*/
public interface ServerChannel extends Channel {
EventLoopGroup childEventLoopGroup();
}
public interface ServerChannel extends Channel { }

View File

@ -51,6 +51,24 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return invoker;
}
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
@Override
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);

View File

@ -30,6 +30,21 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
this.parent = parent;
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return super.register(channel, promise).addListener(new ChannelFutureListener() {
@Override
@SuppressWarnings("unchecked")
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
@Override
protected void run() {
for (;;) {

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
@ -39,13 +40,14 @@ import java.util.concurrent.TimeUnit;
/**
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
*/
public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
private final Object[] childArgs;
private final int maxChannels;
final Executor executor;
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
private final Set<EventLoop> readOnlyActiveChildren = Collections.unmodifiableSet(activeChildren);
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
private final ChannelException tooManyChannels;
@ -73,7 +75,9 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. Use {@code 0} to use no limit
* {@link ChannelException}. on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
@ -84,7 +88,9 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. Use {@code 0} to use no limit
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
@ -98,7 +104,9 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. Use {@code 0} to use no limit
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
@ -126,34 +134,21 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
}
/**
* Creates a new {@link EventLoop}.
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
*/
protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) {
protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
return new ThreadPerChannelEventLoop(this);
}
@Override
@SuppressWarnings("unchecked")
public <E extends EventExecutor> Set<E> children() {
return Collections.unmodifiableSet((Set<E>) activeChildren);
return (Set<E>) readOnlyActiveChildren;
}
@Override
public EventLoop next() {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = newChild(childArgs);
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop);
return loop;
throw new UnsupportedOperationException();
}
@Override
@ -271,4 +266,47 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
}
return isTerminated();
}
@Override
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
EventLoop l = nextChild();
return l.register(channel, new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel, promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = newChild(childArgs);
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop);
return loop;
}
}

View File

@ -1,172 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
/**
* A {@link Channel} that represents a non-existing {@link Channel} which could not be instantiated successfully.
*/
public final class VoidChannel extends AbstractChannel {
public static final VoidChannel INSTANCE = new VoidChannel();
private VoidChannel() {
super(null, new AbstractEventLoop(null) {
private final ChannelHandlerInvoker invoker =
new DefaultChannelHandlerInvoker(GlobalEventExecutor.INSTANCE);
@Override
@Deprecated
public void shutdown() {
GlobalEventExecutor.INSTANCE.shutdown();
}
@Override
public ChannelHandlerInvoker asInvoker() {
return invoker;
}
@Override
public boolean inEventLoop(Thread thread) {
return GlobalEventExecutor.INSTANCE.inEventLoop(thread);
}
@Override
public boolean isShuttingDown() {
return GlobalEventExecutor.INSTANCE.isShuttingDown();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return GlobalEventExecutor.INSTANCE.shutdownGracefully(quietPeriod, timeout, unit);
}
@Override
public Future<?> terminationFuture() {
return GlobalEventExecutor.INSTANCE.terminationFuture();
}
@Override
public boolean isShutdown() {
return GlobalEventExecutor.INSTANCE.isShutdown();
}
@Override
public boolean isTerminated() {
return GlobalEventExecutor.INSTANCE.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return GlobalEventExecutor.INSTANCE.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
GlobalEventExecutor.INSTANCE.execute(command);
}
});
}
@Override
protected AbstractUnsafe newUnsafe() {
return new AbstractUnsafe() {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
reject();
}
};
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return reject();
}
@Override
protected SocketAddress remoteAddress0() {
return reject();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
reject();
}
@Override
protected void doDisconnect() throws Exception {
reject();
}
@Override
protected void doClose() throws Exception {
reject();
}
@Override
protected void doBeginRead() throws Exception {
reject();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
reject();
}
@Override
public ChannelConfig config() {
return reject();
}
@Override
public boolean isOpen() {
return reject();
}
@Override
public boolean isActive() {
return reject();
}
@Override
public ChannelMetadata metadata() {
return reject();
}
@Override
public String toString() {
return StringUtil.simpleClassName(this);
}
private static <T> T reject() {
throw new UnsupportedOperationException(
StringUtil.simpleClassName(VoidChannel.class) +
" is only for the representation of a non-existing " +
StringUtil.simpleClassName(Channel.class) + '.');
}
}

View File

@ -54,7 +54,7 @@ public class EmbeddedChannel extends AbstractChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final EmbeddedEventLoop loop;
private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
private final ChannelConfig config = new DefaultChannelConfig(this);
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
private final Queue<Object> outboundMessages = new ArrayDeque<Object>();
@ -74,9 +74,7 @@ public class EmbeddedChannel extends AbstractChannel {
* @param handlers the @link ChannelHandler}s which will be add in the {@link ChannelPipeline}
*/
public EmbeddedChannel(ChannelHandler... handlers) {
super(null, new EmbeddedEventLoop());
loop = (EmbeddedEventLoop) eventLoop();
super(null);
if (handlers == null) {
throw new NullPointerException("handlers");
@ -91,7 +89,7 @@ public class EmbeddedChannel extends AbstractChannel {
}
p.addLast(new LastInboundHandler());
unsafe().register(newPromise());
loop.register(this);
}
@Override

View File

@ -16,6 +16,8 @@
package io.netty.channel.embedded;
import io.netty.channel.AbstractEventLoop;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPromise;
@ -92,6 +94,17 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
return false;
}
@Override
public ChannelFuture register(Channel channel) {
return register(channel, channel.newPromise());
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
channel.unsafe().register(this, promise);
return promise;
}
@Override
public boolean inEventLoop() {
return true;
@ -117,6 +130,11 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
invokeChannelRegisteredNow(ctx);
}
@Override
public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
invokeChannelUnregisteredNow(ctx);
}
@Override
public void invokeChannelActive(ChannelHandlerContext ctx) {
invokeChannelActiveNow(ctx);
@ -174,6 +192,11 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
invokeCloseNow(ctx, promise);
}
@Override
public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeDeregisterNow(ctx, promise);
}
@Override
public void invokeRead(ChannelHandlerContext ctx) {
invokeReadNow(ctx);

View File

@ -87,12 +87,12 @@ public class LocalChannel extends AbstractChannel {
private volatile boolean readInProgress;
private volatile boolean registerInProgress;
public LocalChannel(EventLoop eventLoop) {
super(null, eventLoop);
public LocalChannel() {
super(null);
}
LocalChannel(LocalServerChannel parent, EventLoop eventLoop, LocalChannel peer) {
super(parent, eventLoop);
LocalChannel(LocalServerChannel parent, LocalChannel peer) {
super(parent);
this.peer = peer;
localAddress = parent.localAddress();
remoteAddress = peer.localAddress();

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
@ -47,10 +46,6 @@ public class LocalServerChannel extends AbstractServerChannel {
private volatile LocalAddress localAddress;
private volatile boolean acceptInProgress;
public LocalServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(eventLoop, childGroup);
}
@Override
public ChannelConfig config() {
return config;
@ -136,7 +131,7 @@ public class LocalServerChannel extends AbstractServerChannel {
}
LocalChannel serve(final LocalChannel peer) {
final LocalChannel child = new LocalChannel(this, childEventLoopGroup().next(), peer);
final LocalChannel child = new LocalChannel(this, peer);
if (eventLoop().inEventLoop()) {
serve0(child);
} else {

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
@ -44,8 +43,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
*/
protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {
super(parent, eventLoop, ch, SelectionKey.OP_READ);
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
@Override

View File

@ -65,8 +65,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
super(parent, eventLoop);
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {

View File

@ -19,7 +19,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import java.io.IOException;
@ -33,9 +32,11 @@ import java.util.List;
*/
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(
Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
super(parent, eventLoop, ch, readInterestOp);
/**
* @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)}
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
@Override
@ -169,5 +170,4 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
* @return {@code true} if and only if the message has been written
*/
protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import java.nio.channels.SelectableChannel;
public abstract class AbstractNioMessageServerChannel extends AbstractNioMessageChannel implements ServerChannel {
private final EventLoopGroup childGroup;
protected AbstractNioMessageServerChannel(
Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
super(parent, eventLoop, ch, readInterestOp);
this.childGroup = childGroup;
}
@Override
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
}

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.StringUtil;
@ -37,8 +36,11 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
protected AbstractOioByteChannel(Channel parent, EventLoop eventLoop) {
super(parent, eventLoop);
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
*/
protected AbstractOioByteChannel(Channel parent) {
super(parent);
}
protected boolean isInputShutdown() {

View File

@ -46,8 +46,11 @@ public abstract class AbstractOioChannel extends AbstractChannel {
}
};
protected AbstractOioChannel(Channel parent, EventLoop eventLoop) {
super(parent, eventLoop);
/**
* @see AbstractChannel#AbstractChannel(Channel)
*/
protected AbstractOioChannel(Channel parent) {
super(parent);
}
@Override

View File

@ -18,8 +18,6 @@ package io.netty.channel.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -31,8 +29,8 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
private final List<Object> readBuf = new ArrayList<Object>();
protected AbstractOioMessageChannel(Channel parent, EventLoop eventLoop) {
super(parent, eventLoop);
protected AbstractOioMessageChannel(Channel parent) {
super(parent);
}
@Override

View File

@ -1,36 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
public abstract class AbstractOioMessageServerChannel extends AbstractOioMessageChannel implements ServerChannel {
private final EventLoopGroup childGroup;
protected AbstractOioMessageServerChannel(Channel parent, EventLoop eventLoop, EventLoopGroup childGroup) {
super(parent, eventLoop);
this.childGroup = childGroup;
}
@Override
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import java.io.IOException;
@ -57,8 +56,8 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
* @param parent the parent {@link Channel} which was used to create this instance. This can be null if the
* {@link} has no parent as it was created by your self.
*/
protected OioByteStreamChannel(Channel parent, EventLoop eventLoop) {
super(parent, eventLoop);
protected OioByteStreamChannel(Channel parent) {
super(parent);
}
/**

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DatagramChannelConfig;
@ -106,40 +105,27 @@ public final class NioDatagramChannel
/**
* Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
*/
public NioDatagramChannel(EventLoop eventLoop) {
this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link SelectorProvider}
* which will use the Operation Systems default {@link InternetProtocolFamily}.
*/
public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider) {
this(eventLoop, newSocket(provider));
public NioDatagramChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
* on the Operation Systems default which will be chosen.
*/
public NioDatagramChannel(EventLoop eventLoop, InternetProtocolFamily ipFamily) {
this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
public NioDatagramChannel(InternetProtocolFamily ipFamily) {
this(DEFAULT_SELECTOR_PROVIDER, ipFamily);
}
/**
* Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
* If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
* which will be chosen.
*/
public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider, InternetProtocolFamily ipFamily) {
this(eventLoop, newSocket(provider, ipFamily));
public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
this(newSocket(provider, ipFamily));
}
/**
* Create a new instance from the given {@link DatagramChannel}.
*/
public NioDatagramChannel(EventLoop eventLoop, DatagramChannel socket) {
super(null, eventLoop, socket, SelectionKey.OP_READ);
public NioDatagramChannel(DatagramChannel socket) {
super(null, socket, SelectionKey.OP_READ);
config = new NioDatagramChannelConfig(this, socket);
}

View File

@ -18,9 +18,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.AbstractNioMessageServerChannel;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.internal.logging.InternalLogger;
@ -40,7 +38,7 @@ import java.util.List;
* A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
* NIO selector based implementation to accept new connections.
*/
public class NioServerSocketChannel extends AbstractNioMessageServerChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
@ -68,22 +66,12 @@ public class NioServerSocketChannel extends AbstractNioMessageServerChannel
/**
* Create a new instance
*/
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
this(eventLoop, childGroup, newSocket(DEFAULT_SELECTOR_PROVIDER));
public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, SelectorProvider provider) {
this(eventLoop, childGroup, newSocket(provider));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, ServerSocketChannel channel) {
super(null, eventLoop, childGroup, channel, SelectionKey.OP_ACCEPT);
public NioServerSocketChannel(SelectorProvider provider) {
super(null, newSocket(provider), SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
@ -138,7 +126,7 @@ public class NioServerSocketChannel extends AbstractNioMessageServerChannel
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {

View File

@ -66,32 +66,31 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
/**
* Create a new instance
*/
public NioSocketChannel(EventLoop eventLoop) {
this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER));
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioSocketChannel(EventLoop eventLoop, SelectorProvider provider) {
this(eventLoop, newSocket(provider));
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link SocketChannel}.
*/
public NioSocketChannel(EventLoop eventLoop, SocketChannel socket) {
this(null, eventLoop, socket);
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
/**
* Create a new instance
*
* @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user
* @param socket the {@link SocketChannel} which will be used
*/
public NioSocketChannel(Channel parent, EventLoop eventLoop, SocketChannel socket) {
super(parent, eventLoop, socket);
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.socket.DatagramChannel;
@ -56,7 +55,8 @@ import java.util.Locale;
* @see AddressedEnvelope
* @see DatagramPacket
*/
public final class OioDatagramChannel extends AbstractOioMessageChannel implements DatagramChannel {
public class OioDatagramChannel extends AbstractOioMessageChannel
implements DatagramChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
@ -79,8 +79,8 @@ public final class OioDatagramChannel extends AbstractOioMessageChannel implemen
/**
* Create a new instance with an new {@link MulticastSocket}.
*/
public OioDatagramChannel(EventLoop eventLoop) {
this(eventLoop, newSocket());
public OioDatagramChannel() {
this(newSocket());
}
/**
@ -88,8 +88,8 @@ public final class OioDatagramChannel extends AbstractOioMessageChannel implemen
*
* @param socket the {@link MulticastSocket} which is used by this instance
*/
public OioDatagramChannel(EventLoop eventLoop, MulticastSocket socket) {
super(null, eventLoop);
public OioDatagramChannel(MulticastSocket socket) {
super(null);
boolean success = false;
try {

View File

@ -18,9 +18,7 @@ package io.netty.channel.socket.oio;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.AbstractOioMessageServerChannel;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -40,7 +38,8 @@ import java.util.concurrent.locks.ReentrantLock;
*
* This implementation use Old-Blocking-IO.
*/
public class OioServerSocketChannel extends AbstractOioMessageServerChannel implements ServerSocketChannel {
public class OioServerSocketChannel extends AbstractOioMessageChannel
implements ServerSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketChannel.class);
@ -62,8 +61,8 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl
/**
* Create a new instance with an new {@link Socket}
*/
public OioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
this(eventLoop, childGroup, newServerSocket());
public OioServerSocketChannel() {
this(newServerSocket());
}
/**
@ -71,8 +70,8 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl
*
* @param socket the {@link ServerSocket} which is used by this instance
*/
public OioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, ServerSocket socket) {
super(null, eventLoop, childGroup);
public OioServerSocketChannel(ServerSocket socket) {
super(null);
if (socket == null) {
throw new NullPointerException("socket");
}
@ -155,7 +154,7 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl
Socket s = socket.accept();
try {
if (s != null) {
buf.add(new OioSocketChannel(this, childEventLoopGroup().next(), s));
buf.add(new OioSocketChannel(this, s));
return 1;
}
} catch (Throwable t) {

View File

@ -49,8 +49,8 @@ public class OioSocketChannel extends OioByteStreamChannel
/**
* Create a new instance with an new {@link Socket}
*/
public OioSocketChannel(EventLoop eventLoop) {
this(eventLoop, new Socket());
public OioSocketChannel() {
this(new Socket());
}
/**
@ -58,8 +58,8 @@ public class OioSocketChannel extends OioByteStreamChannel
*
* @param socket the {@link Socket} which is used by this instance
*/
public OioSocketChannel(EventLoop eventLoop, Socket socket) {
this(null, eventLoop, socket);
public OioSocketChannel(Socket socket) {
this(null, socket);
}
/**
@ -69,8 +69,8 @@ public class OioSocketChannel extends OioByteStreamChannel
* {@link} has no parent as it was created by your self.
* @param socket the {@link Socket} which is used by this instance
*/
public OioSocketChannel(Channel parent, EventLoop eventLoop, Socket socket) {
super(parent, eventLoop);
public OioSocketChannel(Channel parent, Socket socket) {
super(parent);
this.socket = socket;
config = new DefaultOioSocketChannelConfig(this, socket);

View File

@ -137,7 +137,7 @@ public class DefaultChannelPipelineTest {
@Test
public void testRemoveChannelHandler() {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelHandler handler1 = newHandler();
ChannelHandler handler2 = newHandler();
@ -160,7 +160,7 @@ public class DefaultChannelPipelineTest {
@Test
public void testReplaceChannelHandler() {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelHandler handler1 = newHandler();
pipeline.addLast("handler1", handler1);
@ -185,7 +185,7 @@ public class DefaultChannelPipelineTest {
@Test
public void testChannelHandlerContextNavigation() {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
final int HANDLER_ARRAY_LEN = 5;
ChannelHandler[] firstHandlers = newHandlers(HANDLER_ARRAY_LEN);
@ -200,12 +200,11 @@ public class DefaultChannelPipelineTest {
@Test
public void testFireChannelRegistered() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Channel ch = new LocalChannel(group.next());
ChannelPipeline pipeline = ch.pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter() {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
@ -213,13 +212,13 @@ public class DefaultChannelPipelineTest {
});
}
});
ch.unsafe().register(ch.newPromise());
group.register(pipeline.channel());
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
@Test
public void testPipelineOperation() {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
final int handlerNum = 5;
ChannelHandler[] handlers1 = newHandlers(handlerNum);
@ -247,7 +246,7 @@ public class DefaultChannelPipelineTest {
@Test
public void testChannelHandlerContextOrder() {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addFirst("1", newHandler());
pipeline.addLast("10", newHandler());
@ -444,7 +443,8 @@ public class DefaultChannelPipelineTest {
// Tests for https://github.com/netty/netty/issues/2349
@Test
public void testCancelBind() throws Exception {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));
@ -454,7 +454,8 @@ public class DefaultChannelPipelineTest {
@Test
public void testCancelConnect() throws Exception {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));
@ -464,7 +465,8 @@ public class DefaultChannelPipelineTest {
@Test
public void testCancelDisconnect() throws Exception {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));
@ -474,7 +476,8 @@ public class DefaultChannelPipelineTest {
@Test
public void testCancelClose() throws Exception {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));
@ -482,9 +485,21 @@ public class DefaultChannelPipelineTest {
assertTrue(future.isCancelled());
}
@Test
public void testCancelDeregister() throws Exception {
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));
ChannelFuture future = pipeline.deregister(promise);
assertTrue(future.isCancelled());
}
@Test
public void testCancelWrite() throws Exception {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));
@ -497,7 +512,8 @@ public class DefaultChannelPipelineTest {
@Test
public void testCancelWriteAndFlush() throws Exception {
ChannelPipeline pipeline = new LocalChannel(group.next()).pipeline();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel());
ChannelPromise promise = pipeline.channel().newPromise();
assertTrue(promise.cancel(false));

View File

@ -22,8 +22,8 @@ import java.util.EnumSet;
final class LoggingHandler implements ChannelHandler {
enum Event {
WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, READ, WRITABILITY, HANDLER_ADDED,
HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, ACTIVE, INACTIVE, USER
WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, DEREGISTER, READ, WRITABILITY, HANDLER_ADDED,
HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, UNREGISTERED, ACTIVE, INACTIVE, USER
}
private StringBuilder log = new StringBuilder();
@ -68,6 +68,12 @@ final class LoggingHandler implements ChannelHandler {
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log(Event.DEREGISTER);
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
log(Event.READ);
@ -101,6 +107,12 @@ final class LoggingHandler implements ChannelHandler {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log(Event.UNREGISTERED);
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log(Event.ACTIVE);

View File

@ -339,13 +339,11 @@ public class SingleThreadEventLoopTest {
}
try {
Channel channel = new LocalChannel(loopA);
ChannelPromise f = channel.newPromise();
channel.unsafe().register(f);
ChannelFuture f = loopA.register(new LocalChannel());
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
assertFalse(channel.isOpen());
assertFalse(f.channel().isOpen());
} finally {
for (Appender<ILoggingEvent> a: appenders) {
root.addAppender(a);
@ -358,7 +356,7 @@ public class SingleThreadEventLoopTest {
public void testRegistrationAfterShutdown2() throws Exception {
loopA.shutdown();
final CountDownLatch latch = new CountDownLatch(1);
Channel ch = new LocalChannel(loopA);
Channel ch = new LocalChannel();
ChannelPromise promise = ch.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
@ -377,10 +375,10 @@ public class SingleThreadEventLoopTest {
}
try {
ch.unsafe().register(promise);
promise.awaitUninterruptibly();
assertFalse(promise.isSuccess());
assertThat(promise.cause(), is(instanceOf(RejectedExecutionException.class)));
ChannelFuture f = loopA.register(ch, promise);
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
// Ensure the listener was notified.
assertFalse(latch.await(1, TimeUnit.SECONDS));

View File

@ -82,7 +82,7 @@ public class ThreadPerChannelEventLoopGroupTest {
ChannelGroup channelGroup = new DefaultChannelGroup(testExecutor);
while (taskCount-- > 0) {
Channel channel = new EmbeddedChannel(NOOP_HANDLER);
channel.unsafe().register(new DefaultChannelPromise(channel, testExecutor));
loopGroup.register(channel, new DefaultChannelPromise(channel, testExecutor));
channelGroup.add(channel);
}
channelGroup.close().sync();

View File

@ -89,7 +89,7 @@ public class LocalTransportThreadModelTest {
ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor(true);
Channel ch = new LocalChannel(l.next());
Channel ch = new LocalChannel();
// With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'.
ch.pipeline().addLast(h1);
// h2 will be always invoked by EventExecutor 'e1'.
@ -97,9 +97,7 @@ public class LocalTransportThreadModelTest {
// h3 will be always invoked by EventExecutor 'e2'.
ch.pipeline().addLast(e2, h3);
ChannelPromise promise = ch.newPromise();
ch.unsafe().register(promise);
promise.sync().channel().connect(localAddr).sync();
l.register(ch).sync().channel().connect(localAddr).sync();
// Fire inbound events from all possible starting points.
ch.pipeline().fireChannelRead("1");
@ -242,7 +240,7 @@ public class LocalTransportThreadModelTest {
final MessageForwarder2 h5 = new MessageForwarder2();
final MessageDiscarder h6 = new MessageDiscarder();
final Channel ch = new LocalChannel(l.next());
final Channel ch = new LocalChannel();
// inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null
// outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null
@ -253,9 +251,7 @@ public class LocalTransportThreadModelTest {
.addLast(e4, h5)
.addLast(e5, h6);
ChannelPromise promise = ch.newPromise();
ch.unsafe().register(promise);
promise.sync().channel().connect(localAddr).sync();
l.register(ch).sync().channel().connect(localAddr).sync();
final int ROUNDS = 1024;
final int ELEMS_PER_ROUNDS = 8192;

View File

@ -130,7 +130,7 @@ public class LocalTransportThreadModelTest3 {
final EventForwarder h5 = new EventForwarder();
final EventRecorder h6 = new EventRecorder(events, inbound);
final Channel ch = new LocalChannel(l.next());
final Channel ch = new LocalChannel();
if (!inbound) {
ch.config().setAutoRead(false);
}
@ -141,9 +141,7 @@ public class LocalTransportThreadModelTest3 {
.addLast(e1, h5)
.addLast(e1, "recorder", h6);
ChannelPromise promise = ch.newPromise();
ch.unsafe().register(promise);
promise.sync().channel().connect(localAddr).sync();
l.register(ch).sync().channel().connect(localAddr).sync();
final LinkedList<EventType> expectedEvents = events(inbound, 8192);