Strict thread model / Allow assign an executor to a handler

- Add EventExecutor and make EventLoop extend it
- Add SingleThreadEventExecutor and MultithreadEventExecutor
- Add EventExecutor's default implementation
- Fixed an API design problem where there is no way to get non-bypass
  buffer of desired type
This commit is contained in:
Trustin Lee 2012-06-01 17:51:19 -07:00
parent 754cd99843
commit 141a05c831
44 changed files with 1994 additions and 1335 deletions

View File

@ -169,7 +169,7 @@ public abstract class AbstractSocketSpdyEchoTest {
protected abstract ServerBootstrap newServerBootstrap();
protected abstract Bootstrap newClientBootstrap();
@Test
@Test(timeout = 10000)
public void testSpdyEcho() throws Throwable {
for (int version = SPDY_MIN_VERSION; version <= SPDY_MAX_VERSION; version ++) {
sb = newServerBootstrap();

View File

@ -100,7 +100,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
* inbound buffer.
*/
public void replace(String newHandlerName, ChannelInboundHandler<Byte> newHandler) {
if (!ctx.eventLoop().inEventLoop()) {
if (!ctx.executor().inEventLoop()) {
throw new IllegalStateException("not in event loop");
}

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.CodecException;
@ -57,11 +56,10 @@ public class DecoderEmbedder<E> extends AbstractCodecEmbedder<E> {
@Override
public boolean offer(Object input) {
ChannelBufferHolder<Object> in = pipeline().inbound();
if (in.hasByteBuffer()) {
in.byteBuffer().writeBytes((ChannelBuffer) input);
if (input instanceof ChannelBuffer) {
pipeline().inboundByteBuffer().writeBytes((ChannelBuffer) input);
} else {
in.messageBuffer().add(input);
pipeline().inboundMessageBuffer().add(input);
}
pipeline().fireInboundBufferUpdated();

View File

@ -2,6 +2,7 @@ package io.netty.handler.codec.embedder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import java.util.Collections;
@ -12,7 +13,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
class EmbeddedEventLoop extends AbstractExecutorService implements
EventLoop {
EventLoop, EventExecutor.Unsafe {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
@ -85,4 +86,19 @@ class EmbeddedEventLoop extends AbstractExecutorService implements
public boolean inEventLoop() {
return true;
}
@Override
public EventLoop parent() {
return null;
}
@Override
public Unsafe unsafe() {
return this;
}
@Override
public EventExecutor nextChild() {
return this;
}
}

View File

@ -21,7 +21,7 @@ public class HexDumpProxyBackendHandler extends ChannelInboundStreamHandlerAdapt
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = inboundChannel.outbound().byteBuffer();
ChannelBuffer out = inboundChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();

View File

@ -46,7 +46,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdap
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.eventLoop(ctx.eventLoop())
b.eventLoop(inboundChannel.eventLoop())
.channel(new NioSocketChannel())
.remoteAddress(remoteHost, remotePort)
.initializer(new ChannelInitializer<SocketChannel>() {
@ -75,7 +75,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdap
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.inbound().byteBuffer();
ChannelBuffer out = outboundChannel.outbound().byteBuffer();
ChannelBuffer out = outboundChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();

View File

@ -79,7 +79,7 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter {
throws Exception {
println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + "s");
final EventLoop loop = ctx.eventLoop();
final EventLoop loop = ctx.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {

View File

@ -187,7 +187,7 @@ public class BlockingReadHandler<E> extends ChannelInboundHandlerAdapter<Object>
}
private void detectDeadLock() {
if (ctx.eventLoop().inEventLoop()) {
if (ctx.executor().inEventLoop()) {
throw new BlockingOperationException();
}
}

View File

@ -21,8 +21,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelFuture;
import io.netty.handler.codec.StreamToStreamCodec;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.NonReentrantLock;
@ -139,9 +141,7 @@ import javax.net.ssl.SSLException;
* @apiviz.landmark
* @apiviz.uses io.netty.handler.ssl.SslBufferPool
*/
public class SslHandler extends FrameDecoder
implements ChannelDownstreamHandler,
LifeCycleAwareChannelHandler {
public class SslHandler extends StreamToStreamCodec {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SslHandler.class);
@ -421,38 +421,21 @@ public class SslHandler extends FrameDecoder
}
@Override
public void handleDownstream(
final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) {
case OPEN:
case CONNECTED:
case BOUND:
if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
closeOutboundAndChannel(context, e);
return;
}
}
}
if (!(evt instanceof MessageEvent)) {
context.sendDownstream(evt);
return;
}
public void disconnect(ChannelOutboundHandlerContext<Byte> ctx,
ChannelFuture future) throws Exception {
closeOutboundAndChannel(ctx, e);
super.disconnect(ctx, future);
}
MessageEvent e = (MessageEvent) evt;
if (!(e.getMessage() instanceof ChannelBuffer)) {
context.sendDownstream(evt);
return;
}
// Do not encrypt the first write request if this handler is
// created with startTLS flag turned on.
if (startTls && sentFirstMessage.compareAndSet(false, true)) {
context.sendDownstream(evt);
return;
}
@Override
public void close(ChannelOutboundHandlerContext<Byte> ctx,
ChannelFuture future) throws Exception {
closeOutboundAndChannel(ctx, e);
super.close(ctx, future);
}
@Override
public void encode(ChannelOutboundHandlerContext<Byte> ctx, ChannelBuffer in, ChannelBuffer out) throws Exception {
// Otherwise, all messages are encrypted.
ChannelBuffer msg = (ChannelBuffer) e.getMessage();
PendingWrite pendingWrite;

View File

@ -158,10 +158,10 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
}
if (fireExceptionCaught) {
if (ctx.eventLoop().inEventLoop()) {
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(cause);
} else {
ctx.eventLoop().execute(new Runnable() {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.fireExceptionCaught(cause);
@ -212,10 +212,10 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
} catch (final Throwable t) {
this.currentEvent = null;
if (ctx.eventLoop().inEventLoop()) {
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(t);
} else {
ctx.eventLoop().execute(new Runnable() {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.fireExceptionCaught(t);

View File

@ -27,7 +27,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventExecutor;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
@ -274,7 +274,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
return;
}
EventLoop loop = ctx.eventLoop();
EventExecutor loop = ctx.executor();
lastReadTime = lastWriteTime = System.currentTimeMillis();
if (readerIdleTimeMillis > 0) {
@ -335,7 +335,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout =
ctx.eventLoop().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
ctx.executor().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
try {
channelIdle(ctx, new IdleStateEvent(
IdleState.READER_IDLE, readerIdleCount ++, currentTime - lastReadTime));
@ -344,7 +344,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
@ -369,7 +369,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = ctx.eventLoop().schedule(
writerIdleTimeout = ctx.executor().schedule(
this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
try {
channelIdle(ctx, new IdleStateEvent(
@ -379,7 +379,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
}
@ -404,7 +404,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = ctx.eventLoop().schedule(
allIdleTimeout = ctx.executor().schedule(
this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
try {
channelIdle(ctx, new IdleStateEvent(
@ -415,7 +415,7 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
@ -164,11 +163,9 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
return;
}
EventLoop loop = ctx.eventLoop();
lastReadTime = System.currentTimeMillis();
if (timeoutMillis > 0) {
timeout = loop.schedule(
timeout = ctx.executor().schedule(
new ReadTimeoutTask(ctx),
timeoutMillis, TimeUnit.MILLISECONDS);
}
@ -209,7 +206,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
long nextDelay = timeoutMillis - (currentTime - lastReadTime);
if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback.
timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
timeout = ctx.executor().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
try {
readTimedOut(ctx);
} catch (Throwable t) {
@ -217,7 +214,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
timeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -113,7 +113,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter<Object> {
public void flush(final ChannelOutboundHandlerContext<Object> ctx, final ChannelFuture future) throws Exception {
if (timeoutMillis > 0) {
// Schedule a timeout.
final ScheduledFuture<?> sf = ctx.eventLoop().schedule(new Runnable() {
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (future.setFailure(WriteTimeoutException.INSTANCE)) {

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap;
@ -24,6 +25,7 @@ import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -266,8 +268,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public ChannelBufferHolder<Object> outbound() {
return pipeline().outbound();
public ChannelBuffer outboundByteBuffer() {
return pipeline().outboundByteBuffer();
}
@Override
public Queue<Object> outboundMessageBuffer() {
return pipeline().outboundMessageBuffer();
}
@Override

View File

@ -15,7 +15,10 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.Queue;
/**
* A skeletal server-side {@link Channel} implementation. A server-side
@ -37,8 +40,13 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
public ChannelBufferHolder<Object> outbound() {
return ChannelBufferHolders.discardBuffer();
public ChannelBuffer outboundByteBuffer() {
throw new NoSuchBufferException();
}
@Override
public Queue<Object> outboundMessageBuffer() {
throw new NoSuchBufferException();
}
@Override

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
@ -23,6 +24,7 @@ import io.netty.util.AttributeMap;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.util.Queue;
/**
@ -136,7 +138,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
boolean isRegistered();
boolean isActive();
ChannelBufferHolder<Object> outbound();
ChannelBuffer outboundByteBuffer();
Queue<Object> outboundMessageBuffer();
/**
* Returns the local address where this channel is bound to. The returned

View File

@ -129,7 +129,7 @@ public interface ChannelHandlerContext
ChannelInboundInvoker, ChannelOutboundInvoker {
Channel channel();
ChannelPipeline pipeline();
EventLoop eventLoop();
EventExecutor executor();
String name();
ChannelHandler handler();

View File

@ -23,6 +23,7 @@ import java.nio.channels.Channels;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
/**
@ -205,8 +206,10 @@ import java.util.NoSuchElementException;
*/
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker {
ChannelBufferHolder<Object> inbound();
ChannelBufferHolder<Object> outbound();
Queue<Object> inboundMessageBuffer();
ChannelBuffer inboundByteBuffer();
Queue<Object> outboundMessageBuffer();
ChannelBuffer outboundByteBuffer();
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
@ -221,6 +224,19 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelPipeline addFirst(String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param name the name of the handler to insert first
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
@ -234,6 +250,19 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelPipeline addLast(String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param name the name of the handler to append
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
@ -251,6 +280,23 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before
* @param handler the handler to insert before
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
@ -268,9 +314,31 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.
*

View File

@ -0,0 +1,375 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.util.DefaultAttributeMap;
import java.net.SocketAddress;
import java.util.Queue;
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext<Object>, ChannelOutboundHandlerContext<Object> {
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;
private final DefaultChannelPipeline pipeline;
final EventExecutor executor;
private final String name;
private final ChannelHandler handler;
private final boolean canHandleInbound;
private final boolean canHandleOutbound;
final ChannelBufferHolder<Object> in;
private final ChannelBufferHolder<Object> out;
// Runnables that calls handlers
final Runnable fireChannelRegisteredTask = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelInboundHandler<Object>) ctx.handler()).channelRegistered(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
};
final Runnable fireChannelUnregisteredTask = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelInboundHandler<Object>) ctx.handler()).channelUnregistered(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
};
final Runnable fireChannelActiveTask = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelInboundHandler<Object>) ctx.handler()).channelActive(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
};
final Runnable fireChannelInactiveTask = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelInboundHandler<Object>) ctx.handler()).channelInactive(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
};
final Runnable fireInboundBufferUpdatedTask = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelInboundHandler<Object>) ctx.handler()).inboundBufferUpdated(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
} finally {
ChannelBufferHolder<Object> inbound = ctx.inbound();
if (!inbound.isBypass() && inbound.isEmpty() && inbound.hasByteBuffer()) {
inbound.byteBuffer().discardReadBytes();
}
}
}
};
@SuppressWarnings("unchecked")
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor,
DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
String name, ChannelHandler handler) {
if (name == null) {
throw new NullPointerException("name");
}
if (handler == null) {
throw new NullPointerException("handler");
}
canHandleInbound = handler instanceof ChannelInboundHandler;
canHandleOutbound = handler instanceof ChannelOutboundHandler;
if (!canHandleInbound && !canHandleOutbound) {
throw new IllegalArgumentException(
"handler must be either " +
ChannelInboundHandler.class.getName() + " or " +
ChannelOutboundHandler.class.getName() + '.');
}
this.prev = prev;
this.next = next;
this.pipeline = pipeline;
this.name = name;
this.handler = handler;
if (executor != null) {
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = pipeline.childExecutors.get(executor);
if (childExecutor == null) {
childExecutor = executor.unsafe().nextChild();
pipeline.childExecutors.put(executor, childExecutor);
}
this.executor = childExecutor;
} else {
this.executor = null;
}
if (canHandleInbound) {
try {
in = ((ChannelInboundHandler<Object>) handler).newInboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
}
} else {
in = null;
}
if (canHandleOutbound) {
try {
out = ((ChannelOutboundHandler<Object>) handler).newOutboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
} finally {
if (in != null) {
// TODO Release the inbound buffer once pooling is implemented.
}
}
} else {
out = null;
}
}
@Override
public Channel channel() {
return pipeline.channel;
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
@Override
public ChannelHandler handler() {
return handler;
}
@Override
public String name() {
return name;
}
@Override
public boolean canHandleInbound() {
return canHandleInbound;
}
@Override
public boolean canHandleOutbound() {
return canHandleOutbound;
}
@Override
public ChannelBufferHolder<Object> inbound() {
return in;
}
@Override
public ChannelBufferHolder<Object> outbound() {
return out;
}
@Override
public ChannelBuffer nextInboundByteBuffer() {
return DefaultChannelPipeline.nextInboundByteBuffer(next);
}
@Override
public Queue<Object> nextInboundMessageBuffer() {
return DefaultChannelPipeline.nextInboundMessageBuffer(next);
}
@Override
public ChannelBuffer nextOutboundByteBuffer() {
return pipeline.nextOutboundByteBuffer(prev);
}
@Override
public Queue<Object> nextOutboundMessageBuffer() {
return pipeline.nextOutboundMessageBuffer(prev);
}
@Override
public void fireChannelRegistered() {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
DefaultChannelPipeline.fireChannelRegistered(next);
}
}
@Override
public void fireChannelUnregistered() {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
DefaultChannelPipeline.fireChannelUnregistered(next);
}
}
@Override
public void fireChannelActive() {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
DefaultChannelPipeline.fireChannelActive(next);
}
}
@Override
public void fireChannelInactive() {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
DefaultChannelPipeline.fireChannelInactive(next);
}
}
@Override
public void fireExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
pipeline.fireExceptionCaught(next, cause);
} else {
DefaultChannelPipeline.logTerminalException(cause);
}
}
@Override
public void fireUserEventTriggered(Object event) {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
pipeline.fireUserEventTriggered(next, event);
}
}
@Override
public void fireInboundBufferUpdated() {
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
if (next != null) {
DefaultChannelPipeline.fireInboundBufferUpdated(next);
}
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newFuture());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return connect(remoteAddress, newFuture());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return connect(remoteAddress, localAddress, newFuture());
}
@Override
public ChannelFuture disconnect() {
return disconnect(newFuture());
}
@Override
public ChannelFuture close() {
return close(newFuture());
}
@Override
public ChannelFuture deregister() {
return deregister(newFuture());
}
@Override
public ChannelFuture flush() {
return flush(newFuture());
}
@Override
public ChannelFuture write(Object message) {
return write(message, newFuture());
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return pipeline.bind(DefaultChannelPipeline.nextOutboundContext(prev), localAddress, future);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) {
return connect(remoteAddress, null, future);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return pipeline.connect(DefaultChannelPipeline.nextOutboundContext(prev), remoteAddress, localAddress, future);
}
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return pipeline.disconnect(DefaultChannelPipeline.nextOutboundContext(prev), future);
}
@Override
public ChannelFuture close(ChannelFuture future) {
return pipeline.close(DefaultChannelPipeline.nextOutboundContext(prev), future);
}
@Override
public ChannelFuture deregister(ChannelFuture future) {
return pipeline.deregister(DefaultChannelPipeline.nextOutboundContext(prev), future);
}
@Override
public ChannelFuture flush(ChannelFuture future) {
return pipeline.flush(DefaultChannelPipeline.nextOutboundContext(prev), future);
}
@Override
public ChannelFuture write(Object message, ChannelFuture future) {
return pipeline.write(DefaultChannelPipeline.nextOutboundContext(prev), message, future);
}
@Override
public ChannelFuture newFuture() {
return channel().newFuture();
}
@Override
public ChannelFuture newSucceededFuture() {
return channel().newSucceededFuture();
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return channel().newFailedFuture(cause);
}
}

View File

@ -0,0 +1,34 @@
package io.netty.channel;
import java.util.concurrent.ThreadFactory;
class DefaultChildEventExecutor extends SingleThreadEventExecutor {
DefaultChildEventExecutor(EventExecutor parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
interruptThread();
}
}
}

View File

@ -0,0 +1,19 @@
package io.netty.channel;
import java.util.concurrent.ThreadFactory;
public class DefaultEventExecutor extends MultithreadEventExecutor {
public DefaultEventExecutor(int nThreads) {
super(nThreads);
}
public DefaultEventExecutor(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultChildEventExecutor(this, threadFactory);
}
}

View File

@ -0,0 +1,13 @@
package io.netty.channel;
import java.util.concurrent.ScheduledExecutorService;
public interface EventExecutor extends ScheduledExecutorService {
EventExecutor parent();
boolean inEventLoop();
Unsafe unsafe();
public interface Unsafe {
EventExecutor nextChild();
}
}

View File

@ -1,9 +1,8 @@
package io.netty.channel;
import java.util.concurrent.ScheduledExecutorService;
public interface EventLoop extends ScheduledExecutorService {
public interface EventLoop extends EventExecutor {
@Override
EventLoop parent();
ChannelFuture register(Channel channel);
ChannelFuture register(Channel channel, ChannelFuture future);
boolean inEventLoop();
}

View File

@ -1,7 +0,0 @@
package io.netty.channel;
import java.util.concurrent.ThreadFactory;
public interface EventLoopFactory<T extends EventLoop> {
T newEventLoop(ThreadFactory threadFactory) throws Exception;
}

View File

@ -0,0 +1,209 @@
package io.netty.channel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class MultithreadEventExecutor implements EventExecutor {
protected static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
protected static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory();
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
private final Unsafe unsafe = new Unsafe() {
@Override
public EventExecutor nextChild() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
};
protected MultithreadEventExecutor(Object... args) {
this(DEFAULT_POOL_SIZE, args);
}
protected MultithreadEventExecutor(int nThreads, Object... args) {
this(nThreads, DEFAULT_THREAD_FACTORY, args);
}
protected MultithreadEventExecutor(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format(
"nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
throw new EventLoopException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdown();
}
}
}
}
}
protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public EventExecutor parent() {
return null;
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
for (EventExecutor l: children) {
l.shutdownNow();
}
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventExecutor l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return currentEventLoop().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return currentEventLoop().submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return currentEventLoop().submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return currentEventLoop().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return currentEventLoop().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
currentEventLoop().execute(command);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
return currentEventLoop().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return currentEventLoop().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public boolean inEventLoop() {
return SingleThreadEventExecutor.currentEventLoop() != null;
}
private static EventExecutor currentEventLoop() {
EventExecutor loop = SingleThreadEventExecutor.currentEventLoop();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
}
return loop;
}
}

View File

@ -1,208 +1,38 @@
package io.netty.channel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
public class MultithreadEventLoop implements EventLoop {
public abstract class MultithreadEventLoop extends MultithreadEventExecutor implements EventLoop {
protected static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
protected static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory();
private final EventLoop[] children;
private final AtomicInteger childIndex = new AtomicInteger();
public MultithreadEventLoop(EventLoopFactory<? extends SingleThreadEventLoop> loopFactory) {
this(loopFactory, DEFAULT_POOL_SIZE);
protected MultithreadEventLoop(int nThreads, Object... args) {
super(nThreads, args);
}
public MultithreadEventLoop(EventLoopFactory<? extends SingleThreadEventLoop> loopFactory, int nThreads) {
this(loopFactory, nThreads, DEFAULT_THREAD_FACTORY);
protected MultithreadEventLoop(int nThreads, ThreadFactory threadFactory,
Object... args) {
super(nThreads, threadFactory, args);
}
public MultithreadEventLoop(EventLoopFactory<? extends SingleThreadEventLoop> loopFactory, int nThreads, ThreadFactory threadFactory) {
if (loopFactory == null) {
throw new NullPointerException("loopFactory");
}
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format(
"nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
children = new EventLoop[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = loopFactory.newEventLoop(threadFactory);
success = true;
} catch (Exception e) {
throw new EventLoopException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdown();
}
}
}
}
protected MultithreadEventLoop(Object... args) {
super(args);
}
@Override
public void shutdown() {
for (EventLoop l: children) {
l.shutdown();
}
}
protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public List<Runnable> shutdownNow() {
for (EventLoop l: children) {
l.shutdownNow();
}
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
for (EventLoop l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventLoop l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return currentEventLoop().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return currentEventLoop().submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return currentEventLoop().submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return currentEventLoop().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return currentEventLoop().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
currentEventLoop().execute(command);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
return currentEventLoop().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return currentEventLoop().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit);
public EventLoop parent() {
return (EventLoop) super.parent();
}
@Override
public ChannelFuture register(Channel channel) {
return nextEventLoop().register(channel);
return ((EventLoop) unsafe().nextChild()).register(channel);
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
return nextEventLoop().register(channel, future);
}
@Override
public boolean inEventLoop() {
return SingleThreadEventLoop.CURRENT_EVENT_LOOP.get() != null;
}
private EventLoop nextEventLoop() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
private static SingleThreadEventLoop currentEventLoop() {
SingleThreadEventLoop loop = SingleThreadEventLoop.CURRENT_EVENT_LOOP.get();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
}
return loop;
return ((EventLoop) unsafe().nextChild()).register(channel, future);
}
}

View File

@ -0,0 +1,581 @@
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.QueueFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
private static final AtomicLong nextTaskId = new AtomicLong();
static final ThreadLocal<SingleThreadEventExecutor> CURRENT_EVENT_LOOP = new ThreadLocal<SingleThreadEventExecutor>();
public static SingleThreadEventExecutor currentEventLoop() {
return CURRENT_EVENT_LOOP.get();
}
private static long nanoTime() {
return System.nanoTime() - START_TIME;
}
private static long deadlineNanos(long delay) {
return nanoTime() + delay;
}
private final EventExecutor parent;
private final Unsafe unsafe = new Unsafe() {
@Override
public EventExecutor nextChild() {
return SingleThreadEventExecutor.this;
}
};
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue();
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
// TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue.
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
private long lastCheckTimeNanos;
private long lastPurgeTimeNanos;
protected SingleThreadEventExecutor(EventExecutor parent) {
this(parent, Executors.defaultThreadFactory());
}
protected SingleThreadEventExecutor(EventExecutor parent, ThreadFactory threadFactory) {
this.parent = parent;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this);
try {
SingleThreadEventExecutor.this.run();
} finally {
synchronized (stateLock) {
state = 3;
}
try {
cancelScheduledTasks();
runShutdownHooks();
cleanup();
} finally {
threadLock.release();
assert taskQueue.isEmpty();
}
}
}
});
}
@Override
public EventExecutor parent() {
return parent;
}
@Override
public Unsafe unsafe() {
return unsafe;
}
protected void interruptThread() {
thread.interrupt();
}
protected Runnable pollTask() {
assert inEventLoop();
Runnable task = taskQueue.poll();
if (task != null) {
return task;
}
if (fetchScheduledTasks()) {
task = taskQueue.poll();
return task;
}
return null;
}
protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
for (;;) {
Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS);
if (task != null) {
return task;
}
fetchScheduledTasks();
task = taskQueue.poll();
if (task != null) {
return task;
}
}
}
protected Runnable peekTask() {
assert inEventLoop();
Runnable task = taskQueue.peek();
if (task != null) {
return task;
}
if (fetchScheduledTasks()) {
task = taskQueue.peek();
return task;
}
return null;
}
protected boolean hasTasks() {
assert inEventLoop();
boolean empty = taskQueue.isEmpty();
if (!empty) {
return true;
}
if (fetchScheduledTasks()) {
return !taskQueue.isEmpty();
}
return false;
}
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
reject();
}
taskQueue.add(task);
}
protected boolean removeTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
return taskQueue.remove(task);
}
protected void runAllTasks() {
for (;;) {
final Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
protected abstract void run();
protected void cleanup() {
// Do nothing. Subclasses will override.
}
protected abstract void wakeup(boolean inEventLoop);
@Override
public boolean inEventLoop() {
return Thread.currentThread() == thread;
}
public void addShutdownHook(final Runnable task) {
if (inEventLoop()) {
shutdownHooks.add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
shutdownHooks.add(task);
}
});
}
}
public void removeShutdownHook(final Runnable task) {
if (inEventLoop()) {
shutdownHooks.remove(task);
} else {
execute(new Runnable() {
@Override
public void run() {
shutdownHooks.remove(task);
}
});
}
}
private void runShutdownHooks() {
// Note shutdown hooks can add / remove shutdown hooks.
while (!shutdownHooks.isEmpty()) {
List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
shutdownHooks.clear();
for (Runnable task: copy) {
try {
task.run();
} catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t);
}
}
}
}
@Override
public void shutdown() {
boolean inEventLoop = inEventLoop();
boolean wakeup = false;
if (inEventLoop) {
synchronized (stateLock) {
assert state == 1;
state = 2;
wakeup = true;
}
} else {
synchronized (stateLock) {
switch (state) {
case 0:
state = 3;
try {
cleanup();
} finally {
threadLock.release();
}
break;
case 1:
state = 2;
wakeup = true;
break;
}
}
}
if (wakeup) {
wakeup(inEventLoop);
}
}
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return state >= 2;
}
@Override
public boolean isTerminated() {
return state == 3;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
if (inEventLoop()) {
throw new IllegalStateException("cannot await termination of the current thread");
}
if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}
return isTerminated();
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (inEventLoop()) {
addTask(task);
wakeup(true);
} else {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
wakeup(false);
}
}
private static void reject() {
throw new RejectedExecutionException("event loop shut down");
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(command, null, deadlineNanos(unit.toNanos(delay))));
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null) {
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(callable, deadlineNanos(unit.toNanos(delay))));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
if (isShutdown()) {
reject();
}
scheduledTasks.add(task);
if (isShutdown()) {
task.cancel(false);
}
if (!inEventLoop()) {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
} else {
fetchScheduledTasks();
}
return task;
}
private boolean fetchScheduledTasks() {
if (scheduledTasks.isEmpty()) {
return false;
}
long nanoTime = nanoTime();
if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) {
for (Iterator<ScheduledFutureTask<?>> i = scheduledTasks.iterator(); i.hasNext();) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) {
boolean added = false;
for (;;) {
ScheduledFutureTask<?> task = scheduledTasks.poll();
if (task == null) {
break;
}
if (!task.isCancelled()) {
if (isShutdown()) {
task.cancel(false);
} else {
taskQueue.add(task);
added = true;
}
}
}
return added;
}
return false;
}
private void cancelScheduledTasks() {
if (scheduledTasks.isEmpty()) {
return;
}
for (ScheduledFutureTask<?> task: scheduledTasks.toArray(new ScheduledFutureTask<?>[scheduledTasks.size()])) {
task.cancel(false);
}
scheduledTasks.clear();
}
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
ScheduledFutureTask(Runnable runnable, V result, long nanoTime) {
super(runnable, result);
this.deadlineNanos = nanoTime;
this.periodNanos = 0;
}
ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) {
super(runnable, result);
if (period == 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: != 0)", period));
}
this.deadlineNanos = nanoTime;
this.periodNanos = period;
}
ScheduledFutureTask(Callable<V> callable, long nanoTime) {
super(callable);
this.deadlineNanos = nanoTime;
this.periodNanos = 0;
}
public long deadlineNanos() {
return deadlineNanos;
}
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
@Override
public void run() {
if (periodNanos == 0) {
super.run();
} else {
boolean reset = runAndReset();
if (reset && !isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
schedule(this);
}
}
}
}
}

