From 116054a364f74d10f9b3161dd60b2d72ab05f9c6 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 3 Apr 2012 22:03:04 +0900 Subject: [PATCH] Initial incomplete checkin of the event loop API --- .../main/java/io/netty/channel/Channel.java | 30 +- .../main/java/io/netty/channel/EventLoop.java | 9 + .../netty/channel/SingleThreadEventLoop.java | 206 +++++++++++++ .../java/io/netty/channel/socket/Worker.java | 31 -- .../channel/socket/nio/AbstractNioWorker.java | 272 ++++-------------- .../netty/channel/socket/nio/NioChannel.java | 27 -- .../channel/SingleThreadEventLoopTest.java | 162 +++++++++++ 7 files changed, 462 insertions(+), 275 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/EventLoop.java create mode 100644 transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/Worker.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java create mode 100644 transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index b08bc68a40..945b406e82 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -15,16 +15,17 @@ */ package io.netty.channel; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.NotYetConnectedException; -import java.nio.channels.SelectionKey; - import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannelConfig; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.NotYetConnectedException; +import java.nio.channels.SelectionKey; + /** * A nexus to a network socket or a component which is capable of I/O @@ -136,6 +137,8 @@ public interface Channel extends Comparable { */ Integer getId(); + EventLoop eventLoop(); + /** * Returns the {@link ChannelFactory} which created this channel. */ @@ -372,4 +375,21 @@ public interface Channel extends Comparable { * Attaches an object to this {@link Channel} to store a stateful information */ void setAttachment(Object attachment); + + Unsafe unsafe(); + + public interface Unsafe { + void setEventLoop(EventLoop eventLoop); + void clearEventLoop(); + java.nio.channels.Channel ch(); + + void bind(SocketAddress local) throws IOException; + void connect(SocketAddress remote) throws IOException; + boolean finishConnect() throws IOException; + boolean read() throws IOException; + boolean write() throws IOException; + void unbind() throws IOException; + void disconnect() throws IOException; + void close() throws IOException; + } } diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java new file mode 100644 index 0000000000..80026f6168 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -0,0 +1,9 @@ +package io.netty.channel; + +import java.util.concurrent.ExecutorService; + +public interface EventLoop extends ExecutorService { + ChannelFuture attach(Channel channel); + void attach(Channel channel, ChannelFuture future); + boolean inEventLoop(); +} diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java new file mode 100644 index 0000000000..8b5488a1ed --- /dev/null +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -0,0 +1,206 @@ +package io.netty.channel; + +import io.netty.util.internal.QueueFactory; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop { + + private final BlockingQueue taskQueue = QueueFactory.createQueue(Runnable.class); + private final Thread thread; + private final Object stateLock = new Object(); + private final Semaphore threadLock = new Semaphore(0); + /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ + private volatile int state; + + protected SingleThreadEventLoop() { + this(Executors.defaultThreadFactory()); + } + + protected SingleThreadEventLoop(ThreadFactory threadFactory) { + thread = threadFactory.newThread(new Runnable() { + @Override + public void run() { + try { + SingleThreadEventLoop.this.run(); + } finally { + synchronized (stateLock) { + state = 3; + } + try { + cleanup(); + } finally { + threadLock.release(); + assert taskQueue.isEmpty(); + } + } + } + }); + } + + public ChannelFuture attach(Channel channel) { + ChannelFuture future = new DefaultChannelFuture(channel, false); + attach(channel, future); + return future; + } + + protected void interruptThread() { + thread.interrupt(); + } + + protected Runnable pollTask() { + assert inEventLoop(); + return taskQueue.poll(); + } + + protected Runnable takeTask() throws InterruptedException { + assert inEventLoop(); + return taskQueue.take(); + } + + protected Runnable peekTask() { + assert inEventLoop(); + return taskQueue.peek(); + } + + protected boolean hasTasks() { + assert inEventLoop(); + return !taskQueue.isEmpty(); + } + + protected void addTask(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + if (isShutdown()) { + reject(); + } + taskQueue.add(task); + } + + protected boolean removeTask(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + return taskQueue.remove(task); + } + + protected abstract void run(); + + protected void cleanup() { + // Do nothing. Subclasses will override. + } + + protected abstract void wakeup(boolean inEventLoop); + + @Override + public boolean inEventLoop() { + return Thread.currentThread() == thread; + } + + @Override + public void shutdown() { + boolean inEventLoop = inEventLoop(); + boolean wakeup = false; + if (inEventLoop) { + synchronized (stateLock) { + assert state == 1; + state = 2; + wakeup = true; + } + } else { + synchronized (stateLock) { + switch (state) { + case 0: + state = 3; + try { + cleanup(); + } finally { + threadLock.release(); + } + break; + case 1: + state = 2; + wakeup = true; + break; + } + } + } + + if (wakeup) { + wakeup(inEventLoop); + } + } + + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return state >= 2; + } + + @Override + public boolean isTerminated() { + return state == 3; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (inEventLoop()) { + throw new IllegalStateException("cannot await termination of the current thread"); + } + + if (threadLock.tryAcquire(timeout, unit)) { + threadLock.release(); + } + + return isTerminated(); + } + + @Override + public void execute(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + + if (inEventLoop()) { + if (isShutdown()) { + reject(); + } + addTask(task); + wakeup(true); + } else { + synchronized (stateLock) { + if (state == 0) { + state = 1; + thread.start(); + } + } + addTask(task); + if (isShutdown() && removeTask(task)) { + reject(); + } + wakeup(false); + } + } + + private static void reject() { + throw new RejectedExecutionException("event loop shut down"); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java deleted file mode 100644 index 64dc433038..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket; - -/** - * A {@link Worker} is responsible to dispatch IO operations - * - */ -public interface Worker extends Runnable { - - /** - * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. - * - * @param task the {@link Runnable} to execute - */ - void executeInIoThread(Runnable task); -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index c65b6ff451..c6341a842d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -15,19 +15,15 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; - import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.MessageEvent; -import io.netty.channel.socket.Worker; +import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.net.ConnectException; @@ -44,13 +40,14 @@ import java.nio.channels.WritableByteChannel; import java.util.Iterator; import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -abstract class AbstractNioWorker implements Worker { +import static io.netty.channel.Channels.*; + +abstract class AbstractNioWorker extends SingleThreadEventLoop { /** * Internal Netty logger. */ @@ -62,26 +59,10 @@ abstract class AbstractNioWorker implements Worker { static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - /** - * Executor used to execute {@link Runnable}s such as registration task. - */ - private final Executor executor; - - /** - * Boolean to indicate if this worker has been started. - */ - private boolean started; - - /** - * If this worker has been started thread will be a reference to the thread - * used when starting. i.e. the current thread when the run method is executed. - */ - protected volatile Thread thread; - /** * The NIO {@link Selector}. */ - protected volatile Selector selector; + protected final Selector selector; /** * Boolean that controls determines if a blocked Selector.select should @@ -101,43 +82,33 @@ abstract class AbstractNioWorker implements Worker { */ private final Object startStopLock = new Object(); - /** - * Queue of channel registration tasks. - */ - protected final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - - /** - * Queue of WriteTasks - */ - protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation protected final SendBufferPool sendBufferPool = new SendBufferPool(); - private final boolean allowShutdownOnIdle; - - AbstractNioWorker(Executor executor) { - this(executor, true); + protected AbstractNioWorker() { + selector = openSelector(); } - public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { - this.executor = executor; - this.allowShutdownOnIdle = allowShutdownOnIdle; - + protected AbstractNioWorker(ThreadFactory threadFactory) { + super(threadFactory); + selector = openSelector(); } - public void registerWithWorker(final Channel channel, final ChannelFuture future) { - final Selector selector = start(); + private static Selector openSelector() { + try { + return Selector.open(); + } catch (IOException e) { + throw new ChannelException("failed to open a new selector", e); + } + } + @Override + public void attach(final Channel channel, final ChannelFuture future) { try { if (channel instanceof NioServerSocketChannel) { final NioServerSocketChannel ch = (NioServerSocketChannel) channel; - registerTaskQueue.add(new Runnable() { - + execute(new Runnable() { @Override public void run() { try { @@ -151,14 +122,13 @@ abstract class AbstractNioWorker implements Worker { } else if (channel instanceof NioClientSocketChannel) { final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel; - registerTaskQueue.add(new Runnable() { - + execute(new Runnable() { @Override public void run() { try { try { clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel); - } catch (ClosedChannelException e) { + } catch (ClosedChannelException ignored) { clientChannel.getWorker().close(clientChannel, succeededFuture(channel)); } int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); @@ -172,8 +142,7 @@ abstract class AbstractNioWorker implements Worker { } }); } else if (channel instanceof AbstractNioChannel) { - registerTaskQueue.add(new Runnable() { - + execute(new Runnable() { @Override public void run() { try { @@ -195,56 +164,12 @@ abstract class AbstractNioWorker implements Worker { future.setFailure(t); fireExceptionCaught(channel, t); } - - } - - /** - * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered - * - * @return selector - */ - protected final Selector start() { - synchronized (startStopLock) { - if (!started && selector == null) { - // Open a selector if this worker didn't start yet. - try { - this.selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException("Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(executor, this); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - this.selector = null; - // The method will return to the caller at this point. - } - } - } - - assert selector != null && selector.isOpen(); - - started = true; - } - return selector; } @Override - public void run() { - thread = Thread.currentThread(); + protected void run() { long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); - boolean shutdown = false; Selector selector = this.selector; for (;;) { @@ -293,9 +218,7 @@ abstract class AbstractNioWorker implements Worker { } cancelledKeys = 0; - processRegisterTaskQueue(); - processEventQueue(); - processWriteTaskQueue(); + processTaskQueue(); processSelectedKeys(selector.selectedKeys()); // Handle connection timeout every 10 milliseconds approximately. @@ -311,33 +234,13 @@ abstract class AbstractNioWorker implements Worker { // connections are registered in a one-by-one manner instead of // concurrent manner. if (selector.keys().isEmpty()) { - if (shutdown || - executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - + if (isShutdown()) { synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } finally { - this.selector = null; - } + if (!hasTasks() && selector.keys().isEmpty()) { break; - } else { - shutdown = false; } } - } else { - if (allowShutdownOnIdle) { - // Give one more second. - shutdown = true; - } } - } else { - shutdown = false; } } catch (Throwable t) { logger.warn( @@ -353,41 +256,20 @@ abstract class AbstractNioWorker implements Worker { } } } - + @Override - public void executeInIoThread(Runnable task) { - executeInIoThread(task, false); - } - - /** - * Execute the {@link Runnable} in a IO-Thread - * - * @param task the {@link Runnable} to execute - * @param alwaysAsync true if the {@link Runnable} should be executed in an async - * fashion even if the current Thread == IO Thread - */ - public void executeInIoThread(Runnable task, boolean alwaysAsync) { - if (!alwaysAsync && isIoThread()) { - task.run(); - } else { - start(); - boolean added = eventQueue.offer(task); - - assert added; - if (added) { - // wake up the selector to speed things - Selector selector = this.selector; - if (selector != null) { - selector.wakeup(); - } - } + protected void cleanup() { + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); } - } - private void processRegisterTaskQueue() throws IOException { + private void processTaskQueue() throws IOException { for (;;) { - final Runnable task = registerTaskQueue.poll(); + final Runnable task = pollTask(); if (task == null) { break; } @@ -397,30 +279,6 @@ abstract class AbstractNioWorker implements Worker { } } - private void processWriteTaskQueue() throws IOException { - - for (;;) { - final Runnable task = writeTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processEventQueue() throws IOException { - for (;;) { - final Runnable task = eventQueue.poll(); - if (task == null) { - break; - } - task.run(); - cleanUpCancelledKeys(); - } - } - private void processSelectedKeys(Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); @@ -445,7 +303,7 @@ abstract class AbstractNioWorker implements Worker { connect(k); } - } catch (CancelledKeyException e) { + } catch (CancelledKeyException ignored) { close(k); } finally { if (removeKey) { @@ -609,10 +467,9 @@ abstract class AbstractNioWorker implements Worker { protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { - if (!isIoThread()) { + if (!inEventLoop()) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; + execute(channel.writeTask); } final Selector workerSelector = selector; @@ -632,7 +489,7 @@ abstract class AbstractNioWorker implements Worker { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; - boolean iothread = isIoThread(); + boolean inEventLoop = inEventLoop(); long writtenBytes = 0; @@ -704,7 +561,7 @@ abstract class AbstractNioWorker implements Worker { buf = null; evt = null; future.setFailure(t); - if (iothread) { + if (inEventLoop) { fireExceptionCaught(channel, t); } else { fireExceptionCaughtLater(channel, t); @@ -731,21 +588,13 @@ abstract class AbstractNioWorker implements Worker { } } } - if (iothread) { + if (inEventLoop) { fireWriteComplete(channel, writtenBytes); } else { fireWriteCompleteLater(channel, writtenBytes); } } - /** - * Return true if the current executing thread is the same as the one that runs the {@link #run()} method - * - */ - boolean isIoThread() { - return Thread.currentThread() == thread; - } - protected void setOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; SelectionKey key = channel.getJdkChannel().keyFor(selector); @@ -794,7 +643,7 @@ abstract class AbstractNioWorker implements Worker { public void close(NioServerSocketChannel channel, ChannelFuture future) { - boolean isIoThread = isIoThread(); + boolean inEventLoop = inEventLoop(); boolean bound = channel.isBound(); try { @@ -813,13 +662,13 @@ abstract class AbstractNioWorker implements Worker { if (channel.setClosed()) { future.setSuccess(); if (bound) { - if (isIoThread) { + if (inEventLoop) { fireChannelUnbound(channel); } else { fireChannelUnboundLater(channel); } } - if (isIoThread) { + if (inEventLoop) { fireChannelClosed(channel); } else { fireChannelClosedLater(channel); @@ -832,7 +681,7 @@ abstract class AbstractNioWorker implements Worker { } } catch (Throwable t) { future.setFailure(t); - if (isIoThread) { + if (inEventLoop) { fireExceptionCaught(channel, t); } else { fireExceptionCaughtLater(channel, t); @@ -844,7 +693,7 @@ abstract class AbstractNioWorker implements Worker { public void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); - boolean iothread = isIoThread(); + boolean inEventLoop = inEventLoop(); try { channel.getJdkChannel().close(); @@ -853,14 +702,14 @@ abstract class AbstractNioWorker implements Worker { if (channel.setClosed()) { future.setSuccess(); if (connected) { - if (iothread) { + if (inEventLoop) { fireChannelDisconnected(channel); } else { fireChannelDisconnectedLater(channel); } } if (bound) { - if (iothread) { + if (inEventLoop) { fireChannelUnbound(channel); } else { fireChannelUnboundLater(channel); @@ -868,7 +717,7 @@ abstract class AbstractNioWorker implements Worker { } cleanUpWriteBuffer(channel); - if (iothread) { + if (inEventLoop) { fireChannelClosed(channel); } else { fireChannelClosedLater(channel); @@ -878,10 +727,9 @@ abstract class AbstractNioWorker implements Worker { } } catch (Throwable t) { future.setFailure(t); - if (iothread) { + if (inEventLoop) { fireExceptionCaught(channel, t); } else { - System.out.println(thread + "==" + channel.getWorker().thread); fireExceptionCaughtLater(channel, t); } } @@ -936,7 +784,7 @@ abstract class AbstractNioWorker implements Worker { } if (fireExceptionCaught) { - if (isIoThread()) { + if (inEventLoop()) { fireExceptionCaught(channel, cause); } else { fireExceptionCaughtLater(channel, cause); @@ -946,7 +794,7 @@ abstract class AbstractNioWorker implements Worker { public void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; - boolean iothread = isIoThread(); + boolean inEventLoop = inEventLoop(); try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. @@ -969,7 +817,7 @@ abstract class AbstractNioWorker implements Worker { future.setSuccess(); if (changed) { - if (iothread) { + if (inEventLoop) { fireChannelInterestChanged(channel); } else { fireChannelInterestChangedLater(channel); @@ -983,7 +831,7 @@ abstract class AbstractNioWorker implements Worker { case 0: if (channel.getRawInterestOps() != interestOps) { key.interestOps(interestOps); - if (!iothread && + if (!inEventLoop && wakenUp.compareAndSet(false, true)) { selector.wakeup(); } @@ -993,7 +841,7 @@ abstract class AbstractNioWorker implements Worker { case 1: case 2: if (channel.getRawInterestOps() != interestOps) { - if (iothread) { + if (inEventLoop) { key.interestOps(interestOps); changed = true; } else { @@ -1021,7 +869,7 @@ abstract class AbstractNioWorker implements Worker { future.setSuccess(); if (changed) { - if (iothread) { + if (inEventLoop) { fireChannelInterestChanged(channel); } else { fireChannelInterestChangedLater(channel); @@ -1031,14 +879,14 @@ abstract class AbstractNioWorker implements Worker { // setInterestOps() was called on a closed channel. ClosedChannelException cce = new ClosedChannelException(); future.setFailure(cce); - if (iothread) { + if (inEventLoop) { fireExceptionCaught(channel, cce); } else { fireExceptionCaughtLater(channel, cce); } } catch (Throwable t) { future.setFailure(t); - if (iothread) { + if (inEventLoop) { fireExceptionCaught(channel, t); } else { fireExceptionCaughtLater(channel, t); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java deleted file mode 100644 index 1f6dd63977..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.Channel; - -public interface NioChannel extends Channel { - - /** - * Returns the {@link AbstractNioWorker} which handles the IO of the {@link Channel} - * - */ - AbstractNioWorker getWorker(); -} diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java new file mode 100644 index 0000000000..08802e835b --- /dev/null +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -0,0 +1,162 @@ +package io.netty.channel; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class SingleThreadEventLoopTest { + + private SingleThreadEventLoopImpl loop; + + @Before + public void newEventLoop() { + loop = new SingleThreadEventLoopImpl(); + } + + @After + public void stopEventLoop() { + if (!loop.isShutdown()) { + loop.shutdown(); + } + while (!loop.isTerminated()) { + try { + loop.awaitTermination(1, TimeUnit.DAYS); + } catch (InterruptedException e) { + // Ignore + } + } + assertEquals(1, loop.cleanedUp.get()); + } + + @Test + public void shutdownBeforeStart() throws Exception { + loop.shutdown(); + } + + @Test + public void shutdownAfterStart() throws Exception { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(2); + loop.execute(new Runnable() { + @Override + public void run() { + latch.countDown(); + while (latch.getCount() > 0) { + try { + latch.await(); + } catch (InterruptedException ignored) { + interrupted.set(true); + } + } + } + }); + + // Wait for the event loop thread to start. + while (latch.getCount() >= 2) { + Thread.yield(); + } + + // Request the event loop thread to stop - it will call wakeup(false) to interrupt the thread. + loop.shutdown(); + + // Make the task terminate by itself. + latch.countDown(); + + // Wait until the event loop is terminated. + while (!loop.isTerminated()) { + loop.awaitTermination(1, TimeUnit.DAYS); + } + + // Make sure loop.shutdown() above triggered wakeup(). + assertTrue(interrupted.get()); + } + + @Test + public void shutdownWithPendingTasks() throws Exception { + final int NUM_TASKS = 3; + final AtomicInteger ranTasks = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + final Runnable task = new Runnable() { + @Override + public void run() { + ranTasks.incrementAndGet(); + while (latch.getCount() > 0) { + try { + latch.await(); + } catch (InterruptedException e) { + // Ignored + } + } + } + }; + + for (int i = 0; i < NUM_TASKS; i ++) { + loop.execute(task); + } + + // At this point, the first task should be running and stuck at latch.await(). + while (ranTasks.get() == 0) { + Thread.yield(); + } + assertEquals(1, ranTasks.get()); + + // Shut down the event loop to test if the other tasks are run before termination. + loop.shutdown(); + + // Let the other tasks run. + latch.countDown(); + + // Wait until the event loop is terminated. + while (!loop.isTerminated()) { + loop.awaitTermination(1, TimeUnit.DAYS); + } + + // Make sure loop.shutdown() above triggered wakeup(). + assertEquals(NUM_TASKS, ranTasks.get()); + } + + private static class SingleThreadEventLoopImpl extends SingleThreadEventLoop { + + final AtomicInteger cleanedUp = new AtomicInteger(); + + @Override + protected void run() { + for (;;) { + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown() && peekTask() == null) { + break; + } + } + } + + protected void cleanup() { + cleanedUp.incrementAndGet(); + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop) { + interruptThread(); + } + } + + @Override + public void attach(Channel channel, ChannelFuture future) { + // Untested + } + } +}