Introduce ChannelHandlerInvoker, dedeciated for invoking event handler methods, and move most handler invocation code in ChannelHandlerContext to the default ChannelHandlerInvoker implementation

- Fixes #1912
- Add ChannelHandlerInvoker and its default implementation
- Add pipeline manipulation methods that accept ChannelHandlerInvoker
- Rename Channel(Inbound|Outbound)Invoker to
  Channel(Inbound|Outbound)Ops to avoid confusion
- Remove the Javadoc references to the package-private interfaces
This commit is contained in:
Trustin Lee 2013-11-06 21:14:07 +09:00
parent 883ab29d05
commit 132af3a485
72 changed files with 1598 additions and 928 deletions

View File

@ -51,7 +51,7 @@ public class SpdyFrameDecoderTest {
testTooLargeHeaderNameOnSynStreamRequest(SpdyVersion.SPDY_3_1);
}
private void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception {
private static void testTooLargeHeaderNameOnSynStreamRequest(final SpdyVersion version) throws Exception {
List<Integer> headerSizes = Arrays.asList(90, 900);
for (final int maxHeaderSize : headerSizes) { // 90 catches the header name, 900 the value
SpdyHeadersFrame frame = new DefaultSpdySynStreamFrame(1, 0, (byte) 0);

View File

@ -16,9 +16,8 @@
package io.netty.util.concurrent;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
@ -29,24 +28,43 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
private final EventExecutorGroup parent;
protected AbstractEventExecutor() {
this(null);
}
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
@Override
public EventExecutorGroup parent() {
return parent;
}
@Override
public EventExecutor next() {
return this;
}
@Override
@SuppressWarnings("unchecked")
public <E extends EventExecutor> Set<E> children() {
return Collections.singleton((E) this);
}
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public Iterator<EventExecutor> iterator() {
return new EventExecutorIterator();
}
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
/**
@ -131,27 +149,4 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
private final class EventExecutorIterator implements Iterator<EventExecutor> {
private boolean nextCalled;
@Override
public boolean hasNext() {
return !nextCalled;
}
@Override
public EventExecutor next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
nextCalled = true;
return AbstractEventExecutor.this;
}
@Override
public void remove() {
throw new UnsupportedOperationException("read-only");
}
}
}

View File

@ -23,12 +23,13 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static io.netty.util.concurrent.AbstractEventExecutor.*;
/**
* Abstract base class for {@link EventExecutorGroup} implementations.
*/
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
@ -66,7 +67,7 @@ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
/**

View File

@ -16,15 +16,36 @@
package io.netty.util.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a
* serial fashion
*
*/
final class DefaultEventExecutor extends SingleThreadEventExecutor {
public final class DefaultEventExecutor extends SingleThreadEventExecutor {
DefaultEventExecutor(DefaultEventExecutorGroup parent, Executor executor) {
public DefaultEventExecutor() {
this((EventExecutorGroup) null);
}
public DefaultEventExecutor(ThreadFactory threadFactory) {
this(null, threadFactory);
}
public DefaultEventExecutor(Executor executor) {
this(null, executor);
}
public DefaultEventExecutor(EventExecutorGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventExecutor.class));
}
public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) {
super(parent, executor, true);
}

View File

@ -15,6 +15,10 @@
*/
package io.netty.util.concurrent;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
* with some handy methods to see if a {@link Thread} is executed in a event loop.
@ -30,6 +34,12 @@ public interface EventExecutor extends EventExecutorGroup {
@Override
EventExecutor next();
/**
* Returns an unmodifiable singleton set which contains itself.
*/
@Override
<E extends EventExecutor> Set<E> children();
/**
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
*/
@ -69,4 +79,25 @@ public interface EventExecutor extends EventExecutorGroup {
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newFailedFuture(Throwable cause);
@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

View File

@ -15,8 +15,8 @@
*/
package io.netty.util.concurrent;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit;
* to shut them down in a global fashion.
*
*/
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
public interface EventExecutorGroup extends ScheduledExecutorService {
/**
* Returns {@code true} if and only if this executor was started to be
* {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}.
* Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup}
* are being {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}.
*/
boolean isShuttingDown();
@ -59,7 +59,8 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
/**
* Returns the {@link Future} which is notified when this executor has been terminated.
* Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
* {@link EventExecutorGroup} have been terminated.
*/
Future<?> terminationFuture();
@ -78,16 +79,14 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
List<Runnable> shutdownNow();
/**
* Returns one of the {@link EventExecutor}s that belong to this group.
* Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
EventExecutor next();
/**
* Returns a read-only {@link Iterator} over all {@link EventExecutor}, which are handled by this
* {@link EventExecutorGroup} at the time of invoke this method.
* Returns the unmodifiable set of {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
@Override
Iterator<EventExecutor> iterator();
<E extends EventExecutor> Set<E> children();
@Override
Future<?> submit(Runnable task);

View File

@ -64,11 +64,6 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
delayedTaskQueue.add(purgeTask);
}
@Override
public EventExecutorGroup parent() {
return null;
}
/**
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
*

View File

@ -21,18 +21,14 @@ import java.util.concurrent.TimeUnit;
* {@link AbstractEventExecutor} which execute tasks in the callers thread.
*/
public final class ImmediateEventExecutor extends AbstractEventExecutor {
public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();
private final Future<?> terminationFuture = new FailedFuture<Object>(
GlobalEventExecutor.INSTANCE, new UnsupportedOperationException());
private ImmediateEventExecutor() {
// use static instance
}
@Override
public EventExecutorGroup parent() {
return null;
// Singleton
}
@Override

View File

@ -16,8 +16,7 @@
package io.netty.util.concurrent;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
@ -31,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger childIndex = new AtomicInteger();
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
@ -62,7 +62,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new SingleThreadEventExecutor[nThreads];
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
@ -104,6 +104,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected ThreadFactory newDefaultThreadFactory() {
@ -115,11 +119,6 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
@Override
public Iterator<EventExecutor> iterator() {
return children().iterator();
}
/**
* Return the number of {@link EventExecutor} this implementation uses. This number is the maps
* 1:1 to the threads it use.
@ -128,13 +127,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
return children.length;
}
/**
* Return a safe-copy of all of the children of this group.
*/
protected Set<EventExecutor> children() {
Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
Collections.addAll(children, this.children);
return children;
@Override
@SuppressWarnings("unchecked")
public final <E extends EventExecutor> Set<E> children() {
return (Set<E>) readonlyChildren;
}
/**

View File

@ -57,7 +57,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
};
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
@ -98,14 +97,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent);
if (executor == null) {
throw new NullPointerException("executor");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
this.executor = executor;
@ -122,11 +120,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return new LinkedBlockingQueue<Runnable>();
}
@Override
public EventExecutorGroup parent() {
return parent;
}
/**
* Interrupt the current running {@link Thread}.
*/

View File

@ -20,10 +20,10 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.logging.LogLevel;
@ -44,7 +44,7 @@ public class LocalEcho {
// Address to bind on / connect to.
final LocalAddress addr = new LocalAddress(port);
EventLoopGroup serverGroup = new LocalEventLoopGroup();
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
EventLoopGroup clientGroup = new NioEventLoopGroup(); // NIO event loops are also OK
try {
// Note that we can use any event loop to ensure certain local channels

View File

@ -68,24 +68,24 @@ public final class SocksServerConnectHandler extends SimpleChannelInboundHandler
final Channel inboundChannel = ctx.channel();
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientInitializer(promise));
b.connect(request.host(), request.port())
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().writeAndFlush(
new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientInitializer(promise));
b.connect(request.host(), request.port()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().writeAndFlush(
new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
});
}
});
}
@Override

View File

@ -18,8 +18,8 @@ package io.netty.example.uptime;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
@ -62,10 +62,10 @@ public class UptimeClient {
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0), handler);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0), handler);
}
});
return b;

