Add javadocs and some small cleanups

This commit is contained in:
Norman Maurer 2012-12-23 20:58:49 +01:00
parent 71b089cb3b
commit a9fdb682be
17 changed files with 311 additions and 72 deletions

View File

@ -18,6 +18,10 @@ package io.netty.channel;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* This implementation allows to register {@link ChannelFuture} instances which will get notified once some amount of
* data was written and so a checkpoint was reached.
*/
public final class ChannelFlushFutureNotifier {
private long writeCounter;

View File

@ -18,6 +18,9 @@ package io.netty.channel;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
/**
* The result of an asynchronous {@link Channel} I/O operation.
* <p>
@ -90,34 +93,30 @@ import java.util.concurrent.TimeUnit;
* <pre>
* // BAD - NEVER DO THIS
* {@code @Override}
* public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
* if (e.getMessage() instanceof GoodByeMessage) {
* {@link ChannelFuture} future = e.getChannel().close();
* future.awaitUninterruptibly();
* // Perform post-closure operation
* // ...
* }
* public void messageReceived({@link ChannelHandlerContext} ctx, GoodByeMessage msg) {
* {@link ChannelFuture} future = ctx.channel().close();
* future.awaitUninterruptibly();
* // Perform post-closure operation
* // ...
* }
*
* // GOOD
* {@code @Override}
* public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
* if (e.getMessage() instanceof GoodByeMessage) {
* {@link ChannelFuture} future = e.getChannel().close();
* future.addListener(new {@link ChannelFutureListener}() {
* public void operationComplete({@link ChannelFuture} future) {
* // Perform post-closure operation
* // ...
* }
* });
* }
* public void messageReceived({@link ChannelHandlerContext} ctx, GoodByeMessage msg) {
* {@link ChannelFuture} future = ctx.channel().close();
* future.addListener(new {@link ChannelFutureListener}() {
* public void operationComplete({@link ChannelFuture} future) {
* // Perform post-closure operation
* // ...
* }
* });
* }
* </pre>
* <p>
* In spite of the disadvantages mentioned above, there are certainly the cases
* where it is more convenient to call {@link #await()}. In such a case, please
* make sure you do not call {@link #await()} in an I/O thread. Otherwise,
* {@link IllegalStateException} will be raised to prevent a dead lock.
* {@link BlockingOperationException} will be raised to prevent a dead lock.
*
* <h3>Do not confuse I/O timeout and await timeout</h3>
*
@ -145,7 +144,7 @@ import java.util.concurrent.TimeUnit;
* // GOOD
* {@link Bootstrap} b = ...;
* // Configure the connect timeout option.
* <b>b.setOption("connectTimeoutMillis", 10000);</b>
* <b>b.setOption({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly();
*

View File

@ -25,7 +25,7 @@ import java.util.Set;
* listening to the individual futures and producing an aggregated result
* (success/failure) when all futures have completed.
*/
public class ChannelFutureAggregator implements ChannelFutureListener {
public final class ChannelFutureAggregator implements ChannelFutureListener {
private final ChannelFuture aggregateFuture;

View File

@ -17,47 +17,103 @@ package io.netty.channel;
import java.net.SocketAddress;
public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler {
/**
* {@link ChannelHandler} implementation which represents a combination out of a {@link ChannelStateHandler} and
* the {@link ChannelOperationHandler}.
*
* It is a good starting point if your {@link ChannelHandler} implementation needs to intercept operations and also
* state updates.
*/
public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler {
/**
* Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelFuture future) throws Exception {
ctx.bind(localAddress, future);
}
/**
* Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.connect(remoteAddress, localAddress, future);
}
/**
* Calls {@link ChannelHandlerContext#disconnect(ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.disconnect(future);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void close(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.close(future);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void deregister(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.deregister(future);
}
/**
* Calls {@link ChannelHandlerContext#flush(ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*
* Be aware that if your class also implement {@link ChannelOutboundHandler} it need to {@code @Override} this
* method and provide some proper implementation. Fail to do so, will result in an {@link IllegalStateException}!
*/
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
if (this instanceof ChannelOutboundHandler) {
throw new IllegalStateException(
"flush(...) must be overridden by " + getClass().getName() +
", which implements " + ChannelOutboundHandler.class.getSimpleName());
}
ctx.flush(future);
}
/**
* Calls {@link ChannelHandlerContext#sendFile(FileRegion, ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelFuture future) throws Exception {
ctx.sendFile(region, future);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.bind(localAddress, future);
}
@Override
public void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelFuture future) throws Exception {
ctx.connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ctx.disconnect(future);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ctx.close(future);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ctx.deregister(future);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
if (this instanceof ChannelOutboundHandler) {
throw new IllegalStateException(
"flush(...) must be overridden by " + getClass().getName() +
", which implements " + ChannelOutboundHandler.class.getSimpleName());
}
ctx.flush(future);
}
}

View File

@ -16,9 +16,27 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
/**
* Utilities for {@link ChannelHandler} implementations.
*/
public final class ChannelHandlerUtil {
/**
* Unfold the given msg and pass it to the next buffer depending on the msg type.
*
* @param ctx
* the {@link ChannelHandlerContext} on which to operate
* @param msg
* the msg to unfold and pass to the next buffer
* @param inbound
* {@code true} if it is an inbound message, {@code false} otherwise
* @return added
* {@code true} if the message was added to the next {@link ByteBuf} or {@link MessageBuf}
* @throws Exception
* thrown if an error accour
*/
public static boolean unfoldAndAdd(
ChannelHandlerContext ctx, Object msg, boolean inbound) throws Exception {
if (msg == null) {
@ -80,6 +98,9 @@ public final class ChannelHandlerUtil {
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
/**
* Creates a safe copy of the given array and return it.
*/
public static Class<?>[] acceptedMessageTypes(Class<?>[] acceptedMsgTypes) {
if (acceptedMsgTypes == null) {
return EMPTY_TYPES;
@ -99,8 +120,12 @@ public final class ChannelHandlerUtil {
return newAllowedMsgTypes;
}
/**
* Return {@code true} if the given msg is compatible with one of the given acceptedMessageTypes or if
* acceptedMessageTypes is null / empty.
*/
public static boolean acceptMessage(Class<?>[] acceptedMsgTypes, Object msg) {
if (acceptedMsgTypes.length == 0) {
if (acceptedMsgTypes == null || acceptedMsgTypes.length == 0) {
return true;
}
@ -113,6 +138,9 @@ public final class ChannelHandlerUtil {
return false;
}
/**
* Add the given msg to the next outbound {@link MessageBuf}.
*/
public static void addToNextOutboundBuffer(ChannelHandlerContext ctx, Object msg) {
try {
ctx.nextOutboundMessageBuffer().add(msg);
@ -124,6 +152,9 @@ public final class ChannelHandlerUtil {
}
}
/**
* Add the given msg to the next inbound {@link MessageBuf}.
*/
public static void addToNextInboundBuffer(ChannelHandlerContext ctx, Object msg) {
try {
ctx.nextInboundMessageBuffer().add(msg);

View File

@ -27,6 +27,11 @@ import io.netty.buffer.Buf;
public abstract class ChannelInboundHandlerAdapter
extends ChannelStateHandlerAdapter implements ChannelInboundHandler {
/**
* Calls {@link Buf#free()} to free the buffer, sub-classes may override this.
*
* When doing so be aware that you will need to handle all the resource management by your own.
*/
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();

View File

@ -17,7 +17,7 @@ package io.netty.channel;
import java.net.SocketAddress;
public class ChannelOperationHandlerAdapter implements ChannelOperationHandler {
public abstract class ChannelOperationHandlerAdapter implements ChannelOperationHandler {
/**
* Do nothing by default, sub-classes may override this method.

View File

@ -18,8 +18,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
/**
* Abstract base class which handles outgoing bytes
*
* Abstract base class which handles outgoing bytes.
*/
public abstract class ChannelOutboundByteHandlerAdapter
extends ChannelOutboundHandlerAdapter implements ChannelOutboundByteHandler {

View File

@ -26,6 +26,12 @@ import io.netty.buffer.Buf;
*/
public abstract class ChannelOutboundHandlerAdapter
extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler {
/**
* Calls {@link Buf#free()} to free the buffer, sub-classes may override this.
*
* When doing so be aware that you will need to handle all the resource management by your own.
*/
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();

View File

@ -35,59 +35,115 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler {
final boolean isSharable() {
return getClass().isAnnotationPresent(Sharable.class);
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
/**
* Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
ctx.fireUserEventTriggered(evt);
}
/**
* Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
* to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
* to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
* to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
* to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
/**
* Calls {@link ChannelHandlerContext#fireInboundBufferUpdated()} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*
* Be aware that if your class also implement {@link ChannelInboundHandler} it need to {@code @Override} this
* method and provide some proper implementation. Fail to do so, will result in an {@link IllegalStateException}!
*/
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
if (this instanceof ChannelInboundHandler) {

View File

@ -33,11 +33,19 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement
// User will call init in the subclass constructor.
}
/**
* Combine the given {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
*/
public CombinedChannelHandler(
ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
init(inboundHandler, outboundHandler);
}
/**
* Needs to get called before the handler can be added to the {@link ChannelPipeline}.
* Otherwise it will trigger a {@link IllegalStateException} later.
*
*/
protected void init(
ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
if (inboundHandler == null) {

View File

@ -22,7 +22,7 @@ import java.util.concurrent.ThreadFactory;
* serial fashion
*
*/
class DefaultEventExecutor extends SingleThreadEventExecutor {
final class DefaultEventExecutor extends SingleThreadEventExecutor {
/**
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, ChannelTaskScheduler)

View File

@ -83,6 +83,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
/**
* Return a safe-copy of all of the children of this group.
*/
protected Set<EventExecutor> children() {
Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
Collections.addAll(children, this.children);

View File

@ -150,6 +150,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
taskQueue = newTaskQueue();
}
/**
* Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
* {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
* calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
* implementation that does not support blocking operations at all.
*/
protected Queue<Runnable> newTaskQueue() {
return new LinkedBlockingQueue<Runnable>();
}
@ -164,15 +170,27 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return this;
}
/**
* Interrupt the current running {@link Thread}.
*/
protected void interruptThread() {
thread.interrupt();
}
/**
* @see {@link Queue#poll()}
*/
protected Runnable pollTask() {
assert inEventLoop();
return taskQueue.poll();
}
/**
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
*
* Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
* created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
*/
protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
if (taskQueue instanceof BlockingQueue) {
@ -182,16 +200,26 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
}
/**
* @see {@link Queue#peek()}
*/
protected Runnable peekTask() {
assert inEventLoop();
return taskQueue.peek();
}
/**
* @see {@link Queue#isEmpty()}
*/
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
@ -202,6 +230,9 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
taskQueue.add(task);
}
/**
* @see {@link Queue#remove(Object)}
*/
protected boolean removeTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
@ -209,6 +240,10 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return taskQueue.remove(task);
}
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
* @return
*/
protected boolean runAllTasks() {
boolean ran = false;
for (;;) {
@ -231,10 +266,16 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return ran;
}
/**
*
*/
protected abstract void run();
/**
* Do nothing, sub-classes may override
*/
protected void cleanup() {
// Do nothing. Subclasses will override.
// NOOP
}
protected void wakeup(boolean inEventLoop) {
@ -253,6 +294,9 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return thread == this.thread;
}
/**
* Add a {@link Runnable} which will be executed on shutdown of this instance
*/
public void addShutdownHook(final Runnable task) {
if (inEventLoop()) {
shutdownHooks.add(task);
@ -266,6 +310,9 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
}
/**
* Remove a previous added {@link Runnable} as a shutdown hook
*/
public void removeShutdownHook(final Runnable task) {
if (inEventLoop()) {
shutdownHooks.remove(task);
@ -348,6 +395,9 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return state == ST_TERMINATED;
}
/**
* Confirm that the shutdown if the instance should be done now!
*/
protected boolean confirmShutdown() {
if (!isShutdown()) {
throw new IllegalStateException("must be invoked after shutdown()");

View File

@ -15,9 +15,7 @@
*/
package io.netty.channel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* The {@link CompleteChannelFuture} which is succeeded already. It is
@ -46,7 +44,7 @@ final class SucceededChannelFuture extends CompleteChannelFuture {
}
@Override
public ChannelFuture sync() throws InterruptedException {
public ChannelFuture sync() {
return this;
}
@ -56,13 +54,12 @@ final class SucceededChannelFuture extends CompleteChannelFuture {
}
@Override
public Void get() throws InterruptedException, ExecutionException {
public Void get() {
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
public Void get(long timeout, TimeUnit unit) {
return null;
}

View File

@ -21,16 +21,33 @@ import io.netty.channel.ChannelTaskScheduler;
import java.util.concurrent.ThreadFactory;
/**
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
*/
public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance which used {@link #DEFAULT_POOL_SIZE} number of Threads
*/
public LocalEventLoopGroup() {
this(0);
}
/**
* Create a new instance
*
* @param nThreads the number of Threads to use or {@code 0} for the default of {@link #DEFAULT_POOL_SIZE}
*/
public LocalEventLoopGroup(int nThreads) {
this(nThreads, null);
}
/**
* Create a new instance
*
* @param nThreads the number of Threads to use or {@code 0} for the default of {@link #DEFAULT_POOL_SIZE}
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
*/
public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}

View File

@ -26,7 +26,7 @@ import io.netty.channel.SingleThreadEventLoop;
import java.net.SocketAddress;
/**
* A {@link ServerChannel} for the local transport.
* A {@link ServerChannel} for the local transport which allows in VM communication.
*/
public class LocalServerChannel extends AbstractServerChannel {
@ -41,10 +41,18 @@ public class LocalServerChannel extends AbstractServerChannel {
private volatile int state; // 0 - open, 1 - active, 2 - closed
private volatile LocalAddress localAddress;
/**
* Creates a new instance
*/
public LocalServerChannel() {
this(null);
}
/**
* Create a new instance
*
* @param id the id to use or {@code null} if a new id should be generated
*/
public LocalServerChannel(Integer id) {
super(id);
}