2012-06-04 22:31:44 +02:00
|
|
|
/*
|
|
|
|
* Copyright 2012 The Netty Project
|
|
|
|
*
|
|
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
* with the License. You may obtain a copy of the License at:
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
* License for the specific language governing permissions and limitations
|
|
|
|
* under the License.
|
|
|
|
*/
|
2012-04-03 15:03:04 +02:00
|
|
|
package io.netty.channel;
|
|
|
|
|
2013-12-08 06:12:10 +01:00
|
|
|
import ch.qos.logback.classic.Logger;
|
|
|
|
import ch.qos.logback.classic.spi.ILoggingEvent;
|
|
|
|
import ch.qos.logback.core.Appender;
|
2013-03-21 09:48:10 +01:00
|
|
|
import io.netty.channel.local.LocalChannel;
|
2013-04-27 09:39:19 +02:00
|
|
|
import io.netty.util.concurrent.EventExecutor;
|
Reduce the chance of RejectedExecutionException
When a Netty application shuts down, a user often sees a REE
(RejectedExecutionException).
A REE is raised due to various reasons we don't have control over, such
as:
- A client connects to a server while the server is shutting down.
- An event is triggered for a closed Channel while its event loop is
also shutting down. Some of them are:
- channelDeregistered (triggered after a channel is closed)
- freeIn/OutboundBuffer (triggered after channelDeregistered)
- userEventTriggered (triggered anytime)
To address this issue, a new method called confirmShutdown() has been
added to SingleThreadEventExecutor. After a user calls shutdown(),
confirmShutdown() runs any remaining tasks in the task queue and ensures
no events are triggered for last 2 seconds. If any task are added to
the task queue before 2 seconds passes, confirmShutdown() prevents the
event loop from terminating by returning false.
Now that SingleThreadEventExecutor needs to accept tasks even after
shutdown(), its execute() method only rejects the task after the event
loop is terminated (i.e. isTerminated() returns true.) Except that,
there's no change in semantics.
SingleThreadEventExecutor also checks if its subclass called
confirmShutdown() in its run() implementation, so that Netty developers
can make sure they shut down their event loop impementation correctly.
It also fixes a bug in AioSocketChannel, revealed by delayed shutdown,
where an inboundBufferUpdated() event is triggered on a closed Channel
with deallocated buffers.
Caveats:
Because SingleThreadEventExecutor.takeTask() does not have a notion of
timeout, confirmShutdown() adds a dummy task (WAKEUP_TASK) to wake up
takeTask() immediately and instead sleeps hard-coded 100ms. I'll
address this issue later by modifying takeTask() times out dynamically.
Miscellaneous changes:
SingleThreadEventExecutor.wakeup() now has the default implementation.
Instead of interrupting the current thread, it simply adds a dummy task
(WAKEUP_TASK) to the task queue, which is more elegant and efficient.
NioEventLoop is the only implementation that overrides it. All other
implementations' wakeup()s were removed thanks to this change.
2012-11-22 12:45:49 +01:00
|
|
|
import org.junit.After;
|
|
|
|
import org.junit.Before;
|
|
|
|
import org.junit.Test;
|
2013-12-08 06:12:10 +01:00
|
|
|
import org.slf4j.LoggerFactory;
|
2012-04-03 15:03:04 +02:00
|
|
|
|
2013-12-08 06:12:10 +01:00
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.Iterator;
|
|
|
|
import java.util.List;
|
2012-05-11 13:19:57 +02:00
|
|
|
import java.util.Queue;
|
2012-04-03 15:03:04 +02:00
|
|
|
import java.util.concurrent.CountDownLatch;
|
2013-03-22 01:00:38 +01:00
|
|
|
import java.util.concurrent.ExecutionException;
|
2012-06-04 20:32:12 +02:00
|
|
|
import java.util.concurrent.Executors;
|
2012-05-11 13:19:57 +02:00
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
2013-03-21 09:48:10 +01:00
|
|
|
import java.util.concurrent.RejectedExecutionException;
|
2012-05-11 13:19:57 +02:00
|
|
|
import java.util.concurrent.ScheduledFuture;
|
2012-04-03 15:03:04 +02:00
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
2012-05-11 13:19:57 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
2012-04-03 15:03:04 +02:00
|
|
|
|
2013-03-21 09:48:10 +01:00
|
|
|
import static org.hamcrest.CoreMatchers.*;
|
Reduce the chance of RejectedExecutionException
When a Netty application shuts down, a user often sees a REE
(RejectedExecutionException).
A REE is raised due to various reasons we don't have control over, such
as:
- A client connects to a server while the server is shutting down.
- An event is triggered for a closed Channel while its event loop is
also shutting down. Some of them are:
- channelDeregistered (triggered after a channel is closed)
- freeIn/OutboundBuffer (triggered after channelDeregistered)
- userEventTriggered (triggered anytime)
To address this issue, a new method called confirmShutdown() has been
added to SingleThreadEventExecutor. After a user calls shutdown(),
confirmShutdown() runs any remaining tasks in the task queue and ensures
no events are triggered for last 2 seconds. If any task are added to
the task queue before 2 seconds passes, confirmShutdown() prevents the
event loop from terminating by returning false.
Now that SingleThreadEventExecutor needs to accept tasks even after
shutdown(), its execute() method only rejects the task after the event
loop is terminated (i.e. isTerminated() returns true.) Except that,
there's no change in semantics.
SingleThreadEventExecutor also checks if its subclass called
confirmShutdown() in its run() implementation, so that Netty developers
can make sure they shut down their event loop impementation correctly.
It also fixes a bug in AioSocketChannel, revealed by delayed shutdown,
where an inboundBufferUpdated() event is triggered on a closed Channel
with deallocated buffers.
Caveats:
Because SingleThreadEventExecutor.takeTask() does not have a notion of
timeout, confirmShutdown() adds a dummy task (WAKEUP_TASK) to wake up
takeTask() immediately and instead sleeps hard-coded 100ms. I'll
address this issue later by modifying takeTask() times out dynamically.
Miscellaneous changes:
SingleThreadEventExecutor.wakeup() now has the default implementation.
Instead of interrupting the current thread, it simply adds a dummy task
(WAKEUP_TASK) to the task queue, which is more elegant and efficient.
NioEventLoop is the only implementation that overrides it. All other
implementations' wakeup()s were removed thanks to this change.
2012-11-22 12:45:49 +01:00
|
|
|
import static org.junit.Assert.*;
|
2012-04-03 15:03:04 +02:00
|
|
|
|
|
|
|
public class SingleThreadEventLoopTest {
|
|
|
|
|
2013-04-27 09:39:19 +02:00
|
|
|
private static final Runnable NOOP = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() { }
|
|
|
|
};
|
|
|
|
|
2013-03-22 01:00:38 +01:00
|
|
|
private SingleThreadEventLoopA loopA;
|
|
|
|
private SingleThreadEventLoopB loopB;
|
2012-04-03 15:03:04 +02:00
|
|
|
|
|
|
|
@Before
|
|
|
|
public void newEventLoop() {
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA = new SingleThreadEventLoopA();
|
|
|
|
loopB = new SingleThreadEventLoopB();
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@After
|
|
|
|
public void stopEventLoop() {
|
2013-04-27 09:39:19 +02:00
|
|
|
if (!loopA.isShuttingDown()) {
|
|
|
|
loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
2013-04-27 09:39:19 +02:00
|
|
|
if (!loopB.isShuttingDown()) {
|
|
|
|
loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
2013-03-22 01:00:38 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
while (!loopA.isTerminated()) {
|
2012-04-03 15:03:04 +02:00
|
|
|
try {
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.awaitTermination(1, TimeUnit.DAYS);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assertEquals(1, loopA.cleanedUp.get());
|
|
|
|
|
|
|
|
while (!loopB.isTerminated()) {
|
|
|
|
try {
|
|
|
|
loopB.awaitTermination(1, TimeUnit.DAYS);
|
2012-04-03 15:03:04 +02:00
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
2013-04-27 09:39:19 +02:00
|
|
|
@SuppressWarnings("deprecation")
|
2012-04-03 15:03:04 +02:00
|
|
|
public void shutdownBeforeStart() throws Exception {
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.shutdown();
|
2013-04-27 09:39:19 +02:00
|
|
|
assertRejection(loopA);
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
2013-04-27 09:39:19 +02:00
|
|
|
@SuppressWarnings("deprecation")
|
2012-04-03 15:03:04 +02:00
|
|
|
public void shutdownAfterStart() throws Exception {
|
2012-07-08 14:28:56 +02:00
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.execute(new Runnable() {
|
2012-04-03 15:03:04 +02:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
latch.countDown();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
// Wait for the event loop thread to start.
|
2012-07-08 14:28:56 +02:00
|
|
|
latch.await();
|
2012-04-03 15:03:04 +02:00
|
|
|
|
Reduce the chance of RejectedExecutionException
When a Netty application shuts down, a user often sees a REE
(RejectedExecutionException).
A REE is raised due to various reasons we don't have control over, such
as:
- A client connects to a server while the server is shutting down.
- An event is triggered for a closed Channel while its event loop is
also shutting down. Some of them are:
- channelDeregistered (triggered after a channel is closed)
- freeIn/OutboundBuffer (triggered after channelDeregistered)
- userEventTriggered (triggered anytime)
To address this issue, a new method called confirmShutdown() has been
added to SingleThreadEventExecutor. After a user calls shutdown(),
confirmShutdown() runs any remaining tasks in the task queue and ensures
no events are triggered for last 2 seconds. If any task are added to
the task queue before 2 seconds passes, confirmShutdown() prevents the
event loop from terminating by returning false.
Now that SingleThreadEventExecutor needs to accept tasks even after
shutdown(), its execute() method only rejects the task after the event
loop is terminated (i.e. isTerminated() returns true.) Except that,
there's no change in semantics.
SingleThreadEventExecutor also checks if its subclass called
confirmShutdown() in its run() implementation, so that Netty developers
can make sure they shut down their event loop impementation correctly.
It also fixes a bug in AioSocketChannel, revealed by delayed shutdown,
where an inboundBufferUpdated() event is triggered on a closed Channel
with deallocated buffers.
Caveats:
Because SingleThreadEventExecutor.takeTask() does not have a notion of
timeout, confirmShutdown() adds a dummy task (WAKEUP_TASK) to wake up
takeTask() immediately and instead sleeps hard-coded 100ms. I'll
address this issue later by modifying takeTask() times out dynamically.
Miscellaneous changes:
SingleThreadEventExecutor.wakeup() now has the default implementation.
Instead of interrupting the current thread, it simply adds a dummy task
(WAKEUP_TASK) to the task queue, which is more elegant and efficient.
NioEventLoop is the only implementation that overrides it. All other
implementations' wakeup()s were removed thanks to this change.
2012-11-22 12:45:49 +01:00
|
|
|
// Request the event loop thread to stop.
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.shutdown();
|
2013-04-27 09:39:19 +02:00
|
|
|
assertRejection(loopA);
|
2012-04-03 15:03:04 +02:00
|
|
|
|
2013-03-22 01:00:38 +01:00
|
|
|
assertTrue(loopA.isShutdown());
|
Reduce the chance of RejectedExecutionException
When a Netty application shuts down, a user often sees a REE
(RejectedExecutionException).
A REE is raised due to various reasons we don't have control over, such
as:
- A client connects to a server while the server is shutting down.
- An event is triggered for a closed Channel while its event loop is
also shutting down. Some of them are:
- channelDeregistered (triggered after a channel is closed)
- freeIn/OutboundBuffer (triggered after channelDeregistered)
- userEventTriggered (triggered anytime)
To address this issue, a new method called confirmShutdown() has been
added to SingleThreadEventExecutor. After a user calls shutdown(),
confirmShutdown() runs any remaining tasks in the task queue and ensures
no events are triggered for last 2 seconds. If any task are added to
the task queue before 2 seconds passes, confirmShutdown() prevents the
event loop from terminating by returning false.
Now that SingleThreadEventExecutor needs to accept tasks even after
shutdown(), its execute() method only rejects the task after the event
loop is terminated (i.e. isTerminated() returns true.) Except that,
there's no change in semantics.
SingleThreadEventExecutor also checks if its subclass called
confirmShutdown() in its run() implementation, so that Netty developers
can make sure they shut down their event loop impementation correctly.
It also fixes a bug in AioSocketChannel, revealed by delayed shutdown,
where an inboundBufferUpdated() event is triggered on a closed Channel
with deallocated buffers.
Caveats:
Because SingleThreadEventExecutor.takeTask() does not have a notion of
timeout, confirmShutdown() adds a dummy task (WAKEUP_TASK) to wake up
takeTask() immediately and instead sleeps hard-coded 100ms. I'll
address this issue later by modifying takeTask() times out dynamically.
Miscellaneous changes:
SingleThreadEventExecutor.wakeup() now has the default implementation.
Instead of interrupting the current thread, it simply adds a dummy task
(WAKEUP_TASK) to the task queue, which is more elegant and efficient.
NioEventLoop is the only implementation that overrides it. All other
implementations' wakeup()s were removed thanks to this change.
2012-11-22 12:45:49 +01:00
|
|
|
|
2012-04-03 15:03:04 +02:00
|
|
|
// Wait until the event loop is terminated.
|
2013-03-22 01:00:38 +01:00
|
|
|
while (!loopA.isTerminated()) {
|
|
|
|
loopA.awaitTermination(1, TimeUnit.DAYS);
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-04-27 09:39:19 +02:00
|
|
|
private static void assertRejection(EventExecutor loop) {
|
|
|
|
try {
|
|
|
|
loop.execute(NOOP);
|
|
|
|
fail("A task must be rejected after shutdown() is called.");
|
|
|
|
} catch (RejectedExecutionException e) {
|
|
|
|
// Expected
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-05-11 13:19:57 +02:00
|
|
|
@Test
|
2013-03-22 01:00:38 +01:00
|
|
|
public void scheduleTaskA() throws Exception {
|
|
|
|
testScheduleTask(loopA);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void scheduleTaskB() throws Exception {
|
|
|
|
testScheduleTask(loopB);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
|
2012-05-11 13:19:57 +02:00
|
|
|
long startTime = System.nanoTime();
|
|
|
|
final AtomicLong endTime = new AtomicLong();
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.schedule(new Runnable() {
|
2012-05-11 13:19:57 +02:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
endTime.set(System.nanoTime());
|
|
|
|
}
|
|
|
|
}, 500, TimeUnit.MILLISECONDS).get();
|
|
|
|
assertTrue(endTime.get() - startTime >= TimeUnit.MILLISECONDS.toNanos(500));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
2013-03-22 01:00:38 +01:00
|
|
|
public void scheduleTaskAtFixedRateA() throws Exception {
|
|
|
|
testScheduleTaskAtFixedRate(loopA);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void scheduleTaskAtFixedRateB() throws Exception {
|
|
|
|
testScheduleTaskAtFixedRate(loopB);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
|
2012-05-11 13:19:57 +02:00
|
|
|
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
|
2013-03-22 01:00:38 +01:00
|
|
|
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
|
2012-05-11 13:19:57 +02:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
timestamps.add(System.nanoTime());
|
|
|
|
try {
|
|
|
|
Thread.sleep(50);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, 100, 100, TimeUnit.MILLISECONDS);
|
|
|
|
Thread.sleep(550);
|
|
|
|
assertTrue(f.cancel(true));
|
|
|
|
assertEquals(5, timestamps.size());
|
|
|
|
|
|
|
|
// Check if the task was run without a lag.
|
|
|
|
Long previousTimestamp = null;
|
|
|
|
for (Long t: timestamps) {
|
|
|
|
if (previousTimestamp == null) {
|
|
|
|
previousTimestamp = t;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(90));
|
|
|
|
previousTimestamp = t;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
2013-03-22 01:00:38 +01:00
|
|
|
public void scheduleLaggyTaskAtFixedRateA() throws Exception {
|
|
|
|
testScheduleLaggyTaskAtFixedRate(loopA);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void scheduleLaggyTaskAtFixedRateB() throws Exception {
|
|
|
|
testScheduleLaggyTaskAtFixedRate(loopB);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
|
2012-05-11 13:19:57 +02:00
|
|
|
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
|
2013-03-22 01:00:38 +01:00
|
|
|
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
|
2012-05-11 13:19:57 +02:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
boolean empty = timestamps.isEmpty();
|
|
|
|
timestamps.add(System.nanoTime());
|
|
|
|
if (empty) {
|
|
|
|
try {
|
2013-02-05 08:27:37 +01:00
|
|
|
Thread.sleep(401);
|
2012-05-11 13:19:57 +02:00
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, 100, 100, TimeUnit.MILLISECONDS);
|
|
|
|
Thread.sleep(550);
|
|
|
|
assertTrue(f.cancel(true));
|
|
|
|
assertEquals(5, timestamps.size());
|
|
|
|
|
|
|
|
// Check if the task was run with lag.
|
|
|
|
int i = 0;
|
|
|
|
Long previousTimestamp = null;
|
|
|
|
for (Long t: timestamps) {
|
|
|
|
if (previousTimestamp == null) {
|
|
|
|
previousTimestamp = t;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
long diff = t.longValue() - previousTimestamp.longValue();
|
|
|
|
if (i == 0) {
|
|
|
|
assertTrue(diff >= TimeUnit.MILLISECONDS.toNanos(400));
|
|
|
|
} else {
|
|
|
|
assertTrue(diff <= TimeUnit.MILLISECONDS.toNanos(10));
|
|
|
|
}
|
|
|
|
previousTimestamp = t;
|
|
|
|
i ++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
2013-03-22 01:00:38 +01:00
|
|
|
public void scheduleTaskWithFixedDelayA() throws Exception {
|
|
|
|
testScheduleTaskWithFixedDelay(loopA);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void scheduleTaskWithFixedDelayB() throws Exception {
|
|
|
|
testScheduleTaskWithFixedDelay(loopB);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws InterruptedException {
|
2012-05-11 13:19:57 +02:00
|
|
|
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
|
2013-03-22 01:00:38 +01:00
|
|
|
ScheduledFuture<?> f = loopA.scheduleWithFixedDelay(new Runnable() {
|
2012-05-11 13:19:57 +02:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
timestamps.add(System.nanoTime());
|
|
|
|
try {
|
2013-02-05 08:27:37 +01:00
|
|
|
Thread.sleep(51);
|
2012-05-11 13:19:57 +02:00
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignore
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, 100, 100, TimeUnit.MILLISECONDS);
|
|
|
|
Thread.sleep(500);
|
|
|
|
assertTrue(f.cancel(true));
|
|
|
|
assertEquals(3, timestamps.size());
|
|
|
|
|
|
|
|
// Check if the task was run without a lag.
|
|
|
|
Long previousTimestamp = null;
|
|
|
|
for (Long t: timestamps) {
|
|
|
|
if (previousTimestamp == null) {
|
|
|
|
previousTimestamp = t;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(150));
|
|
|
|
previousTimestamp = t;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-04-03 15:03:04 +02:00
|
|
|
@Test
|
2013-04-27 09:39:19 +02:00
|
|
|
@SuppressWarnings("deprecation")
|
2012-04-03 15:03:04 +02:00
|
|
|
public void shutdownWithPendingTasks() throws Exception {
|
|
|
|
final int NUM_TASKS = 3;
|
|
|
|
final AtomicInteger ranTasks = new AtomicInteger();
|
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
final Runnable task = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
ranTasks.incrementAndGet();
|
|
|
|
while (latch.getCount() > 0) {
|
|
|
|
try {
|
|
|
|
latch.await();
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignored
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
for (int i = 0; i < NUM_TASKS; i ++) {
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.execute(task);
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// At this point, the first task should be running and stuck at latch.await().
|
|
|
|
while (ranTasks.get() == 0) {
|
|
|
|
Thread.yield();
|
|
|
|
}
|
|
|
|
assertEquals(1, ranTasks.get());
|
|
|
|
|
|
|
|
// Shut down the event loop to test if the other tasks are run before termination.
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.shutdown();
|
2012-04-03 15:03:04 +02:00
|
|
|
|
|
|
|
// Let the other tasks run.
|
|
|
|
latch.countDown();
|
|
|
|
|
|
|
|
// Wait until the event loop is terminated.
|
2013-03-22 01:00:38 +01:00
|
|
|
while (!loopA.isTerminated()) {
|
|
|
|
loopA.awaitTermination(1, TimeUnit.DAYS);
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Make sure loop.shutdown() above triggered wakeup().
|
|
|
|
assertEquals(NUM_TASKS, ranTasks.get());
|
|
|
|
}
|
|
|
|
|
2013-03-21 09:48:10 +01:00
|
|
|
@Test(timeout = 10000)
|
2013-04-27 09:39:19 +02:00
|
|
|
@SuppressWarnings("deprecation")
|
|
|
|
public void testRegistrationAfterShutdown() throws Exception {
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.shutdown();
|
|
|
|
ChannelFuture f = loopA.register(new LocalChannel());
|
2013-03-21 09:48:10 +01:00
|
|
|
f.awaitUninterruptibly();
|
|
|
|
assertFalse(f.isSuccess());
|
|
|
|
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test(timeout = 10000)
|
2013-04-27 09:39:19 +02:00
|
|
|
@SuppressWarnings("deprecation")
|
|
|
|
public void testRegistrationAfterShutdown2() throws Exception {
|
2013-03-22 01:00:38 +01:00
|
|
|
loopA.shutdown();
|
2013-03-21 09:48:10 +01:00
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
Channel ch = new LocalChannel();
|
|
|
|
ChannelPromise promise = ch.newPromise();
|
|
|
|
promise.addListener(new ChannelFutureListener() {
|
|
|
|
@Override
|
|
|
|
public void operationComplete(ChannelFuture future) throws Exception {
|
|
|
|
latch.countDown();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2013-12-08 06:12:10 +01:00
|
|
|
// Disable logging temporarily.
|
|
|
|
Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
|
|
|
|
List<Appender<ILoggingEvent>> appenders = new ArrayList<Appender<ILoggingEvent>>();
|
|
|
|
for (Iterator<Appender<ILoggingEvent>> i = root.iteratorForAppenders(); i.hasNext();) {
|
|
|
|
Appender<ILoggingEvent> a = i.next();
|
|
|
|
appenders.add(a);
|
|
|
|
root.detachAppender(a);
|
|
|
|
}
|
2013-03-21 09:48:10 +01:00
|
|
|
|
2013-12-08 06:12:10 +01:00
|
|
|
try {
|
|
|
|
ChannelFuture f = loopA.register(ch, promise);
|
|
|
|
f.awaitUninterruptibly();
|
|
|
|
assertFalse(f.isSuccess());
|
|
|
|
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
|
|
|
|
|
|
|
|
// Ensure the listener was notified.
|
|
|
|
assertFalse(latch.await(1, TimeUnit.SECONDS));
|
|
|
|
assertFalse(ch.isOpen());
|
|
|
|
} finally {
|
|
|
|
for (Appender<ILoggingEvent> a: appenders) {
|
|
|
|
root.addAppender(a);
|
|
|
|
}
|
|
|
|
}
|
2013-03-21 09:48:10 +01:00
|
|
|
}
|
|
|
|
|
2013-04-27 09:39:19 +02:00
|
|
|
@Test(timeout = 5000)
|
|
|
|
public void testGracefulShutdownQuietPeriod() throws Exception {
|
|
|
|
loopA.shutdownGracefully(1, Integer.MAX_VALUE, TimeUnit.SECONDS);
|
|
|
|
// Keep Scheduling tasks for another 2 seconds.
|
|
|
|
for (int i = 0; i < 20; i ++) {
|
|
|
|
Thread.sleep(100);
|
|
|
|
loopA.execute(NOOP);
|
|
|
|
}
|
|
|
|
|
|
|
|
long startTime = System.nanoTime();
|
|
|
|
|
|
|
|
assertThat(loopA.isShuttingDown(), is(true));
|
|
|
|
assertThat(loopA.isShutdown(), is(false));
|
|
|
|
|
|
|
|
while (!loopA.isTerminated()) {
|
|
|
|
loopA.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
|
|
|
|
}
|
|
|
|
|
|
|
|
assertTrue(System.nanoTime() - startTime >= TimeUnit.SECONDS.toNanos(1));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test(timeout = 5000)
|
|
|
|
public void testGracefulShutdownTimeout() throws Exception {
|
|
|
|
loopA.shutdownGracefully(2, 2, TimeUnit.SECONDS);
|
|
|
|
// Keep Scheduling tasks for another 3 seconds.
|
|
|
|
// Submitted tasks must be rejected after 2 second timeout.
|
|
|
|
for (int i = 0; i < 10; i ++) {
|
|
|
|
Thread.sleep(100);
|
|
|
|
loopA.execute(NOOP);
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
|
|
for (int i = 0; i < 20; i ++) {
|
|
|
|
Thread.sleep(100);
|
|
|
|
loopA.execute(NOOP);
|
|
|
|
}
|
|
|
|
fail("shutdownGracefully() must reject a task after timeout.");
|
|
|
|
} catch (RejectedExecutionException e) {
|
|
|
|
// Expected
|
|
|
|
}
|
|
|
|
|
|
|
|
assertThat(loopA.isShuttingDown(), is(true));
|
|
|
|
assertThat(loopA.isShutdown(), is(true));
|
|
|
|
}
|
|
|
|
|
2013-03-22 01:00:38 +01:00
|
|
|
private static class SingleThreadEventLoopA extends SingleThreadEventLoop {
|
2012-04-03 15:03:04 +02:00
|
|
|
|
|
|
|
final AtomicInteger cleanedUp = new AtomicInteger();
|
|
|
|
|
2013-03-22 01:00:38 +01:00
|
|
|
SingleThreadEventLoopA() {
|
2013-04-27 09:39:19 +02:00
|
|
|
super(null, Executors.defaultThreadFactory(), true);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
2012-04-03 15:03:04 +02:00
|
|
|
@Override
|
|
|
|
protected void run() {
|
|
|
|
for (;;) {
|
2013-04-27 09:39:19 +02:00
|
|
|
Runnable task = takeTask();
|
|
|
|
if (task != null) {
|
2012-04-03 15:03:04 +02:00
|
|
|
task.run();
|
2013-04-27 09:39:19 +02:00
|
|
|
updateLastExecutionTime();
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|
|
|
|
|
2013-04-27 09:39:19 +02:00
|
|
|
if (confirmShutdown()) {
|
2012-04-03 15:03:04 +02:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-05-11 13:19:57 +02:00
|
|
|
@Override
|
2012-04-03 15:03:04 +02:00
|
|
|
protected void cleanup() {
|
|
|
|
cleanedUp.incrementAndGet();
|
|
|
|
}
|
|
|
|
}
|
2013-03-22 01:00:38 +01:00
|
|
|
|
|
|
|
private static class SingleThreadEventLoopB extends SingleThreadEventLoop {
|
|
|
|
|
|
|
|
SingleThreadEventLoopB() {
|
2013-04-27 09:39:19 +02:00
|
|
|
super(null, Executors.defaultThreadFactory(), false);
|
2013-03-22 01:00:38 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected void run() {
|
|
|
|
for (;;) {
|
|
|
|
try {
|
2013-03-22 06:33:47 +01:00
|
|
|
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime())));
|
2013-03-22 01:00:38 +01:00
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Waken up by interruptThread()
|
|
|
|
}
|
|
|
|
|
|
|
|
runAllTasks();
|
|
|
|
|
2013-04-27 09:39:19 +02:00
|
|
|
if (confirmShutdown()) {
|
2013-03-22 01:00:38 +01:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected void wakeup(boolean inEventLoop) {
|
|
|
|
interruptThread();
|
|
|
|
}
|
|
|
|
}
|
2012-04-03 15:03:04 +02:00
|
|
|
}
|