View File

@ -23,10 +23,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -42,7 +42,7 @@ public class SocketEchoTest extends AbstractSocketTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
private static EventExecutorGroup group;
private static EventLoopGroup group;
static {
random.nextBytes(data);
@ -50,7 +50,7 @@ public class SocketEchoTest extends AbstractSocketTest {
@BeforeClass
public static void createGroup() {
group = new DefaultEventExecutorGroup(2);
group = new DefaultEventLoopGroup(2);
}
@AfterClass

View File

@ -21,6 +21,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
@ -30,8 +32,6 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.testsuite.util.BogusSslContextFactory;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -46,11 +46,11 @@ import static org.junit.Assert.*;
public class SocketStartTlsTest extends AbstractSocketTest {
private static final LogLevel LOG_LEVEL = LogLevel.TRACE;
private static EventExecutorGroup executor;
private static EventLoopGroup executor;
@BeforeClass
public static void createExecutor() {
executor = new DefaultEventExecutorGroup(2);
executor = new DefaultEventLoopGroup(2);
}
@AfterClass
@ -64,7 +64,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
}
public void testStartTls(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final EventExecutorGroup executor = SocketStartTlsTest.executor;
final EventLoopGroup executor = SocketStartTlsTest.executor;
final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine();
final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine();

View File

@ -19,7 +19,6 @@ import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@ -29,7 +28,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.sctp.DefaultSctpChannelConfig;

View File

@ -17,7 +17,6 @@ package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.ServerSocketChannelUDT;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.EventLoop;

View File

@ -15,9 +15,9 @@
*/
package io.netty.channel.udt.nio;
import io.netty.channel.EventLoop;
import com.barchart.udt.TypeUDT;
import io.netty.channel.EventLoop;
/**
* Byte Channel Rendezvous for UDT Streams.

View File

@ -16,7 +16,6 @@
package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT;
import io.netty.channel.EventLoop;
import io.netty.channel.udt.UdtMessage;
@ -30,5 +29,4 @@ public class NioUdtMessageRendezvousChannel extends NioUdtMessageConnectorChanne
public NioUdtMessageRendezvousChannel(EventLoop eventLoop) {
super(eventLoop, NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM));
}
}

View File

@ -241,8 +241,8 @@ public abstract class NioUdtProvider {
super(type, kind);
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("unchecked")
public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
switch (kind()) {
case ACCEPTOR:

View File

@ -31,6 +31,6 @@ public class NioUdtByteAcceptorChannelTest extends AbstractUdtTest {
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertEquals(false, new NioUdtByteAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect());
assertFalse(new NioUdtByteAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect());
}
}

View File

@ -21,7 +21,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.nio.NioUdtByteConnectorChannel;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
public class NioUdtByteConnectorChannelTest extends AbstractUdtTest {
@ -31,6 +31,6 @@ public class NioUdtByteConnectorChannelTest extends AbstractUdtTest {
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertEquals(false, new NioUdtByteConnectorChannel(loop).metadata().hasDisconnect());
assertFalse(new NioUdtByteConnectorChannel(loop).metadata().hasDisconnect());
}
}

View File

@ -31,6 +31,6 @@ public class NioUdtMessageAcceptorChannelTest extends AbstractUdtTest {
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertEquals(false, new NioUdtMessageAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect());
assertFalse(new NioUdtMessageAcceptorChannel(loop, new NioEventLoopGroup()).metadata().hasDisconnect());
}
}

View File

@ -31,6 +31,6 @@ public class NioUdtMessageConnectorChannelTest extends AbstractUdtTest {
@Test
public void metadata() throws Exception {
EventLoop loop = new NioEventLoopGroup().next();
assertEquals(false, new NioUdtMessageConnectorChannel(loop).metadata().hasDisconnect());
assertFalse(new NioUdtMessageConnectorChannel(loop).metadata().hasDisconnect());
}
}

View File

@ -21,7 +21,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.udt.UdtServerChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import org.junit.Test;
import static org.junit.Assert.*;

View File

@ -22,9 +22,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.StringUtil;
import java.net.InetAddress;
@ -169,14 +169,14 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
/**
* Returns a deep clone of this bootstrap which has the identical configuration. This method is useful when making
* multiple {@link Channel}s with similar settings. Please note that this method does not clone the
* {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
* {@link EventExecutorGroup} deeply but shallowly, making the group a shared resource.
*/
@Override
@SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
public abstract B clone();
/**
* Create a new {@link Channel} and register it with an {@link EventLoop}.
* Create a new {@link Channel} and register it with an {@link EventExecutorGroup}.
*/
public ChannelFuture register() {
validate();

View File

@ -29,6 +29,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -118,8 +119,8 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
}
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
* Set the {@link EventExecutorGroup} for the parent (acceptor) and the child (client). These
* {@link EventExecutorGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {

View File

@ -371,6 +371,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this);
private boolean inFlush0;
@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop.asInvoker();
}
@Override
public final ChannelOutboundBuffer outboundBuffer() {
return outboundBuffer;
@ -715,7 +720,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract SocketAddress remoteAddress0();
/**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register
* process.
*
* Sub-classes may override this method
*/

View File

@ -0,0 +1,39 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutor;
/**
* Skeletal implementation of {@link EventLoop}.
*/
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {
protected AbstractEventLoop(EventLoopGroup parent) {
super(parent);
}
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
/**
* Skeletal implementation of {@link EventLoopGroup}.
*/
public abstract class AbstractEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
@Override
public abstract EventLoop next();
}

View File

@ -34,7 +34,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private EventLoopGroup childGroup;
private final EventLoopGroup childGroup;
/**
* Creates a new instance.

View File

@ -66,7 +66,7 @@ import java.net.SocketAddress;
* operations. For example, with the old I/O datagram transport, multicast
* join / leave operations are provided by {@link DatagramChannel}.
*/
public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPropertyAccess, Comparable<Channel> {
public interface Channel extends AttributeMap, ChannelOutboundOps, ChannelPropertyAccess, Comparable<Channel> {
/**
* Returns the globally unique identifier of this {@link Channel}.
@ -168,14 +168,21 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods:
* <ul>
* <li>{@link #invoker()}</li>
* <li>{@link #localAddress()}</li>
* <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li>
* <li>{@link #register(EventLoop, ChannelPromise)}</li>
* <li>{@link #register(ChannelPromise)}</li>
* <li>{@link #voidPromise()}</li>
* </ul>
*/
interface Unsafe {
/**
* Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
*/
ChannelHandlerInvoker invoker();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
@ -189,7 +196,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelPromise} with the {@link EventLoop} and notify
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(ChannelPromise promise);

View File

@ -26,25 +26,28 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Handles or intercepts a {@link ChannelInboundInvoker} or {@link ChannelOutboundInvoker} operation, and forwards it
* to the next handler in a {@link ChannelPipeline}.
* Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
* its {@link ChannelPipeline}.
*
* <h3>Sub-types</h3>
* <p>
* {@link ChannelHandler} itself does not provide many methods. To handle a
* a {@link ChannelInboundInvoker} or {@link ChannelOutboundInvoker} operation
* you need to implement its sub-interfaces. There are many different sub-interfaces
* which handles inbound and outbound operations.
*
* But the most useful for developers may be:
* {@link ChannelHandler} itself does not provide many methods, but you usually have to implement one of its subtypes:
* <ul>
* <li>{@link ChannelInboundHandlerAdapter} handles and intercepts inbound operations</li>
* <li>{@link ChannelOutboundHandlerAdapter} handles and intercepts outbound operations</li>
* <li>{@link ChannelInboundHandler} to handle inbound I/O events, and</li>
* <li>{@link ChannelOutboundHandler} to handle outbound I/O operations.</li>
* </ul>
*
* You will also find more detailed explanation from the documentation of
* each sub-interface on how an event is interpreted when it goes upstream and
* downstream respectively.
* </p>
* <p>
* Alternatively, the following adapter classes are provided for your convenience:
* <ul>
* <li>{@link ChannelInboundHandlerAdapter} to handle inbound I/O events,</li>
* <li>{@link ChannelOutboundHandlerAdapter} to handle outbound I/O operations, and</li>
* <li>{@link ChannelHandlerAdapter} to handle both inbound and outbound events</li>
* </ul>
* </p>
* <p>
* For more information, please refer to the documentation of each subtype.
* </p>
*
* <h3>The context object</h3>
* <p>

View File

@ -30,9 +30,8 @@ import java.nio.channels.Channels;
*
* <h3>Notify</h3>
*
* You can notify the closest handler in the
* same {@link ChannelPipeline} by calling one of the various methods which are listed in {@link ChannelInboundInvoker}
* and {@link ChannelOutboundInvoker}. Please refer to {@link ChannelPipeline} to understand how an event flows.
* You can notify the closest handler in the same {@link ChannelPipeline} by calling one of the various method.
* Please refer to {@link ChannelPipeline} to understand how an event flows.
*
* <h3>Modifying a pipeline</h3>
*
@ -123,8 +122,7 @@ import java.nio.channels.Channels;
* the operation in your application.
*/
public interface ChannelHandlerContext
extends AttributeMap, ChannelPropertyAccess,
ChannelInboundInvoker, ChannelOutboundInvoker {
extends AttributeMap, ChannelPropertyAccess, ChannelInboundOps, ChannelOutboundOps {
/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
@ -132,9 +130,7 @@ public interface ChannelHandlerContext
Channel channel();
/**
* The {@link EventExecutor} that is used to dispatch the events. This can also be used to directly
* submit tasks that get executed in the event loop. For more information please refer to the
* {@link EventExecutor} javadoc.
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
*/
EventExecutor executor();
@ -153,7 +149,7 @@ public interface ChannelHandlerContext
/**
* Return {@code true} if the {@link ChannelHandler} which belongs to this {@link ChannelHandler} was removed
* from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
* {@link EventLoop}.
* {@link EventExecutor}.
*/
boolean isRemoved();

View File

@ -0,0 +1,149 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import java.net.SocketAddress;
/**
* Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
* A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default
* implementation.
*/
public interface ChannelHandlerInvoker {
/**
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
*/
EventExecutor executor();
/**
* Invokes {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelRegistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelActive(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelInactive(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause);
/**
* Invokes {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is not for
* a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event);
/**
* Invokes {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelRead(ChannelHandlerContext ctx, Object msg);
/**
* Invokes {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelReadComplete(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}. This method is not for
* a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelWritabilityChanged(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise);
/**
* Invokes
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeConnect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#read(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeRead(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)} and
* {@link ChannelOutboundHandler#flush(ChannelHandlerContext)} sequentially.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeFlush(ChannelHandlerContext ctx);
}

View File

@ -0,0 +1,207 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.net.SocketAddress;
import static io.netty.channel.DefaultChannelPipeline.*;
/**
* A set of helper methods for easier implementation of custom {@link ChannelHandlerInvoker} implementation.
*/
public final class ChannelHandlerInvokerUtil {
public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelInactiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelInactive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeExceptionCaughtNow(final ChannelHandlerContext ctx, final Throwable cause) {
try {
ctx.handler().exceptionCaught(ctx, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
}
public static void invokeUserEventTriggeredNow(final ChannelHandlerContext ctx, final Object event) {
try {
((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
try {
((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelReadCompleteNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelReadComplete(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelWritabilityChangedNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelWritabilityChanged(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeBindNow(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeConnectNow(
final ChannelHandlerContext ctx,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeDisconnectNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeCloseNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).close(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeReadNow(final ChannelHandlerContext ctx) {
try {
((ChannelOutboundHandler) ctx.handler()).read(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeFlushNow(final ChannelHandlerContext ctx) {
try {
((ChannelOutboundHandler) ctx.handler()).flush(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeWriteAndFlushNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
invokeFlushNow(ctx);
}
private static void notifyHandlerException(ChannelHandlerContext ctx, Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaughtNow(ctx, cause);
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// only try to fail the promise if its not a VoidChannelPromise, as
// the VoidChannelPromise would also fire the cause through the pipeline
if (promise instanceof VoidChannelPromise) {
return;
}
if (!promise.tryFailure(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
}
}
}
private static boolean inExceptionCaught(Throwable cause) {
do {
StackTraceElement[] trace = cause.getStackTrace();
if (trace != null) {
for (StackTraceElement t : trace) {
if (t == null) {
break;
}
if ("exceptionCaught".equals(t.getMethodName())) {
return true;
}
}
}
cause = cause.getCause();
} while (cause != null);
return false;
}
private ChannelHandlerInvokerUtil() { }
}

View File

@ -19,7 +19,7 @@ package io.netty.channel;
/**
* Interface which is shared by others which need to fire inbound events
*/
interface ChannelInboundInvoker {
interface ChannelInboundOps {
/**
* A {@link Channel} was registered to its {@link EventLoop}.
@ -28,7 +28,7 @@ interface ChannelInboundInvoker {
* called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelRegistered();
ChannelInboundOps fireChannelRegistered();
/**
* A {@link Channel} is active now, which means it is connected.
@ -37,7 +37,7 @@ interface ChannelInboundInvoker {
* called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelActive();
ChannelInboundOps fireChannelActive();
/**
* A {@link Channel} is inactive now, which means it is closed.
@ -46,7 +46,7 @@ interface ChannelInboundInvoker {
* called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelInactive();
ChannelInboundOps fireChannelInactive();
/**
* A {@link Channel} received an {@link Throwable} in one of its inbound operations.
@ -55,7 +55,7 @@ interface ChannelInboundInvoker {
* method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundOps fireExceptionCaught(Throwable cause);
/**
* A {@link Channel} received an user defined event.
@ -64,7 +64,7 @@ interface ChannelInboundInvoker {
* method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundOps fireUserEventTriggered(Object event);
/**
* A {@link Channel} received a message.
@ -73,13 +73,13 @@ interface ChannelInboundInvoker {
* method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundOps fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundOps fireChannelReadComplete();
/**
* Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}
* event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*/
ChannelInboundInvoker fireChannelWritabilityChanged();
ChannelInboundOps fireChannelWritabilityChanged();
}

View File

@ -21,7 +21,7 @@ import java.net.SocketAddress;
/**
* Interface which is shared by others which need to execute outbound logic.
*/
interface ChannelOutboundInvoker {
interface ChannelOutboundOps {
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
@ -171,7 +171,7 @@ interface ChannelOutboundInvoker {
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelOutboundInvoker read();
ChannelOutboundOps read();
/**
* Request to write a message via this ChannelOutboundInvoker through the {@link ChannelPipeline}.
@ -190,7 +190,7 @@ interface ChannelOutboundInvoker {
/**
* Request to flush all pending messages via this ChannelOutboundInvoker.
*/
ChannelOutboundInvoker flush();
ChannelOutboundOps flush();
/**
* Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.

View File

@ -212,7 +212,7 @@ import java.util.NoSuchElementException;
* after the exchange.
*/
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
extends ChannelInboundOps, ChannelOutboundOps, Iterable<Entry<String, ChannelHandler>> {
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
@ -242,6 +242,20 @@ public interface ChannelPipeline
*/
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param name the name of the handler to insert first
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
@ -270,6 +284,20 @@ public interface ChannelPipeline
*/
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param name the name of the handler to append
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
@ -306,6 +334,24 @@ public interface ChannelPipeline
*/
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before
* @param handler the handler to insert before
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addBefore(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
@ -342,6 +388,24 @@ public interface ChannelPipeline
*/
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addAfter(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler}s at the first position of this pipeline.
*
@ -360,6 +424,15 @@ public interface ChannelPipeline
*/
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Inserts a {@link ChannelHandler}s at the first position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param handlers the handlers to insert first
*
*/
ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers);
/**
* Inserts a {@link ChannelHandler}s at the last position of this pipeline.
*
@ -378,6 +451,15 @@ public interface ChannelPipeline
*/
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Inserts a {@link ChannelHandler}s at the last position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param handlers the handlers to insert last
*
*/
ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.
*

View File

@ -17,7 +17,6 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
@ -60,11 +59,8 @@ interface ChannelPropertyAccess {
ChannelFuture newFailedFuture(Throwable cause);
/**
* Return a special ChannelPromise which can be reused for different operations.
* <p>
* It's only supported to use
* it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}.
* </p>
* Return a special ChannelPromise which can be reused for {@code write(..)} operations. Using it for other
* outbound operations will fail with undetermined consequences.
* <p>
* Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used
* if you want to save an object allocation for every write operation. You will not be able to detect if the

View File

@ -15,13 +15,9 @@
*/
package io.netty.channel;
import static io.netty.channel.DefaultChannelPipeline.logger;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.Recycler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
@ -38,17 +34,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
final ChannelHandlerInvoker invoker;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeFlushTask;
private Runnable invokeChannelWritableStateChangedTask;
Runnable invokeChannelReadCompleteTask;
Runnable invokeReadTask;
Runnable invokeFlushTask;
Runnable invokeChannelWritableStateChangedTask;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name,
ChannelHandler handler) {
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
if (name == null) {
throw new NullPointerException("name");
@ -62,17 +58,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
this.name = name;
this.handler = handler;
if (group != null) {
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = pipeline.childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
pipeline.childExecutors.put(group, childExecutor);
}
executor = childExecutor;
if (invoker == null) {
this.invoker = channel.unsafe().invoker();
} else {
executor = null;
this.invoker = invoker;
}
}
@ -118,11 +107,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
return invoker.executor();
}
@Override
@ -137,237 +122,60 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelHandlerContext fireChannelRegistered() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeChannelRegistered(next);
return this;
}
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelActive() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeChannelActive(next);
return this;
}
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelInactive() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelInactive();
}
});
}
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeChannelInactive(next);
return this;
}
private void invokeChannelInactive() {
try {
((ChannelInboundHandler) handler).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
final DefaultChannelHandlerContext next = this.next;
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext next = this.next;
next.invoker.invokeExceptionCaught(next, cause);
return this;
}
private void invokeExceptionCaught(final Throwable cause) {
try {
handler.exceptionCaught(this, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
}
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
if (event == null) {
throw new NullPointerException("event");
}
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeUserEventTriggered(event);
}
});
}
public ChannelHandlerContext fireUserEventTriggered(Object event) {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeUserEventTriggered(next, event);
return this;
}
private void invokeUserEventTriggered(Object event) {
try {
((ChannelInboundHandler) handler).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
public ChannelHandlerContext fireChannelRead(Object msg) {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeChannelRead(next, msg);
return this;
}
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
} else {
Runnable task = next.invokeChannelReadCompleteTask;
if (task == null) {
next.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
}
executor.execute(task);
}
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeChannelReadComplete(next);
return this;
}
private void invokeChannelReadComplete() {
try {
((ChannelInboundHandler) handler).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelWritabilityChanged();
} else {
Runnable task = next.invokeChannelWritableStateChangedTask;
if (task == null) {
next.invokeChannelWritableStateChangedTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelWritabilityChanged();
}
};
}
executor.execute(task);
}
DefaultChannelHandlerContext next = findContextInbound();
next.invoker.invokeChannelWritabilityChanged(next);
return this;
}
private void invokeChannelWritabilityChanged() {
try {
((ChannelInboundHandler) handler).channelWritabilityChanged(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newPromise());
@ -395,258 +203,72 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise);
}
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeBind(next, localAddress, promise);
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise);
}
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelFuture disconnect(final ChannelPromise promise) {
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
} else {
next.invokeDisconnect(promise);
}
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
} else {
next.invokeDisconnect(promise);
}
}
}, promise);
public ChannelFuture disconnect(ChannelPromise promise) {
if (!channel().metadata().hasDisconnect()) {
return close(promise);
}
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeDisconnect(next, promise);
return promise;
}
private void invokeDisconnect(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).disconnect(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelFuture close(final ChannelPromise promise) {
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeClose(promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeClose(promise);
}
}, promise);
}
public ChannelFuture close(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeClose(next, promise);
return promise;
}
private void invokeClose(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).close(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelHandlerContext read() {
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeRead(next);
return this;
}
private void invokeRead() {
try {
((ChannelOutboundHandler) handler).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(promise, true);
write(msg, false, promise);
public ChannelFuture write(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeWrite(next, msg, promise);
return promise;
}
private void invokeWrite(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelHandlerContext flush() {
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel.voidPromise());
}
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker.invokeFlush(next);
return this;
}
private void invokeFlush() {
try {
((ChannelOutboundHandler) handler).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(promise, true);
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
safeExecute(executor, WriteTask.newInstance(next, msg, size, flush, promise), promise);
}
next.invoker.invokeWriteAndFlush(next, msg, promise);
return promise;
}
@Override
@ -654,53 +276,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return writeAndFlush(msg, newPromise());
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// only try to fail the promise if its not a VoidChannelPromise, as
// the VoidChannelPromise would also fire the cause through the pipeline
if (promise instanceof VoidChannelPromise) {
return;
}
if (!promise.tryFailure(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
}
}
}
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaught(cause);
}
private static boolean inExceptionCaught(Throwable cause) {
do {
StackTraceElement[] trace = cause.getStackTrace();
if (trace != null) {
for (StackTraceElement t : trace) {
if (t == null) {
break;
}
if ("exceptionCaught".equals(t.getMethodName())) {
return true;
}
}
}
cause = cause.getCause();
} while (cause != null);
return false;
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
@ -725,35 +300,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return new FailedChannelFuture(channel(), executor(), cause);
}
private void validatePromise(ChannelPromise promise, boolean allowVoidPromise) {
if (promise == null) {
throw new NullPointerException("promise");
}
if (promise.isDone()) {
throw new IllegalArgumentException("promise already done: " + promise);
}
if (promise.channel() != channel()) {
throw new IllegalArgumentException(String.format(
"promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
}
if (promise.getClass() == DefaultChannelPromise.class) {
return;
}
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
}
if (promise instanceof AbstractChannel.CloseFuture) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
}
}
private DefaultChannelHandlerContext findContextInbound() {
DefaultChannelHandlerContext ctx = this;
do {
@ -783,68 +329,4 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public boolean isRemoved() {
return removed;
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
static final class WriteTask implements Runnable {
private DefaultChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
private boolean flush;
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override
protected WriteTask newObject(Handle handle) {
return new WriteTask(handle);
}
};
private static WriteTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, int size, boolean flush, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
task.flush = flush;
return task;
}
private final Recycler.Handle handle;
private WriteTask(Recycler.Handle handle) {
this.handle = handle;
}
@Override
public void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
ctx.invokeWrite(msg, promise);
if (flush) {
ctx.invokeFlush();
}
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
RECYCLER.recycle(this, handle);
}
}
}
}

View File

@ -0,0 +1,459 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
import static io.netty.channel.DefaultChannelPipeline.*;
public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
private final EventExecutor executor;
public DefaultChannelHandlerInvoker(EventExecutor executor) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.executor = executor;
}
@Override
public EventExecutor executor() {
return executor;
}
@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelRegisteredNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelRegisteredNow(ctx);
}
});
}
}
@Override
public void invokeChannelActive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelActiveNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelActiveNow(ctx);
}
});
}
}
@Override
public void invokeChannelInactive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelInactiveNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelInactiveNow(ctx);
}
});
}
}
@Override
public void invokeExceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
if (executor.inEventLoop()) {
invokeExceptionCaughtNow(ctx, cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
invokeExceptionCaughtNow(ctx, cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
@Override
public void invokeUserEventTriggered(final ChannelHandlerContext ctx, final Object event) {
if (event == null) {
throw new NullPointerException("event");
}
if (executor.inEventLoop()) {
invokeUserEventTriggeredNow(ctx, event);
} else {
safeExecuteInbound(new Runnable() {
@Override
public void run() {
invokeUserEventTriggeredNow(ctx, event);
}
}, event);
}
}
@Override
public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (executor.inEventLoop()) {
invokeChannelReadNow(ctx, msg);
} else {
safeExecuteInbound(new Runnable() {
@Override
public void run() {
invokeChannelReadNow(ctx, msg);
}
}, msg);
}
}
@Override
public void invokeChannelReadComplete(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelReadCompleteNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeChannelReadCompleteTask;
if (task == null) {
dctx.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
invokeChannelReadCompleteNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeChannelWritabilityChanged(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelWritabilityChangedNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeChannelWritableStateChangedTask;
if (task == null) {
dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
@Override
public void run() {
invokeChannelWritabilityChangedNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeBind(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeBindNow(ctx, localAddress, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeBindNow(ctx, localAddress, promise);
}
}, promise);
}
}
@Override
public void invokeConnect(
final ChannelHandlerContext ctx,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
}, promise);
}
}
@Override
public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeDisconnectNow(ctx, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeDisconnectNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) {
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeCloseNow(ctx, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeCloseNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeRead(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeReadNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeReadTask;
if (task == null) {
dctx.invokeReadTask = task = new Runnable() {
@Override
public void run() {
invokeReadNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(ctx, promise, true);
invokeWrite(ctx, msg, false, promise);
}
private void invokeWrite(ChannelHandlerContext ctx, Object msg, boolean flush, ChannelPromise promise) {
if (executor.inEventLoop()) {
invokeWriteNow(ctx, msg, promise);
if (flush) {
invokeFlushNow(ctx);
}
} else {
AbstractChannel channel = (AbstractChannel) ctx.channel();
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, flush, promise), promise, msg);
}
}
@Override
public void invokeFlush(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeFlushNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeFlushTask;
if (task == null) {
dctx.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
invokeFlushNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(ctx, promise, true);
invokeWrite(ctx, msg, true, promise);
}
private static void validatePromise(ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) {
if (ctx == null) {
throw new NullPointerException("ctx");
}
if (promise == null) {
throw new NullPointerException("promise");
}
if (promise.isDone()) {
throw new IllegalArgumentException("promise already done: " + promise);
}
if (promise.channel() != ctx.channel()) {
throw new IllegalArgumentException(String.format(
"promise.channel does not match: %s (expected: %s)", promise.channel(), ctx.channel()));
}
if (promise.getClass() == DefaultChannelPromise.class) {
return;
}
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
}
if (promise instanceof AbstractChannel.CloseFuture) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
}
}
private void safeExecuteInbound(Runnable task, Object msg) {
boolean success = false;
try {
executor.execute(task);
success = true;
} finally {
if (!success) {
ReferenceCountUtil.release(msg);
}
}
}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise) {
try {
executor.execute(task);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
try {
executor.execute(task);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
static final class WriteTask implements Runnable {
private ChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
private boolean flush;
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override
protected WriteTask newObject(Handle handle) {
return new WriteTask(handle);
}
};
private static WriteTask newInstance(
ChannelHandlerContext ctx, Object msg, int size, boolean flush, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
task.flush = flush;
return task;
}
private final Recycler.Handle handle;
private WriteTask(Recycler.Handle handle) {
this.handle = handle;
}
@Override
public void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
invokeWriteNow(ctx, msg, promise);
if (flush) {
invokeFlushNow(ctx);
}
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
RECYCLER.recycle(this, handle);
}
}
}
}

View File

@ -63,8 +63,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4);
final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
final Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers =
new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>();
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
@ -89,14 +89,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler);
return addFirst((ChannelHandlerInvoker) null, name, handler);
}
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
return addFirst(findInvoker(group), name, handler);
}
@Override
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addFirst0(name, newCtx);
}
@ -119,15 +127,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
return addLast((ChannelHandlerInvoker) null, name, handler);
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
return addLast(findInvoker(group), name, handler);
}
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addLast0(name, newCtx);
}
@ -150,16 +165,25 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
return addBefore(null, baseName, name, handler);
return addBefore((ChannelHandlerInvoker) null, baseName, name, handler);
}
@Override
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addBefore(findInvoker(group), baseName, name, handler);
}
@Override
public ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addBefore0(name, ctx, newCtx);
}
return this;
@ -180,16 +204,24 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
return addAfter(null, baseName, name, handler);
return addAfter((ChannelHandlerInvoker) null, baseName, name, handler);
}
@Override
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addAfter(findInvoker(group), baseName, name, handler);
}
@Override
public ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addAfter0(name, ctx, newCtx);
}
@ -213,11 +245,16 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(ChannelHandler... handlers) {
return addFirst(null, handlers);
return addFirst((ChannelHandlerInvoker) null, handlers);
}
@Override
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
return addFirst(findInvoker(group), handlers);
}
@Override
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -234,7 +271,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
addFirst(executor, generateName(h), h);
addFirst(invoker, generateName(h), h);
}
return this;
@ -242,11 +279,16 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
return addLast((ChannelHandlerInvoker) null, handlers);
}
@Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
return addLast(findInvoker(group), handlers);
}
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -255,12 +297,33 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (h == null) {
break;
}
addLast(executor, generateName(h), h);
addLast(invoker, generateName(h), h);
}
return this;
}
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
if (group == null) {
return null;
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
ChannelHandlerInvoker invoker = childInvokers.get(group);
if (invoker == null) {
EventExecutor executor = group.next();
if (executor instanceof EventLoop) {
invoker = ((EventLoop) executor).asInvoker();
} else {
invoker = new DefaultChannelHandlerInvoker(executor);
}
childInvokers.put(group, invoker);
}
return invoker;
}
private String generateName(ChannelHandler handler) {
WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
Class<?> handlerType = handler.getClass();
@ -396,7 +459,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler);
new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
replace0(ctx, newName, newCtx);

