fix bug: scheduled tasks may not be executed (#9980)

Motivation:

If there was always a task in the taskQueue of GlobalEvenExecutor, scheduled tasks in the
scheduledTaskQueue will never be executed.

Related to  #1614

Modifications:

fix bug in GlobalEventExecutor#takeTask

Result:

fix bug
This commit is contained in:
Ruwei 2020-01-31 17:57:38 +08:00 committed by GitHub
parent 0671b18e24
commit 6e5f229589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 8 deletions

View File

@ -89,7 +89,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task;
Runnable task = null;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
@ -97,11 +97,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
// Waken up.
return null;
}
} else {
task = taskQueue.poll();
}
if (task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}

View File

@ -108,6 +108,62 @@ public class GlobalEventExecutorTest {
assertEquals(group, capturedGroup.get());
}
@Test(timeout = 5000)
public void testTakeTask() throws Exception {
//add task
TestRunnable beforeTask = new TestRunnable(0);
e.execute(beforeTask);
//add scheduled task
TestRunnable scheduledTask = new TestRunnable(0);
ScheduledFuture<?> f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS);
//add task
TestRunnable afterTask = new TestRunnable(0);
e.execute(afterTask);
f.sync();
assertThat(beforeTask.ran.get(), is(true));
assertThat(scheduledTask.ran.get(), is(true));
assertThat(afterTask.ran.get(), is(true));
}
@Test(timeout = 5000)
public void testTakeTaskAlwaysHasTask() throws Exception {
//for https://github.com/netty/netty/issues/1614
//add scheduled task
TestRunnable t = new TestRunnable(0);
ScheduledFuture<?> f = e.schedule(t, 1500, TimeUnit.MILLISECONDS);
final Runnable doNothing = new Runnable() {
@Override
public void run() {
//NOOP
}
};
final AtomicBoolean stop = new AtomicBoolean(false);
//ensure always has at least one task in taskQueue
//check if scheduled tasks are triggered
try {
new Thread(new Runnable() {
@Override
public void run() {
while (!stop.get()) {
e.execute(doNothing);
}
}
}).start();
f.sync();
assertThat(t.ran.get(), is(true));
} finally {
stop.set(true);
}
}
private static final class TestRunnable implements Runnable {
final AtomicBoolean ran = new AtomicBoolean();
final long delay;

View File

@ -20,10 +20,6 @@ import org.junit.Test;
import io.netty.util.concurrent.AbstractEventExecutor.LazyRunnable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
@ -34,9 +30,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class SingleThreadEventExecutorTest {
@Test
@ -320,4 +320,99 @@ public class SingleThreadEventExecutorTest {
assertTrue(f.isSuccess());
}
}
@Test(timeout = 5000)
public void testTakeTask() throws Exception {
final SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(null, Executors.defaultThreadFactory(), true) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
//add task
TestRunnable beforeTask = new TestRunnable();
executor.execute(beforeTask);
//add scheduled task
TestRunnable scheduledTask = new TestRunnable();
ScheduledFuture<?> f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS);
//add task
TestRunnable afterTask = new TestRunnable();
executor.execute(afterTask);
f.sync();
assertThat(beforeTask.ran.get(), is(true));
assertThat(scheduledTask.ran.get(), is(true));
assertThat(afterTask.ran.get(), is(true));
}
@Test(timeout = 5000)
public void testTakeTaskAlwaysHasTask() throws Exception {
//for https://github.com/netty/netty/issues/1614
final SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(null, Executors.defaultThreadFactory(), true) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
//add scheduled task
TestRunnable t = new TestRunnable();
ScheduledFuture<?> f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS);
final Runnable doNothing = new Runnable() {
@Override
public void run() {
//NOOP
}
};
final AtomicBoolean stop = new AtomicBoolean(false);
//ensure always has at least one task in taskQueue
//check if scheduled tasks are triggered
try {
new Thread(new Runnable() {
@Override
public void run() {
while (!stop.get()) {
executor.execute(doNothing);
}
}
}).start();
f.sync();
assertThat(t.ran.get(), is(true));
} finally {
stop.set(true);
}
}
private static final class TestRunnable implements Runnable {
final AtomicBoolean ran = new AtomicBoolean();
TestRunnable() {
}
@Override
public void run() {
ran.set(true);
}
}
}