View File

@ -1,93 +1,20 @@
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.QueueFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop {
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventLoop.class);
private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
private static final AtomicLong nextTaskId = new AtomicLong();
static final ThreadLocal<SingleThreadEventLoop> CURRENT_EVENT_LOOP = new ThreadLocal<SingleThreadEventLoop>();
public static SingleThreadEventLoop currentEventLoop() {
return CURRENT_EVENT_LOOP.get();
protected SingleThreadEventLoop(EventLoop parent) {
super(parent);
}
private static long nanoTime() {
return System.nanoTime() - START_TIME;
protected SingleThreadEventLoop(EventLoop parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
private static long deadlineNanos(long delay) {
return nanoTime() + delay;
}
// Fields for event loop
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue();
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
// TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue.
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
private long lastCheckTimeNanos;
private long lastPurgeTimeNanos;
protected SingleThreadEventLoop() {
this(Executors.defaultThreadFactory());
}
protected SingleThreadEventLoop(ThreadFactory threadFactory) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
CURRENT_EVENT_LOOP.set(SingleThreadEventLoop.this);
try {
SingleThreadEventLoop.this.run();
} finally {
synchronized (stateLock) {
state = 3;
}
try {
cancelScheduledTasks();
runShutdownHooks();
cleanup();
} finally {
threadLock.release();
assert taskQueue.isEmpty();
}
}
}
});
@Override
public EventLoop parent() {
return (EventLoop) super.parent();
}
@Override
@ -112,475 +39,4 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
}
return future;
}
protected void interruptThread() {
thread.interrupt();
}
protected Runnable pollTask() {
assert inEventLoop();
Runnable task = taskQueue.poll();
if (task != null) {
return task;
}
if (fetchScheduledTasks()) {
task = taskQueue.poll();
return task;
}
return null;
}
protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
for (;;) {
Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS);
if (task != null) {
return task;
}
fetchScheduledTasks();
task = taskQueue.poll();
if (task != null) {
return task;
}
}
}
protected Runnable peekTask() {
assert inEventLoop();
Runnable task = taskQueue.peek();
if (task != null) {
return task;
}
if (fetchScheduledTasks()) {
task = taskQueue.peek();
return task;
}
return null;
}
protected boolean hasTasks() {
assert inEventLoop();
boolean empty = taskQueue.isEmpty();
if (!empty) {
return true;
}
if (fetchScheduledTasks()) {
return !taskQueue.isEmpty();
}
return false;
}
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
reject();
}
taskQueue.add(task);
}
protected boolean removeTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
return taskQueue.remove(task);
}
protected void runAllTasks() {
for (;;) {
final Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
protected abstract void run();
protected void cleanup() {
// Do nothing. Subclasses will override.
}
protected abstract void wakeup(boolean inEventLoop);
@Override
public boolean inEventLoop() {
return Thread.currentThread() == thread;
}
public void addShutdownHook(final Runnable task) {
if (inEventLoop()) {
shutdownHooks.add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
shutdownHooks.add(task);
}
});
}
}
public void removeShutdownHook(final Runnable task) {
if (inEventLoop()) {
shutdownHooks.remove(task);
} else {
execute(new Runnable() {
@Override
public void run() {
shutdownHooks.remove(task);
}
});
}
}
private void runShutdownHooks() {
// Note shutdown hooks can add / remove shutdown hooks.
while (!shutdownHooks.isEmpty()) {
List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
shutdownHooks.clear();
for (Runnable task: copy) {
try {
task.run();
} catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t);
}
}
}
}
@Override
public void shutdown() {
boolean inEventLoop = inEventLoop();
boolean wakeup = false;
if (inEventLoop) {
synchronized (stateLock) {
assert state == 1;
state = 2;
wakeup = true;
}
} else {
synchronized (stateLock) {
switch (state) {
case 0:
state = 3;
try {
cleanup();
} finally {
threadLock.release();
}
break;
case 1:
state = 2;
wakeup = true;
break;
}
}
}
if (wakeup) {
wakeup(inEventLoop);
}
}
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return state >= 2;
}
@Override
public boolean isTerminated() {
return state == 3;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
if (inEventLoop()) {
throw new IllegalStateException("cannot await termination of the current thread");
}
if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}
return isTerminated();
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (inEventLoop()) {
addTask(task);
wakeup(true);
} else {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
wakeup(false);
}
}
private static void reject() {
throw new RejectedExecutionException("event loop shut down");
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(command, null, deadlineNanos(unit.toNanos(delay))));
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null) {
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(callable, deadlineNanos(unit.toNanos(delay))));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
if (isShutdown()) {
reject();
}
scheduledTasks.add(task);
if (isShutdown()) {
task.cancel(false);
}
if (!inEventLoop()) {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
} else {
fetchScheduledTasks();
}
return task;
}
private boolean fetchScheduledTasks() {
if (scheduledTasks.isEmpty()) {
return false;
}
long nanoTime = nanoTime();
if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) {
for (Iterator<ScheduledFutureTask<?>> i = scheduledTasks.iterator(); i.hasNext();) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) {
boolean added = false;
for (;;) {
ScheduledFutureTask<?> task = scheduledTasks.poll();
if (task == null) {
break;
}
if (!task.isCancelled()) {
if (isShutdown()) {
task.cancel(false);
} else {
taskQueue.add(task);
added = true;
}
}
}
return added;
}
return false;
}
private void cancelScheduledTasks() {
if (scheduledTasks.isEmpty()) {
return;
}
for (ScheduledFutureTask<?> task: scheduledTasks.toArray(new ScheduledFutureTask<?>[scheduledTasks.size()])) {
task.cancel(false);
}
scheduledTasks.clear();
}
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
ScheduledFutureTask(Runnable runnable, V result, long nanoTime) {
super(runnable, result);
this.deadlineNanos = nanoTime;
this.periodNanos = 0;
}
ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) {
super(runnable, result);
if (period == 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: != 0)", period));
}
this.deadlineNanos = nanoTime;
this.periodNanos = period;
}
ScheduledFutureTask(Callable<V> callable, long nanoTime) {
super(callable);
this.deadlineNanos = nanoTime;
this.periodNanos = 0;
}
public long deadlineNanos() {
return deadlineNanos;
}
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
@Override
public void run() {
if (periodNanos == 0) {
super.run();
} else {
boolean reset = runAndReset();
if (reset && !isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
schedule(this);
}
}
}
}
}