View File

@ -13,15 +13,36 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
package io.netty.channel;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
final class LocalEventLoop extends SingleThreadEventLoop {
public class DefaultEventLoop extends SingleThreadEventLoop {
LocalEventLoop(LocalEventLoopGroup parent, Executor executor) {
public DefaultEventLoop() {
this((EventLoopGroup) null);
}
public DefaultEventLoop(ThreadFactory threadFactory) {
this(null, threadFactory);
}
public DefaultEventLoop(Executor executor) {
this(null, executor);
}
public DefaultEventLoop(EventLoopGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventLoop.class));
}
public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
public DefaultEventLoop(EventLoopGroup parent, Executor executor) {
super(parent, executor, true);
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
*/
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance with the default number of threads.
*/
public DefaultEventLoopGroup() {
this(0);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
*/
public DefaultEventLoopGroup(int nThreads) {
this(nThreads, null);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
*/
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 The Netty Project
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
@ -27,4 +28,13 @@ import io.netty.util.concurrent.EventExecutor;
public interface EventLoop extends EventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
@Override
EventLoop next();
/**
* Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to
* invoke event handler methods.
*/
ChannelHandlerInvoker asInvoker();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 The Netty Project
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
@ -20,12 +21,8 @@ import io.netty.util.concurrent.EventExecutorGroup;
/**
* Special {@link EventExecutorGroup} which allows to register {@link Channel}'s that get
* processed for later selection during the event loop.
*
*/
public interface EventLoopGroup extends EventExecutorGroup {
/**
* Return the next {@link EventLoop} to use
*/
@Override
EventLoop next();
}

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -25,7 +26,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
* Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
* the same time.
*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@ -66,4 +67,7 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
}

View File

@ -22,6 +22,5 @@ import io.netty.channel.socket.ServerSocketChannel;
* them. {@link ServerSocketChannel} is a good example.
*/
public interface ServerChannel extends Channel {
EventLoopGroup childEventLoopGroup();
}

View File

@ -15,28 +15,23 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Abstract base class for {@link EventLoop}'s that execute all its submitted tasks in a single thread.
* Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
*
*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
/**
* @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, boolean)}
*/
private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this);
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
/**
* @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, Executor, boolean)}
*/
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
}
@ -50,4 +45,9 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelHandlerInvoker asInvoker() {
return invoker;
}
}

