fix bug: scheduled tasks may not be executed (#9980)

Motivation:

If there was always a task in the taskQueue of GlobalEvenExecutor, scheduled tasks in the
scheduledTaskQueue will never be executed.

Related to  #1614

Modifications:

fix bug in GlobalEventExecutor#takeTask

Result:

fix bug
This commit is contained in:
Ruwei 2020-01-31 17:57:38 +08:00 committed by Norman Maurer
parent 7413372c01
commit a6393c3d01
3 changed files with 144 additions and 4 deletions

View File

@ -93,7 +93,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task;
Runnable task = null;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
@ -101,11 +101,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
// Waken up.
return null;
}
} else {
task = taskQueue.poll();
}
if (task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}

View File

@ -102,6 +102,54 @@ public class GlobalEventExecutorTest {
assertEquals(group, capturedGroup.get());
}
@Test(timeout = 5000)
public void testTakeTask() throws Exception {
//add task
TestRunnable beforeTask = new TestRunnable(0);
e.execute(beforeTask);
//add scheduled task
TestRunnable scheduledTask = new TestRunnable(0);
ScheduledFuture<?> f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS);
//add task
TestRunnable afterTask = new TestRunnable(0);
e.execute(afterTask);
f.sync();
assertThat(beforeTask.ran.get(), is(true));
assertThat(scheduledTask.ran.get(), is(true));
assertThat(afterTask.ran.get(), is(true));
}
@Test(timeout = 5000)
public void testTakeTaskAlwaysHasTask() throws Exception {
//for https://github.com/netty/netty/issues/1614
//add scheduled task
TestRunnable t = new TestRunnable(0);
ScheduledFuture<?> f = e.schedule(t, 1500, TimeUnit.MILLISECONDS);
final Runnable doNothing = () -> { };
final AtomicBoolean stop = new AtomicBoolean(false);
//ensure always has at least one task in taskQueue
//check if scheduled tasks are triggered
try {
new Thread(() -> {
while (!stop.get()) {
e.execute(doNothing);
}
}).start();
f.sync();
assertThat(t.ran.get(), is(true));
} finally {
stop.set(true);
}
}
private static final class TestRunnable implements Runnable {
final AtomicBoolean ran = new AtomicBoolean();
final long delay;

View File

@ -30,9 +30,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class SingleThreadEventExecutorTest {
@Test
@ -253,4 +257,91 @@ public class SingleThreadEventExecutorTest {
Assert.assertTrue(f.isSuccess());
}
}
@Test(timeout = 5000)
public void testTakeTask() throws Exception {
final SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(Executors.defaultThreadFactory()) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
//add task
TestRunnable beforeTask = new TestRunnable();
executor.execute(beforeTask);
//add scheduled task
TestRunnable scheduledTask = new TestRunnable();
ScheduledFuture<?> f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS);
//add task
TestRunnable afterTask = new TestRunnable();
executor.execute(afterTask);
f.sync();
assertThat(beforeTask.ran.get(), is(true));
assertThat(scheduledTask.ran.get(), is(true));
assertThat(afterTask.ran.get(), is(true));
}
@Test(timeout = 5000)
public void testTakeTaskAlwaysHasTask() throws Exception {
//for https://github.com/netty/netty/issues/1614
final SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(Executors.defaultThreadFactory()) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
//add scheduled task
TestRunnable t = new TestRunnable();
ScheduledFuture<?> f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS);
final Runnable doNothing = () -> { };
final AtomicBoolean stop = new AtomicBoolean(false);
//ensure always has at least one task in taskQueue
//check if scheduled tasks are triggered
try {
new Thread(() -> {
while (!stop.get()) {
executor.execute(doNothing);
}
}).start();
f.sync();
assertThat(t.ran.get(), is(true));
} finally {
stop.set(true);
}
}
private static final class TestRunnable implements Runnable {
final AtomicBoolean ran = new AtomicBoolean();
TestRunnable() {
}
@Override
public void run() {
ran.set(true);
}
}
}