View File

@ -186,7 +186,7 @@ public class LocalChannel extends AbstractChannel {
assert peer != null;
Queue<Object> in = buf.messageBuffer();
Queue<Object> out = peer.pipeline().inbound().messageBuffer();
Queue<Object> out = peer.pipeline().inboundMessageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {

View File

@ -15,14 +15,15 @@
*/
package io.netty.channel.local;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
final class LocalChildEventLoop extends SingleThreadEventLoop {
LocalChildEventLoop(ThreadFactory threadFactory) {
super(threadFactory);
LocalChildEventLoop(EventLoop parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override

View File

@ -1,26 +1,24 @@
package io.netty.channel.local;
import io.netty.channel.EventLoopFactory;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoop;
import java.util.concurrent.ThreadFactory;
public class LocalEventLoop extends MultithreadEventLoop {
public LocalEventLoop() {
this(DEFAULT_POOL_SIZE);
}
public LocalEventLoop() {}
public LocalEventLoop(int nThreads) {
this(nThreads, DEFAULT_THREAD_FACTORY);
super(nThreads);
}
public LocalEventLoop(int nThreads, ThreadFactory threadFactory) {
super(new EventLoopFactory<LocalChildEventLoop>() {
@Override
public LocalChildEventLoop newEventLoop(ThreadFactory threadFactory) throws Exception {
return new LocalChildEventLoop(threadFactory);
}
}, nThreads, threadFactory);
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new LocalChildEventLoop(this, threadFactory);
}
}

View File

@ -119,7 +119,7 @@ public class LocalServerChannel extends AbstractServerChannel {
private void serve0(final LocalChannel child) {
if (eventLoop().inEventLoop()) {
pipeline().inbound().messageBuffer().add(child);
pipeline().inboundMessageBuffer().add(child);
pipeline().fireInboundBufferUpdated();
} else {
eventLoop().execute(new Runnable() {

View File

@ -27,11 +27,10 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
final Queue<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false;
boolean read = false;
try {
Queue<Object> msgBuf = buf.messageBuffer();
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {

View File

@ -28,11 +28,10 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
final ChannelBuffer byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
try {
ChannelBuffer byteBuf = buf.byteBuffer();
expandReadBuffer(byteBuf);
for (;;) {
int localReadAmount = doReadBytes(byteBuf);

View File

@ -60,8 +60,8 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
private int cancelledKeys;
private boolean cleanedCancelledKeys;
NioChildEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(threadFactory);
NioChildEventLoop(NioEventLoop parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}

View File

@ -1,6 +1,6 @@
package io.netty.channel.socket.nio;
import io.netty.channel.EventLoopFactory;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoop;
import java.nio.channels.spi.SelectorProvider;
@ -8,25 +8,28 @@ import java.util.concurrent.ThreadFactory;
public class NioEventLoop extends MultithreadEventLoop {
public NioEventLoop() {
this(DEFAULT_POOL_SIZE);
}
public NioEventLoop() {}
public NioEventLoop(int nThreads) {
this(nThreads, DEFAULT_THREAD_FACTORY);
super(nThreads);
}
public NioEventLoop(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
super(nThreads, threadFactory);
}
public NioEventLoop(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(new EventLoopFactory<NioChildEventLoop>() {
@Override
public NioChildEventLoop newEventLoop(ThreadFactory threadFactory) throws Exception {
return new NioChildEventLoop(threadFactory, selectorProvider);
}
super(nThreads, threadFactory, selectorProvider);
}
}, nThreads, threadFactory);
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
SelectorProvider selectorProvider;
if (args == null || args.length == 0 || args[0] == null) {
selectorProvider = SelectorProvider.provider();
} else {
selectorProvider = (SelectorProvider) args[0];
}
return new NioChildEventLoop(this, threadFactory, selectorProvider);
}
}

View File

@ -25,11 +25,10 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
final Queue<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false;
boolean read = false;
try {
Queue<Object> msgBuf = buf.messageBuffer();
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;

View File

@ -25,11 +25,10 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.inbound();
final ChannelBuffer byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
try {
ChannelBuffer byteBuf = buf.byteBuffer();
expandReadBuffer(byteBuf);
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {

View File

@ -8,12 +8,10 @@ import io.netty.channel.SingleThreadEventLoop;
class OioChildEventLoop extends SingleThreadEventLoop {
private final OioEventLoop parent;
private AbstractOioChannel ch;
OioChildEventLoop(OioEventLoop parent) {
super(parent.threadFactory);
this.parent = parent;
super(parent, parent.threadFactory);
}
@Override
@ -71,6 +69,7 @@ class OioChildEventLoop extends SingleThreadEventLoop {
private void deregister() {
ch = null;
OioEventLoop parent = (OioEventLoop) parent();
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}

View File

@ -4,8 +4,9 @@ package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.SingleThreadEventExecutor;
import io.netty.util.internal.QueueFactory;
import java.util.Collection;
@ -31,6 +32,12 @@ public class OioEventLoop implements EventLoop {
new ConcurrentHashMap<OioChildEventLoop, Boolean>());
final Queue<OioChildEventLoop> idleChildren = QueueFactory.createQueue();
private final ChannelException tooManyChannels;
private final Unsafe unsafe = new Unsafe() {
@Override
public EventExecutor nextChild() {
throw new UnsupportedOperationException();
}
};
public OioEventLoop() {
this(0);
@ -56,6 +63,16 @@ public class OioEventLoop implements EventLoop {
tooManyChannels.setStackTrace(new StackTraceElement[0]);
}
@Override
public EventLoop parent() {
return null;
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
public void shutdown() {
for (EventLoop l: activeChildren) {
@ -209,7 +226,7 @@ public class OioEventLoop implements EventLoop {
throw new NullPointerException("channel");
}
try {
return nextEventLoop().register(channel);
return nextChild().register(channel);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
@ -221,7 +238,7 @@ public class OioEventLoop implements EventLoop {
throw new NullPointerException("channel");
}
try {
return nextEventLoop().register(channel, future);
return nextChild().register(channel, future);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
@ -229,16 +246,16 @@ public class OioEventLoop implements EventLoop {
@Override
public boolean inEventLoop() {
return SingleThreadEventLoop.currentEventLoop() != null;
return SingleThreadEventExecutor.currentEventLoop() != null;
}
private EventLoop nextEventLoop() {
private EventLoop nextChild() {
OioChildEventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = new OioChildEventLoop(this);
loop = new OioChildEventLoop(OioEventLoop.this);
}
activeChildren.add(loop);
return loop;
@ -246,7 +263,7 @@ public class OioEventLoop implements EventLoop {
private static OioChildEventLoop currentEventLoop() {
OioChildEventLoop loop =
(OioChildEventLoop) SingleThreadEventLoop.currentEventLoop();
(OioChildEventLoop) SingleThreadEventExecutor.currentEventLoop();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
}

View File

@ -246,6 +246,10 @@ public class SingleThreadEventLoopTest {
final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopImpl() {
super(null);
}
@Override
protected void run() {
for (;;) {

View File

@ -70,7 +70,7 @@ public class LocalChannelRegistryTest {
Channel cc = cb.connect().sync().channel();
// Send a message event up the pipeline.
cc.pipeline().inbound().messageBuffer().add("Hello, World");
cc.pipeline().inboundMessageBuffer().add("Hello, World");
cc.pipeline().fireInboundBufferUpdated();
// Close the channel

View File

@ -0,0 +1,192 @@
package io.netty.channel.local;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.DefaultEventExecutor;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.util.internal.QueueFactory;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class LocalTransportThreadModelTest {
private static ServerBootstrap sb;
private static LocalAddress ADDR;
@BeforeClass
public static void init() {
// Configure a test server
sb = new ServerBootstrap();
sb.eventLoop(new LocalEventLoop(), new LocalEventLoop())
.channel(new LocalServerChannel())
.localAddress(LocalAddress.ANY)
.childInitializer(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundMessageHandlerAdapter<Object>() {
@Override
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg) {
// Discard
}
});
}
});
ADDR = (LocalAddress) sb.bind().syncUninterruptibly().channel().localAddress();
}
@AfterClass
public static void destroy() {
sb.shutdown();
}
@Test
public void testSimple() throws Exception {
EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l"));
EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1"));
EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2"));
TestHandler h1 = new TestHandler();
TestHandler h2 = new TestHandler();
TestHandler h3 = new TestHandler();
Channel ch = new LocalChannel();
ch.pipeline().addLast(h1);
ch.pipeline().addLast(e1, h2);
ch.pipeline().addLast(e2, h3);
l.register(ch).sync();
ch.connect(ADDR).sync();
ch.pipeline().fireInboundBufferUpdated();
ch.pipeline().context(h1).fireInboundBufferUpdated();
ch.pipeline().context(h2).fireInboundBufferUpdated();
ch.pipeline().context(h3).fireInboundBufferUpdated();
ch.pipeline().flush();
ch.pipeline().context(h3).flush();
ch.pipeline().context(h2).flush();
ch.pipeline().context(h1).flush().sync();
String currentName = Thread.currentThread().getName();
// Events should never be handled from the current thread.
Assert.assertFalse(h1.inboundThreadNames.contains(currentName));
Assert.assertFalse(h2.inboundThreadNames.contains(currentName));
Assert.assertFalse(h3.inboundThreadNames.contains(currentName));
Assert.assertFalse(h1.outboundThreadNames.contains(currentName));
Assert.assertFalse(h2.outboundThreadNames.contains(currentName));
Assert.assertFalse(h3.outboundThreadNames.contains(currentName));
// Assert that events were handled by the correct executor.
for (String name: h1.inboundThreadNames) {
Assert.assertTrue(name.startsWith("l-"));
}
for (String name: h2.inboundThreadNames) {
Assert.assertTrue(name.startsWith("e1-"));
}
for (String name: h3.inboundThreadNames) {
Assert.assertTrue(name.startsWith("e2-"));
}
for (String name: h1.outboundThreadNames) {
Assert.assertTrue(name.startsWith("l-"));
}
for (String name: h2.outboundThreadNames) {
Assert.assertTrue(name.startsWith("e1-"));
}
for (String name: h3.outboundThreadNames) {
Assert.assertTrue(name.startsWith("e2-"));
}
// Assert that the events for the same handler were handled by the same thread.
Set<String> names = new HashSet<String>();
names.addAll(h1.inboundThreadNames);
names.addAll(h1.outboundThreadNames);
Assert.assertEquals(1, names.size());
names.clear();
names.addAll(h2.inboundThreadNames);
names.addAll(h2.outboundThreadNames);
Assert.assertEquals(1, names.size());
names.clear();
names.addAll(h3.inboundThreadNames);
names.addAll(h3.outboundThreadNames);
Assert.assertEquals(1, names.size());
// Count the number of events
Assert.assertEquals(1, h1.inboundThreadNames.size());
Assert.assertEquals(2, h2.inboundThreadNames.size());
Assert.assertEquals(3, h3.inboundThreadNames.size());
Assert.assertEquals(3, h1.outboundThreadNames.size());
Assert.assertEquals(2, h2.outboundThreadNames.size());
Assert.assertEquals(1, h3.outboundThreadNames.size());
}
private static class TestHandler extends ChannelHandlerAdapter<Object, Object> {
private final Queue<String> inboundThreadNames = QueueFactory.createQueue();
private final Queue<String> outboundThreadNames = QueueFactory.createQueue();
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelOutboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
ctx.inbound().messageBuffer().clear();
inboundThreadNames.add(Thread.currentThread().getName());
ctx.fireInboundBufferUpdated();
}
@Override
public void flush(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
ctx.outbound().messageBuffer().clear();
outboundThreadNames.add(Thread.currentThread().getName());
ctx.flush(future);
}
}
private static class PrefixThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger id = new AtomicInteger();
public PrefixThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(prefix + '-' + id.incrementAndGet());
return t;
}
}
}