View File

@ -16,7 +16,6 @@
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
@ -26,10 +25,8 @@ import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -42,14 +39,14 @@ import java.util.concurrent.TimeUnit;
/**
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
*/
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
public class ThreadPerChannelEventLoopGroup extends AbstractEventLoopGroup {
private final Object[] childArgs;
private final int maxChannels;
final Executor executor;
final Set<ThreadPerChannelEventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
private final ChannelException tooManyChannels;
private volatile boolean shuttingDown;
@ -76,9 +73,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* {@link ChannelException}. Use {@code 0} to use no limit
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
@ -89,9 +84,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* {@link ChannelException}. Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
@ -105,9 +98,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* {@link ChannelException}. Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
@ -135,15 +126,16 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
/**
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
* Creates a new {@link EventLoop}.
*/
protected ThreadPerChannelEventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) {
protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) {
return new ThreadPerChannelEventLoop(this);
}
@Override
public Iterator<EventExecutor> iterator() {
return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
@SuppressWarnings("unchecked")
public <E extends EventExecutor> Set<E> children() {
return Collections.unmodifiableSet((Set<E>) activeChildren);
}
@Override
@ -152,7 +144,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
throw new RejectedExecutionException("shutting down");
}
ThreadPerChannelEventLoop loop = idleChildren.poll();
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;

