diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java index 8ea32533b3..4978e7230d 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java @@ -42,7 +42,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou * of {@link #DEFAULT_POOL_SIZE} * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. * @param args arguments which will passed to each - * {@link #newChild(java.util.concurrent.ThreadFactory, ChannelTaskScheduler, Object...)} + * {@link #newChild(ThreadFactory, ChannelTaskScheduler, Object...)} * call. */ protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index 031b517aaf..9594e890c1 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -29,8 +29,18 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.WritableByteChannel; +/** + * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. + */ abstract class AbstractNioByteChannel extends AbstractNioChannel { + /** + * Create a new instance + * + * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} + * @param id the id of this instance or {@code null} if one should be generated + * @param ch the underlying {@link SelectableChannel} on which it operates + */ protected AbstractNioByteChannel( Channel parent, Integer id, SelectableChannel ch) { super(parent, id, ch, SelectionKey.OP_READ); @@ -205,7 +215,18 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { } } + /** + * Read bytes into the given {@link ByteBuf} and return the amount. + */ protected abstract int doReadBytes(ByteBuf buf) throws Exception; + + /** + * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. + * @param buf the {@link ByteBuf} from which the bytes should be written + * @param lastSpin {@code true} if this is the last write try + * @return amount the amount of written bytes + * @throws Exception thrown if an error accour + */ protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception; // 0 - not expanded because the buffer is writable diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 464f9cfd5f..bf1bf0d9e9 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -34,6 +34,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +/** + * Abstract base class for {@link Channel} implementations which use a Selector based approach. + */ public abstract class AbstractNioChannel extends AbstractChannel { private static final InternalLogger logger = @@ -67,6 +70,14 @@ public abstract class AbstractNioChannel extends AbstractChannel { private ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; + /** + * Create a new instance + * + * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} + * @param id the id of this instance or {@code null} if one should be generated + * @param ch the underlying {@link SelectableChannel} on which it operates + * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} + */ protected AbstractNioChannel( Channel parent, Integer id, SelectableChannel ch, int readInterestOp) { super(parent, id); @@ -117,19 +128,31 @@ public abstract class AbstractNioChannel extends AbstractChannel { return (NioEventLoop) super.eventLoop(); } + /** + * Return the current {@link SelectionKey} + */ protected SelectionKey selectionKey() { assert selectionKey != null; return selectionKey; } + /** + * Return {@code true} if the input of this {@link Channel} is shutdown + */ boolean isInputShutdown() { return inputShutdown; } + /** + * Shutdown the input of this {@link Channel}. + */ void setInputShutdown() { inputShutdown = true; } + /** + * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel} + */ public interface NioUnsafe extends Unsafe { SelectableChannel ch(); void finishConnect(); @@ -254,7 +277,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - NioEventLoop loop = (NioEventLoop) eventLoop(); + NioEventLoop loop = eventLoop(); selectionKey = javaChannel().register( loop.selector, isActive() && !inputShutdown ? readInterestOp : 0, this); return null; @@ -262,9 +285,16 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { - ((NioEventLoop) eventLoop()).cancel(selectionKey()); + eventLoop().cancel(selectionKey()); } + /** + * Conect to the remote peer + */ protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; + + /** + * Finish the connect + */ protected abstract void doFinishConnect() throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index f98ce6c9fa..b374e8c57e 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -22,15 +22,21 @@ import io.netty.channel.ChannelPipeline; import java.io.IOException; import java.nio.channels.SelectableChannel; +/** + * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages. + */ abstract class AbstractNioMessageChannel extends AbstractNioChannel { + /** + * @see {@link AbstractNioChannel#AbstractNioChannel(Channel, Integer, SelectableChannel, int)} + */ protected AbstractNioMessageChannel( Channel parent, Integer id, SelectableChannel ch, int readInterestOp) { super(parent, id, ch, readInterestOp); } @Override - protected NioMessageUnsafe newUnsafe() { + protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); } @@ -94,6 +100,17 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel { } } + /** + * Read messages into the given {@link MessageBuf} and return the amount. + */ protected abstract int doReadMessages(MessageBuf buf) throws Exception; + + /** + * Write messages form the given {@link MessageBuf} to the underlying {@link java.nio.channels.Channel}. + * @param buf the {@link MessageBuf} from which the bytes should be written + * @param lastSpin {@code true} if this is the last write try + * @return amount the amount of written bytes + * @throws Exception thrown if an error accour + */ protected abstract int doWriteMessages(MessageBuf buf, boolean lastSpin) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java index 43d676f375..4062099f20 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java @@ -15,6 +15,7 @@ */ package io.netty.channel.socket.nio; +import io.netty.channel.Channel; import io.netty.channel.ChannelTaskScheduler; import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoopGroup; @@ -23,20 +24,39 @@ import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.ThreadFactory; +/** + * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. + */ public class NioEventLoopGroup extends MultithreadEventLoopGroup { + /** + * Create a new instance using {@link #DEFAULT_POOL_SIZE} number of threads, the default {@link ThreadFactory} and + * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. + */ public NioEventLoopGroup() { this(0); } + /** + * Create a new instance using nThreads number of threads, {@link ThreadFactory} and the + * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. + */ public NioEventLoopGroup(int nThreads) { this(nThreads, null); } + /** + * Create a new instance using nThreads number of threads, the given {@link ThreadFactory} and the + * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. + */ public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); + this(nThreads, threadFactory, SelectorProvider.provider()); } + /** + * Create a new instance using nThreads number of threads, the given {@link ThreadFactory} and the given + * {@link SelectorProvider}. + */ public NioEventLoopGroup( int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { super(nThreads, threadFactory, selectorProvider); @@ -55,12 +75,6 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventExecutor newChild( ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception { - SelectorProvider selectorProvider; - if (args == null || args.length == 0 || args[0] == null) { - selectorProvider = SelectorProvider.provider(); - } else { - selectorProvider = (SelectorProvider) args[0]; - } - return new NioEventLoop(this, threadFactory, scheduler, selectorProvider); + return new NioEventLoop(this, threadFactory, scheduler, (SelectorProvider) args[0]); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java index 4e3b9983fa..95d52d3653 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java @@ -44,6 +44,13 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +/** + * {@link io.netty.channel.socket.SctpChannel} implementation which use non-blocking mode and allows to read / write + * {@link SctpMessage}s to the underlying {@link SctpChannel}. + * + * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system, + * to understand what you need to do to use it. Also this feature is only supported on Java 7+. + */ public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.SctpChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.MESSAGE, false); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java index 40d95769c2..54eda716b7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java @@ -33,6 +33,13 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +/** + * {@link io.netty.channel.socket.SctpServerChannel} implementation which use non-blocking mode to accept new + * connections and create the {@link NioSctpChannel} for them. + * + * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system, + * to understand what you need to do to use it. Also this feature is only supported on Java 7+. + */ public class NioSctpServerChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.SctpServerChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.MESSAGE, false); @@ -48,6 +55,9 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel private final SctpServerChannelConfig config; + /** + * Create a new instance + */ public NioSctpServerChannel() { super(null, null, newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultSctpServerChannelConfig(javaChannel()); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 9bfda115c1..d8c8688b32 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -45,6 +45,9 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel private final ServerSocketChannelConfig config; + /** + * Create a new instance + */ public NioServerSocketChannel() { super(null, null, newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultServerSocketChannelConfig(javaChannel().socket()); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java index f9320b64c9..e5d43cc348 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java @@ -24,6 +24,9 @@ import java.nio.channels.CancelledKeyException; import java.nio.channels.Selector; import java.util.concurrent.TimeUnit; +/** + * Utility class for operate on a {@link Selector} + */ final class SelectorUtil { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SelectorUtil.class);