From fa02ffddae42307fd5571ce6af8b122ca8a79f75 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 22 Mar 2013 09:06:08 +0900 Subject: [PATCH] Remove TaskScheduler and ImmediateEventExecutor that requires TaskScheduler - Related issue: #817 --- .../concurrent/AbstractEventExecutor.java | 69 ++- ...AbstractEventExecutorWithoutScheduler.java | 94 ---- .../concurrent/ImmediateEventExecutor.java | 78 --- .../concurrent/SingleThreadEventExecutor.java | 2 +- .../netty/util/concurrent/TaskScheduler.java | 474 ------------------ .../ImmediateEventExecutorTest.java | 51 -- .../channel/embedded/EmbeddedEventLoop.java | 4 +- .../channel/group/DefaultChannelGroup.java | 4 +- .../channel/group/ImmediateEventExecutor.java | 4 +- 9 files changed, 61 insertions(+), 719 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorWithoutScheduler.java delete mode 100644 common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java delete mode 100644 common/src/main/java/io/netty/util/concurrent/TaskScheduler.java delete mode 100644 common/src/test/java/io/netty/util/concurrent/ImmediateEventExecutorTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index a5edf5e1bc..7b1cb9b711 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -15,41 +15,80 @@ */ package io.netty.util.concurrent; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; - /** - * Abstract base class for {@link EventExecutor} implementations that use a {@link TaskScheduler} to support - * scheduling tasks. + * Abstract base class for {@link EventExecutor} implementations. */ -public abstract class AbstractEventExecutor extends AbstractEventExecutorWithoutScheduler { - private final TaskScheduler scheduler; +public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { - protected AbstractEventExecutor(TaskScheduler scheduler) { - if (scheduler == null) { - throw new NullPointerException("scheduler"); - } - this.scheduler = scheduler; + @Override + public EventExecutor next() { + return this; } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return scheduler.schedule(this, command, delay, unit); + public Promise newPromise() { + return new DefaultPromise(this); + } + + @Override + public Future newSucceededFuture(V result) { + return new SucceededFuture(this, result); + } + + @Override + public Future newFailedFuture(Throwable cause) { + return new FailedFuture(this, cause); + } + + @Override + public Future submit(Runnable task) { + return (Future) super.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return (Future) super.submit(task, result); + } + + @Override + public Future submit(Callable task) { + return (Future) super.submit(task); + } + + @Override + protected final RunnableFuture newTaskFor(Runnable runnable, T value) { + return new PromiseTask(this, runnable, value); + } + + @Override + protected final RunnableFuture newTaskFor(Callable callable) { + return new PromiseTask(this, callable); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, + TimeUnit unit) { + throw new UnsupportedOperationException(); } @Override public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return scheduler.schedule(this, callable, delay, unit); + throw new UnsupportedOperationException(); } @Override public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit); + throw new UnsupportedOperationException(); } @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit); + throw new UnsupportedOperationException(); } + } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorWithoutScheduler.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorWithoutScheduler.java deleted file mode 100644 index eef08f5232..0000000000 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorWithoutScheduler.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.Callable; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; - -/** - * Abstract base class for {@link EventExecutor} implementations. - */ -public abstract class AbstractEventExecutorWithoutScheduler extends AbstractExecutorService implements EventExecutor { - - @Override - public EventExecutor next() { - return this; - } - - @Override - public Promise newPromise() { - return new DefaultPromise(this); - } - - @Override - public Future newSucceededFuture(V result) { - return new SucceededFuture(this, result); - } - - @Override - public Future newFailedFuture(Throwable cause) { - return new FailedFuture(this, cause); - } - - @Override - public Future submit(Runnable task) { - return (Future) super.submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); - } - - @Override - public Future submit(Callable task) { - return (Future) super.submit(task); - } - - @Override - protected final RunnableFuture newTaskFor(Runnable runnable, T value) { - return new PromiseTask(this, runnable, value); - } - - @Override - protected final RunnableFuture newTaskFor(Callable callable) { - return new PromiseTask(this, callable); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - -} diff --git a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java deleted file mode 100644 index 7e2986e5b5..0000000000 --- a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * {@link EventExecutor} which executes the command in the caller thread. - */ -public final class ImmediateEventExecutor extends AbstractEventExecutor { - - public ImmediateEventExecutor(TaskScheduler scheduler) { - super(scheduler); - } - - @Override - public EventExecutorGroup parent() { - return null; - } - - @Override - public boolean inEventLoop() { - return true; - } - - @Override - public boolean inEventLoop(Thread thread) { - return true; - } - - @Override - public void shutdown() { - // NOOP - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) { - return false; - } - - @Override - public List shutdownNow() { - return Collections.emptyList(); - } - - @Override - public void execute(Runnable command) { - if (command == null) { - throw new NullPointerException("command"); - } - command.run(); - } -} diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index dc3aea782e..d4b5ba24e2 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicLong; * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. * */ -public abstract class SingleThreadEventExecutor extends AbstractEventExecutorWithoutScheduler { +public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); diff --git a/common/src/main/java/io/netty/util/concurrent/TaskScheduler.java b/common/src/main/java/io/netty/util/concurrent/TaskScheduler.java deleted file mode 100644 index e3db543399..0000000000 --- a/common/src/main/java/io/netty/util/concurrent/TaskScheduler.java +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; - -public final class TaskScheduler { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(TaskScheduler.class); - - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final long START_TIME = System.nanoTime(); - private static final AtomicLong nextTaskId = new AtomicLong(); - private static long nanoTime() { - return System.nanoTime() - START_TIME; - } - - private static long deadlineNanos(long delay) { - return nanoTime() + delay; - } - - private final BlockingQueue> taskQueue = new DelayQueue>(); - private final Thread thread; - private final Object stateLock = new Object(); - private final Semaphore threadLock = new Semaphore(0); - /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ - private volatile int state; - - public TaskScheduler(ThreadFactory threadFactory) { - if (threadFactory == null) { - throw new NullPointerException("threadFactory"); - } - - thread = threadFactory.newThread(new Runnable() { - @Override - public void run() { - try { - for (;;) { - ScheduledFutureTask task; - try { - task = taskQueue.take(); - runTask(task); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } - - if (isShutdown() && taskQueue.peek() == null) { - break; - } - } - } finally { - try { - // Run all remaining tasks and shutdown hooks. - try { - cleanupTasks(); - } finally { - synchronized (stateLock) { - state = 3; - } - } - cleanupTasks(); - } finally { - threadLock.release(); - assert taskQueue.isEmpty(); - } - } - } - - private void runTask(ScheduledFutureTask task) { - EventExecutor executor = task.executor(); - if (executor == null) { - task.run(); - } else { - if (executor.isShutdown()) { - task.cancel(false); - } else { - try { - executor.execute(task); - } catch (RejectedExecutionException e) { - task.cancel(false); - } - } - } - } - - private void cleanupTasks() { - for (;;) { - boolean ran = false; - cancelScheduledTasks(); - for (;;) { - final ScheduledFutureTask task = taskQueue.poll(); - if (task == null) { - break; - } - - try { - runTask(task); - ran = true; - } catch (Throwable t) { - logger.warn("A task raised an exception.", t); - } - } - - if (!ran && taskQueue.isEmpty()) { - break; - } - } - } - }); - } - - private boolean inSameThread() { - return Thread.currentThread() == thread; - } - - public void shutdown() { - boolean inSameThread = inSameThread(); - boolean wakeup = false; - if (inSameThread) { - synchronized (stateLock) { - assert state == 1; - state = 2; - wakeup = true; - } - } else { - synchronized (stateLock) { - switch (state) { - case 0: - state = 3; - threadLock.release(); - break; - case 1: - state = 2; - wakeup = true; - break; - } - } - } - - if (wakeup && !inSameThread && isShutdown()) { - thread.interrupt(); - } - } - - public boolean isShutdown() { - return state >= 2; - } - - public boolean isTerminated() { - return state == 3; - } - - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - if (unit == null) { - throw new NullPointerException("unit"); - } - - if (inSameThread()) { - throw new IllegalStateException("cannot await termination of the current thread"); - } - - if (threadLock.tryAcquire(timeout, unit)) { - threadLock.release(); - } - - return isTerminated(); - } - - public ScheduledFuture schedule( - EventExecutor executor, Runnable command, long delay, TimeUnit unit) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(this, executor, - command, null, deadlineNanos(unit.toNanos(delay)))); - } - - public ScheduledFuture schedule( - EventExecutor executor, Callable callable, long delay, TimeUnit unit) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (callable == null) { - throw new NullPointerException("callable"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(this, executor, callable, deadlineNanos(unit.toNanos(delay)))); - } - - public ScheduledFuture scheduleAtFixedRate( - EventExecutor executor, Runnable command, long initialDelay, long period, TimeUnit unit) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (period <= 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: > 0)", period)); - } - - return schedule(new ScheduledFutureTask( - this, executor, Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); - } - - public ScheduledFuture scheduleWithFixedDelay( - EventExecutor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (delay <= 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: > 0)", delay)); - } - - return schedule(new ScheduledFutureTask( - this, executor, Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); - } - - private ScheduledFuture schedule(ScheduledFutureTask task) { - if (isShutdown()) { - reject(); - } - taskQueue.add(task); - if (isShutdown()) { - task.cancel(false); - } - - boolean started = false; - if (!inSameThread()) { - synchronized (stateLock) { - if (state == 0) { - state = 1; - thread.start(); - started = true; - } - } - } - - if (started) { - schedule(new ScheduledFutureTask(this, new ImmediateEventExecutor(this) - , Executors.callable(new PurgeTask(), null), - deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); - } - - return task; - } - - private static void reject() { - throw new RejectedExecutionException("event executor shut down"); - } - - private void cancelScheduledTasks() { - if (taskQueue.isEmpty()) { - return; - } - - for (ScheduledFutureTask task: taskQueue.toArray(new ScheduledFutureTask[taskQueue.size()])) { - task.cancel(false); - } - - taskQueue.clear(); - } - - private static class ScheduledFutureTask extends PromiseTask implements ScheduledFuture { - - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater uncancellableUpdater = - AtomicIntegerFieldUpdater.newUpdater(ScheduledFutureTask.class, "uncancellable"); - - private final long id = nextTaskId.getAndIncrement(); - private long deadlineNanos; - /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ - private final long periodNanos; - private final TaskScheduler scheduler; - - @SuppressWarnings("UnusedDeclaration") - private volatile int uncancellable; - - ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor, - Runnable runnable, V result, long nanoTime) { - this(scheduler, executor, Executors.callable(runnable, result), nanoTime); - } - - ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor, - Callable callable, long nanoTime, long period) { - super(executor, callable); - if (period == 0) { - throw new IllegalArgumentException("period: 0 (expected: != 0)"); - } - deadlineNanos = nanoTime; - periodNanos = period; - this.scheduler = scheduler; - } - - ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor, Callable callable, long nanoTime) { - super(executor, callable); - deadlineNanos = nanoTime; - periodNanos = 0; - this.scheduler = scheduler; - } - - public long deadlineNanos() { - return deadlineNanos; - } - - public long delayNanos() { - return Math.max(0, deadlineNanos() - nanoTime()); - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (this == o) { - return 0; - } - - ScheduledFutureTask that = (ScheduledFutureTask) o; - long d = deadlineNanos() - that.deadlineNanos(); - if (d < 0) { - return -1; - } else if (d > 0) { - return 1; - } else if (id < that.id) { - return -1; - } else if (id == that.id) { - throw new Error(); - } else { - return 1; - } - } - - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return super.equals(obj); - } - - @Override - public void run() { - try { - if (periodNanos == 0) { - if (setUncancellable()) { - V result = task.call(); - setSuccessInternal(result); - } - } else { - task.call(); - if (!scheduler.isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; - } else { - deadlineNanos = nanoTime() - p; - } - if (!isDone()) { - scheduler.schedule(this); - } - } - } - } catch (Throwable cause) { - setFailureInternal(cause); - } - } - - @Override - public boolean isCancelled() { - if (cause() instanceof CancellationException) { - return true; - } - return false; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (!isDone()) { - if (setUncancellable()) { - return tryFailureInternal(new CancellationException()); - } - } - return false; - } - - private boolean setUncancellable() { - return uncancellableUpdater.compareAndSet(this, 0, 1); - } - } - - private final class PurgeTask implements Runnable { - @Override - public void run() { - Iterator> i = taskQueue.iterator(); - while (i.hasNext()) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } - } - } - } -} diff --git a/common/src/test/java/io/netty/util/concurrent/ImmediateEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/ImmediateEventExecutorTest.java deleted file mode 100644 index 143fd64cf1..0000000000 --- a/common/src/test/java/io/netty/util/concurrent/ImmediateEventExecutorTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import static org.junit.Assert.*; - -import org.easymock.EasyMock; -import org.junit.Test; - -import java.util.concurrent.Executors; - -public class ImmediateEventExecutorTest { - - @Test - public void shouldExecuteImmediately() { - TaskScheduler scheduler = new TaskScheduler(Executors.defaultThreadFactory()); - ImmediateEventExecutor e = new ImmediateEventExecutor(scheduler); - long startTime = System.nanoTime(); - e.execute(new Runnable() { - @Override - public void run() { - long startTime = System.nanoTime(); - for (;;) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - if (System.nanoTime() - startTime >= 1000000000L) { - break; - } - } - } - }); - assertTrue(System.nanoTime() - startTime >= 1000000000L); - scheduler.shutdown(); - } -} diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 6887bc94c8..2996660e53 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -20,7 +20,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.AbstractEventExecutorWithoutScheduler; +import io.netty.util.concurrent.AbstractEventExecutor; import java.util.ArrayDeque; import java.util.Collections; @@ -28,7 +28,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.TimeUnit; -final class EmbeddedEventLoop extends AbstractEventExecutorWithoutScheduler implements EventLoop { +final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop { private final Queue tasks = new ArrayDeque(2); diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index 26c7c9ad3c..bb5b5096c6 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -21,7 +21,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.FileRegion; import io.netty.channel.ServerChannel; -import io.netty.util.concurrent.AbstractEventExecutorWithoutScheduler; +import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.PlatformDependent; @@ -313,7 +313,7 @@ public class DefaultChannelGroup extends AbstractSet implements Channel "(name: " + name() + ", size: " + size() + ')'; } - static final class ImmediateEventExecutor extends AbstractEventExecutorWithoutScheduler { + static final class ImmediateEventExecutor extends AbstractEventExecutor { @Override public EventExecutorGroup parent() { diff --git a/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java b/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java index 6a418445c2..84df873aba 100644 --- a/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java @@ -15,7 +15,7 @@ */ package io.netty.channel.group; -import io.netty.util.concurrent.AbstractEventExecutorWithoutScheduler; +import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -final class ImmediateEventExecutor extends AbstractEventExecutorWithoutScheduler { +final class ImmediateEventExecutor extends AbstractEventExecutor { @Override public EventExecutorGroup parent() {