View File

@ -235,7 +235,7 @@ public class EmbeddedChannel extends AbstractChannel {
}
/**
* Run all tasks that are pending in the {@link EventLoop} for this {@link Channel}
* Run all tasks that are pending in the {@link io.netty.channel.EventLoop} for this {@link Channel}
*/
public void runPendingTasks() {
try {

View File

@ -15,22 +15,28 @@
*/
package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.AbstractEventLoop;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop {
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandlerInvoker {
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
protected EmbeddedEventLoop() {
super(null);
}
@Override
public void execute(Runnable command) {
if (command == null) {
@ -82,9 +88,7 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
Thread.sleep(unit.toMillis(timeout));
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
@ -99,12 +103,94 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop
}
@Override
public EventLoop next() {
public ChannelHandlerInvoker asInvoker() {
return this;
}
@Override
public EventLoopGroup parent() {
public EventExecutor executor() {
return this;
}
@Override
public void invokeChannelRegistered(ChannelHandlerContext ctx) {
invokeChannelRegisteredNow(ctx);
}
@Override
public void invokeChannelActive(ChannelHandlerContext ctx) {
invokeChannelActiveNow(ctx);
}
@Override
public void invokeChannelInactive(ChannelHandlerContext ctx) {
invokeChannelInactiveNow(ctx);
}
@Override
public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
invokeExceptionCaughtNow(ctx, cause);
}
@Override
public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) {
invokeUserEventTriggeredNow(ctx, event);
}
@Override
public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) {
invokeChannelReadNow(ctx, msg);
}
@Override
public void invokeChannelReadComplete(ChannelHandlerContext ctx) {
invokeChannelReadCompleteNow(ctx);
}
@Override
public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) {
invokeChannelWritabilityChangedNow(ctx);
}
@Override
public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
invokeBindNow(ctx, localAddress, promise);
}
@Override
public void invokeConnect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
@Override
public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeDisconnectNow(ctx, promise);
}
@Override
public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeCloseNow(ctx, promise);
}
@Override
public void invokeRead(ChannelHandlerContext ctx) {
invokeReadNow(ctx);
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
}
@Override
public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteAndFlushNow(ctx, msg, promise);
}
@Override
public void invokeFlush(ChannelHandlerContext ctx) {
invokeFlushNow(ctx);
}
}

