Add javadocs to AIO and also fix a few warnings

This commit is contained in:
Norman Maurer 2012-12-18 15:27:52 +01:00
parent 7b0ec599d6
commit f6735f8cc9
16 changed files with 145 additions and 10 deletions

View File

@ -36,7 +36,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
private final AtomicInteger childIndex = new AtomicInteger(); private final AtomicInteger childIndex = new AtomicInteger();
/** /**
* Create a new intance * Create a new instance.
* *
* @param nThreads the number of threads that will be used by this instance. Use 0 for the default number * @param nThreads the number of threads that will be used by this instance. Use 0 for the default number
* of {@link #DEFAULT_POOL_SIZE} * of {@link #DEFAULT_POOL_SIZE}

View File

@ -24,7 +24,7 @@ import java.util.concurrent.ThreadFactory;
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
/** /**
* @see #MultithreadEventLoopGroup(int, java.util.concurrent.ThreadFactory, Object...) * @see {@link MultithreadEventExecutorGroup##MultithreadEventLoopGroup(int,ThreadFactory, Object...)}
*/ */
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
Object... args) { Object... args) {

View File

@ -27,6 +27,10 @@ import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* Abstract base class for {@link Channel} implementations that use the new {@link AsynchronousChannel} which is part
* of NIO.2.
*/
abstract class AbstractAioChannel extends AbstractChannel { abstract class AbstractAioChannel extends AbstractChannel {
protected volatile AsynchronousChannel ch; protected volatile AsynchronousChannel ch;
@ -39,6 +43,18 @@ abstract class AbstractAioChannel extends AbstractChannel {
protected ScheduledFuture<?> connectTimeoutFuture; protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException; private ConnectException connectTimeoutException;
/**
* Creates a new instance.
*
* @param id
* the unique non-negative integer ID of this channel.
* Specify {@code null} to auto-generate a unique negative integer
* ID.
* @param parent
* the parent of this channel. {@code null} if there's no parent.
* @param ch
* the {@link AsynchronousChannel} which will handle the IO or {@code null} if not created yet.
*/
protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) { protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) {
super(parent, id); super(parent, id);
this.ch = ch; this.ch = ch;
@ -54,6 +70,10 @@ abstract class AbstractAioChannel extends AbstractChannel {
return (InetSocketAddress) super.remoteAddress(); return (InetSocketAddress) super.remoteAddress();
} }
/**
* Return the underlying {@link AsynchronousChannel}. Be aware this should only be called after it was set as
* otherwise it will throw an {@link IllegalStateException}.
*/
protected AsynchronousChannel javaChannel() { protected AsynchronousChannel javaChannel() {
if (ch == null) { if (ch == null) {
throw new IllegalStateException("Try to access Channel before eventLoop was registered"); throw new IllegalStateException("Try to access Channel before eventLoop was registered");
@ -159,6 +179,9 @@ abstract class AbstractAioChannel extends AbstractChannel {
} }
} }
/**
* Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
*/
protected abstract void doConnect(SocketAddress remoteAddress, protected abstract void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture connectFuture); SocketAddress localAddress, ChannelFuture connectFuture);

View File

@ -16,5 +16,14 @@
package io.netty.channel.socket.aio; package io.netty.channel.socket.aio;
interface AioChannelFinder { interface AioChannelFinder {
/**
* Try to find the {@link AbstractAioChannel} for the given {@link Runnable}.
*
* @param command the {@link Runnable} for which the {@link AbstractAioChannel} should be found.
* @return channel the {@link AbstractAioChannel} which belongs to the {@link Runnable} or {@code null} if
* it could not found.
* @throws Exception will get thrown if an error accours.
*/
AbstractAioChannel findChannel(Runnable command) throws Exception; AbstractAioChannel findChannel(Runnable command) throws Exception;
} }

View File

@ -22,8 +22,6 @@ import java.nio.channels.CompletionHandler;
/** /**
* Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop} * Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop}
*
*
*/ */
abstract class AioCompletionHandler<V, A extends Channel> implements CompletionHandler<V, A> { abstract class AioCompletionHandler<V, A extends Channel> implements CompletionHandler<V, A> {

View File

@ -28,6 +28,9 @@ import java.util.IdentityHashMap;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
/**
* {@link SingleThreadEventLoop} implementations which will handle AIO {@link Channel}s.
*/
final class AioEventLoop extends SingleThreadEventLoop { final class AioEventLoop extends SingleThreadEventLoop {
private final Set<Channel> channels = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>()); private final Set<Channel> channels = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>());

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.socket.aio; package io.netty.channel.socket.aio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelTaskScheduler; import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventExecutor; import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoopException; import io.netty.channel.EventLoopException;
@ -32,6 +33,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* {@link AioEventLoopGroup} implementation which will handle AIO {@link Channel} implementations.
*
*/
public class AioEventLoopGroup extends MultithreadEventLoopGroup { public class AioEventLoopGroup extends MultithreadEventLoopGroup {
private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(AioEventLoopGroup.class); private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(AioEventLoopGroup.class);
private static final AioChannelFinder CHANNEL_FINDER; private static final AioChannelFinder CHANNEL_FINDER;
@ -56,14 +61,30 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
private final AioExecutorService groupExecutor = new AioExecutorService(); private final AioExecutorService groupExecutor = new AioExecutorService();
final AsynchronousChannelGroup group; final AsynchronousChannelGroup group;
/**
* Create a new instance which use the default number of threads of {@link #DEFAULT_POOL_SIZE}.
*/
public AioEventLoopGroup() { public AioEventLoopGroup() {
this(0); this(0);
} }
/**
* Create a new instance
*
* @param nThreads the number of threads that will be used by this instance. Use 0 for the default number
* of {@link #DEFAULT_POOL_SIZE}
*/
public AioEventLoopGroup(int nThreads) { public AioEventLoopGroup(int nThreads) {
this(nThreads, null); this(nThreads, null);
} }
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance. Use 0 for the default number
* of {@link #DEFAULT_POOL_SIZE}
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
*/
public AioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { public AioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory); super(nThreads, threadFactory);
try { try {

View File

@ -19,6 +19,7 @@ import io.netty.buffer.BufType;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
@ -32,6 +33,11 @@ import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses NIO2.
*
* NIO2 is only supported on Java 7+.
*/
public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel { public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.MESSAGE, false); private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.MESSAGE, false);
@ -60,11 +66,21 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
} }
} }
/**
* Create a new instance which has not yet attached an {@link AsynchronousServerSocketChannel}. The
* {@link AsynchronousServerSocketChannel} will be attached after it was this instance was registered to an
* {@link EventLoop}.
*/
public AioServerSocketChannel() { public AioServerSocketChannel() {
super(null, null, null); super(null, null, null);
config = new AioServerSocketChannelConfig(); config = new AioServerSocketChannelConfig();
} }
/**
* Create a new instance from the given {@link AsynchronousServerSocketChannel}.
*
* @param channel the {@link AsynchronousServerSocketChannel} which is used by this instance
*/
public AioServerSocketChannel(AsynchronousServerSocketChannel channel) { public AioServerSocketChannel(AsynchronousServerSocketChannel channel) {
super(null, null, channel); super(null, null, channel);
config = new AioServerSocketChannelConfig(channel); config = new AioServerSocketChannelConfig(channel);
@ -146,7 +162,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
if (ch == null) { if (ch == null) {
AsynchronousServerSocketChannel channel = newSocket(((AioEventLoopGroup) eventLoop().parent()).group); AsynchronousServerSocketChannel channel = newSocket(((AioEventLoopGroup) eventLoop().parent()).group);
ch = channel; ch = channel;
config.active(channel); config.assign(channel);
} }
return task; return task;
} }

View File

@ -45,9 +45,18 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
private static final int DEFAULT_SND_BUF_SIZE = 32 * 1024; private static final int DEFAULT_SND_BUF_SIZE = 32 * 1024;
private static final boolean DEFAULT_SO_REUSEADDR = false; private static final boolean DEFAULT_SO_REUSEADDR = false;
/**
* Creates a new instance with no {@link AsynchronousServerSocketChannel} assigned to it.
*
* You should call {@link #assign(AsynchronousServerSocketChannel)} to assign a
* {@link AsynchronousServerSocketChannel} to it and have the configuration set on it.
*/
AioServerSocketChannelConfig() { AioServerSocketChannelConfig() {
} }
/**
* Creates a new instance with the given {@link AsynchronousServerSocketChannel} assigned to it.
*/
AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) { AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) {
this.channel.set(channel); this.channel.set(channel);
} }
@ -131,6 +140,7 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
return this; return this;
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object getOption(SocketOption option, Object defaultValue) { private Object getOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) { if (channel.get() == null) {
Object value = options.get(option); Object value = options.get(option);
@ -148,6 +158,7 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
} }
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setOption(SocketOption option, Object defaultValue) { private void setOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) { if (channel.get() == null) {
options.put(option, defaultValue); options.put(option, defaultValue);
@ -160,7 +171,10 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
} }
} }
void active(AsynchronousServerSocketChannel channel) { /**
* Assing the given {@link AsynchronousServerSocketChannel} to this instance
*/
void assign(AsynchronousServerSocketChannel channel) {
if (channel == null) { if (channel == null) {
throw new NullPointerException("channel"); throw new NullPointerException("channel");
} }
@ -169,6 +183,7 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
} }
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private void propagateOptions() { private void propagateOptions() {
for (SocketOption option: options.keySet()) { for (SocketOption option: options.keySet()) {
Object value = options.remove(option); Object value = options.remove(option);

View File

@ -41,6 +41,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link io.netty.channel.socket.SocketChannel} implementation which uses NIO2.
*
* NIO2 is only supported on Java 7+.
*/
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel { public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false); private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false);
@ -77,10 +82,27 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
}; };
/**
* Create a new instance which has not yet attached an {@link AsynchronousSocketChannel}. The
* {@link AsynchronousSocketChannel} will be attached after it was this instance was registered to an
* {@link EventLoop}.
*/
public AioSocketChannel() { public AioSocketChannel() {
this(null, null, null); this(null, null, null);
} }
/**
* Create a new instance from the given {@link AsynchronousSocketChannel}.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
* @param id
* the unique non-negative integer ID of this channel.
* Specify {@code null} to auto-generate a unique negative integer
* ID.
* @param ch
* the {@link AsynchronousSocketChannel} which is used by this instance
*/
AioSocketChannel( AioSocketChannel(
AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) { AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) {
super(parent, id, ch); super(parent, id, ch);
@ -182,7 +204,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
super.doRegister(); super.doRegister();
if (ch == null) { if (ch == null) {
ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).group); ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).group);
config.active(javaChannel()); config.assign(javaChannel());
} }
if (remoteAddress() == null) { if (remoteAddress() == null) {

View File

@ -19,6 +19,11 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.InterruptedByTimeoutException;
/**
* Special {@link SocketChannelConfig} which is used for the {@link AioSocketChannel} to expose extra configuration
* possiblilites.
*/
public interface AioSocketChannelConfig extends SocketChannelConfig { public interface AioSocketChannelConfig extends SocketChannelConfig {
@Override @Override
AioSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay); AioSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay);

View File

@ -49,9 +49,18 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
private static final boolean DEFAULT_SO_REUSEADDR = false; private static final boolean DEFAULT_SO_REUSEADDR = false;
private static final boolean DEFAULT_TCP_NODELAY = false; private static final boolean DEFAULT_TCP_NODELAY = false;
/**
* Creates a new instance with no {@link NetworkChannel} assigned to it.
*
* You should call {@link #assign(NetworkChannel)} to assign a {@link NetworkChannel} to it and
* have the configuration set on it.
*/
DefaultAioSocketChannelConfig() { DefaultAioSocketChannelConfig() {
} }
/**
* Creates a new instance with the given {@link NetworkChannel} assigned to it.
*/
DefaultAioSocketChannelConfig(NetworkChannel channel) { DefaultAioSocketChannelConfig(NetworkChannel channel) {
this.channel.set(channel); this.channel.set(channel);
} }
@ -215,6 +224,7 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
return this; return this;
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object getOption(SocketOption option, Object defaultValue) { private Object getOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) { if (channel.get() == null) {
Object value = options.get(option); Object value = options.get(option);
@ -232,6 +242,7 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
} }
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setOption(SocketOption option, Object defaultValue) { private void setOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) { if (channel.get() == null) {
options.put(option, defaultValue); options.put(option, defaultValue);
@ -283,7 +294,10 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
return this; return this;
} }
void active(NetworkChannel channel) { /**
* Assing the given {@link NetworkChannel} to this instance
*/
void assign(NetworkChannel channel) {
if (channel == null) { if (channel == null) {
throw new NullPointerException("channel"); throw new NullPointerException("channel");
} }
@ -292,6 +306,7 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
} }
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private void propagateOptions() { private void propagateOptions() {
for (SocketOption option: options.keySet()) { for (SocketOption option: options.keySet()) {
Object value = options.remove(option); Object value = options.remove(option);

View File

@ -19,6 +19,9 @@ import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
/**
* {@link AioChannelFinder} implementation which use reflection for find the right {@link AbstractAioChannel}.
*/
final class ReflectiveAioChannelFinder implements AioChannelFinder { final class ReflectiveAioChannelFinder implements AioChannelFinder {
private static volatile Map<Class<?>, Field> fieldCache = new HashMap<Class<?>, Field>(); private static volatile Map<Class<?>, Field> fieldCache = new HashMap<Class<?>, Field>();

View File

@ -21,6 +21,9 @@ import java.util.Map;
import sun.misc.Unsafe; import sun.misc.Unsafe;
/**
* {@link AioChannelFinder} implementation which will use {@link Unsafe}.
*/
@SuppressWarnings("restriction") @SuppressWarnings("restriction")
final class UnsafeAioChannelFinder implements AioChannelFinder { final class UnsafeAioChannelFinder implements AioChannelFinder {
private static final Unsafe UNSAFE = getUnsafe(); private static final Unsafe UNSAFE = getUnsafe();

View File

@ -17,5 +17,7 @@
/** /**
* <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based socket channel * <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based socket channel
* API implementation - recommended for a large number of connections (&gt;= 1000). * API implementation - recommended for a large number of connections (&gt;= 1000).
*
* NIO2 is only supported on Java 7+.
*/ */
package io.netty.channel.socket.aio; package io.netty.channel.socket.aio;

View File

@ -33,7 +33,7 @@ abstract class AbstractOioChannel extends AbstractChannel {
protected volatile boolean readSuspended; protected volatile boolean readSuspended;
/** /**
* @see AbstractChannel#AbstractChannel(io.netty.channel.Channel, Integer) * @see AbstractChannel#AbstractChannel(Channel, Integer)
*/ */
protected AbstractOioChannel(Channel parent, Integer id) { protected AbstractOioChannel(Channel parent, Integer id) {
super(parent, id); super(parent, id);
@ -122,7 +122,7 @@ abstract class AbstractOioChannel extends AbstractChannel {
} }
/** /**
* Connect to the remote peer using the given localAddress if one is specified or null otherwise. * Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
*/ */
protected abstract void doConnect( protected abstract void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;