View File

@ -15,23 +15,20 @@
*/
package io.netty.channel.local;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.DefaultEventLoopGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
* @deprecated Use {@link DefaultEventLoopGroup} instead.
*/
public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
@Deprecated
public class LocalEventLoopGroup extends DefaultEventLoopGroup {
/**
* Create a new instance with the default number of threads.
*/
public LocalEventLoopGroup() {
this(0);
}
public LocalEventLoopGroup() { }
/**
* Create a new instance
@ -39,7 +36,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
* @param nThreads the number of threads to use
*/
public LocalEventLoopGroup(int nThreads) {
this(nThreads, null);
super(nThreads);
}
/**
@ -51,10 +48,4 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(
Executor executor, Object... args) throws Exception {
return new LocalEventLoop(this, executor);
}
}

View File

@ -33,11 +33,8 @@ import java.util.List;
*/
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
/**
* @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)}
*/
protected AbstractNioMessageChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch,
int readInterestOp) {
protected AbstractNioMessageChannel(
Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
super(parent, eventLoop, ch, readInterestOp);
}

View File

@ -26,8 +26,8 @@ public abstract class AbstractNioMessageServerChannel extends AbstractNioMessage
private final EventLoopGroup childGroup;
protected AbstractNioMessageServerChannel(Channel parent, EventLoop eventLoop, EventLoopGroup childGroup,
SelectableChannel ch, int readInterestOp) {
protected AbstractNioMessageServerChannel(
Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
super(parent, eventLoop, ch, readInterestOp);
this.childGroup = childGroup;
}
@ -36,5 +36,4 @@ public abstract class AbstractNioMessageServerChannel extends AbstractNioMessage
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
}

View File

@ -43,7 +43,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
* {@link io.netty.channel.SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
* {@link Selector} and so does the multi-plexing of these in the event loop.
*
*/

View File

@ -16,6 +16,7 @@
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
@ -92,8 +93,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
}
@Override
protected EventExecutor newChild(
Executor executor, Object... args) throws Exception {
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
}
}

View File

@ -36,9 +36,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
*/
protected AbstractOioByteChannel(Channel parent, EventLoop eventLoop) {
super(parent, eventLoop);
}

View File

@ -41,9 +41,6 @@ public abstract class AbstractOioChannel extends AbstractChannel {
}
};
/**
* @see AbstractChannel#AbstractChannel(Channel)
*/
protected AbstractOioChannel(Channel parent, EventLoop eventLoop) {
super(parent, eventLoop);
}

View File

@ -18,7 +18,6 @@ package io.netty.channel.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.ArrayList;

View File

@ -33,5 +33,4 @@ public abstract class AbstractOioMessageServerChannel extends AbstractOioMessage
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
}

View File

@ -18,17 +18,16 @@ package io.netty.channel.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ThreadPerChannelEventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* {@link EventLoopGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its
* {@link EventExecutorGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its
* own {@link EventLoop} to not block others.
*/
public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
@ -45,9 +44,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* {@link ChannelException}. Use {@code 0} to use no limit
*/
public OioEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
@ -58,9 +55,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* {@link ChannelException}. Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
*/
@ -73,9 +68,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* {@link ChannelException}. Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
*/

View File

@ -39,7 +39,7 @@ import java.util.List;
* NIO selector based implementation to accept new connections.
*/
public class NioServerSocketChannel extends AbstractNioMessageServerChannel
implements io.netty.channel.socket.ServerSocketChannel {
implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);

View File

@ -19,10 +19,10 @@ package io.netty.bootstrap;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.junit.Test;
@ -33,8 +33,8 @@ public class BootstrapTest {
@Test(timeout = 10000)
public void testBindDeadLock() throws Exception {
EventLoopGroup groupA = new LocalEventLoopGroup(1);
EventLoopGroup groupB = new LocalEventLoopGroup(1);
EventLoopGroup groupA = new DefaultEventLoopGroup(1);
EventLoopGroup groupB = new DefaultEventLoopGroup(1);
try {
ChannelInboundHandler dummyHandler = new DummyHandler();
@ -81,8 +81,8 @@ public class BootstrapTest {
@Test(timeout = 10000)
public void testConnectDeadLock() throws Exception {
EventLoopGroup groupA = new LocalEventLoopGroup(1);
EventLoopGroup groupB = new LocalEventLoopGroup(1);
EventLoopGroup groupA = new DefaultEventLoopGroup(1);
EventLoopGroup groupB = new DefaultEventLoopGroup(1);
try {
ChannelInboundHandler dummyHandler = new DummyHandler();

View File

@ -16,27 +16,27 @@
package io.netty.channel;
import static org.junit.Assert.assertEquals;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import java.io.UnsupportedEncodingException;
import static org.junit.Assert.*;
class BaseChannelTest {
private final LoggingHandler loggingHandler;
BaseChannelTest() {
this.loggingHandler = new LoggingHandler();
loggingHandler = new LoggingHandler();
}
ServerBootstrap getLocalServerBootstrap() {
EventLoopGroup serverGroup = new LocalEventLoopGroup();
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(LocalServerChannel.class);
@ -50,12 +50,12 @@ class BaseChannelTest {
}
Bootstrap getLocalClientBootstrap() {
EventLoopGroup clientGroup = new LocalEventLoopGroup();
EventLoopGroup clientGroup = new DefaultEventLoopGroup();
Bootstrap cb = new Bootstrap();
cb.channel(LocalChannel.class);
cb.group(clientGroup);
cb.handler(this.loggingHandler);
cb.handler(loggingHandler);
return cb;
}
@ -79,16 +79,15 @@ class BaseChannelTest {
}
void assertLog(String expected) {
String actual = this.loggingHandler.getLog();
String actual = loggingHandler.getLog();
assertEquals(expected, actual);
}
void clearLog() {
this.loggingHandler.clear();
loggingHandler.clear();
}
void setInterest(LoggingHandler.Event... events) {
this.loggingHandler.setInterest(events);
loggingHandler.setInterest(events);
}
}

View File

@ -21,7 +21,6 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
@ -30,7 +29,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@ -44,7 +42,7 @@ import static org.junit.Assert.*;
public class DefaultChannelPipelineTest {
private static final EventLoopGroup group = new LocalEventLoopGroup(1);
private static final EventLoopGroup group = new DefaultEventLoopGroup(1);
private Channel self;
private Channel peer;

View File

@ -32,8 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class SingleThreadEventLoopTest {
@ -131,7 +130,7 @@ public class SingleThreadEventLoopTest {
testScheduleTask(loopB);
}
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
private static void testScheduleTask(EventExecutor loopA) throws InterruptedException, ExecutionException {
long startTime = System.nanoTime();
final AtomicLong endTime = new AtomicLong();
loopA.schedule(new Runnable() {
@ -153,7 +152,7 @@ public class SingleThreadEventLoopTest {
testScheduleTaskAtFixedRate(loopB);
}
private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
private static void testScheduleTaskAtFixedRate(EventExecutor loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
@Override
@ -193,7 +192,7 @@ public class SingleThreadEventLoopTest {
testScheduleLaggyTaskAtFixedRate(loopB);
}
private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
private static void testScheduleLaggyTaskAtFixedRate(EventExecutor loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
@Override
@ -243,7 +242,7 @@ public class SingleThreadEventLoopTest {
testScheduleTaskWithFixedDelay(loopB);
}
private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws InterruptedException {
private static void testScheduleTaskWithFixedDelay(EventExecutor loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loopA.scheduleWithFixedDelay(new Runnable() {
@Override

View File

@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -42,8 +43,8 @@ public class LocalChannelTest {
@Test
public void testLocalAddressReuse() throws Exception {
for (int i = 0; i < 2; i ++) {
EventLoopGroup clientGroup = new LocalEventLoopGroup();
EventLoopGroup serverGroup = new LocalEventLoopGroup();
EventLoopGroup clientGroup = new DefaultEventLoopGroup();
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
@ -96,8 +97,8 @@ public class LocalChannelTest {
@Test
public void testWriteFailsFastOnClosedChannel() throws Exception {
EventLoopGroup clientGroup = new LocalEventLoopGroup();
EventLoopGroup serverGroup = new LocalEventLoopGroup();
EventLoopGroup clientGroup = new DefaultEventLoopGroup();
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();

View File

@ -24,11 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -49,7 +48,7 @@ public class LocalTransportThreadModelTest {
@BeforeClass
public static void init() {
// Configure a test server
group = new LocalEventLoopGroup();
group = new DefaultEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
@ -84,9 +83,9 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 5000)
public void testStagedExecution() throws Throwable {
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1"));
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor();
ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor();
@ -229,12 +228,12 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 30000)
@Ignore
public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1"));
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2"));
EventLoopGroup e3 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e3"));
EventLoopGroup e4 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e4"));
EventLoopGroup e5 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e5"));
try {
final MessageForwarder1 h1 = new MessageForwarder1();

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
@ -40,14 +41,14 @@ public class LocalTransportThreadModelTest2 {
ServerBootstrap serverBootstrap = new ServerBootstrap();
LocalHander serverHandler = new LocalHander("SERVER");
serverBootstrap
.group(new LocalEventLoopGroup(), new LocalEventLoopGroup())
.group(new DefaultEventLoopGroup(), new DefaultEventLoopGroup())
.channel(LocalServerChannel.class)
.childHandler(serverHandler);
Bootstrap clientBootstrap = new Bootstrap();
LocalHander clientHandler = new LocalHander("CLIENT");
clientBootstrap
.group(new LocalEventLoopGroup())
.group(new DefaultEventLoopGroup())
.channel(LocalChannel.class)
.remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler);

View File

@ -23,11 +23,15 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Deque;
import java.util.LinkedList;
@ -36,12 +40,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
public class LocalTransportThreadModelTest3 {
enum EventType {
@ -62,7 +60,7 @@ public class LocalTransportThreadModelTest3 {
@BeforeClass
public static void init() {
// Configure a test server
group = new LocalEventLoopGroup();
group = new DefaultEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
@ -116,14 +114,14 @@ public class LocalTransportThreadModelTest3 {
}
private static void testConcurrentAddRemove(boolean inbound) throws Exception {
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1"));
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2"));
EventLoopGroup e3 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e3"));
EventLoopGroup e4 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e4"));
EventLoopGroup e5 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e5"));
final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5};
final EventLoopGroup[] groups = { e1, e2, e3, e4, e5 };
try {
Deque<EventType> events = new ConcurrentLinkedDeque<EventType>();
final EventForwarder h1 = new EventForwarder();