diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index 7c65d71690..a00383ddff 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -475,7 +475,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } } - private static void reject() { + protected static void reject() { throw new RejectedExecutionException("event executor terminated"); } diff --git a/transport/src/main/java/io/netty/channel/aio/AioCompletionHandler.java b/transport/src/main/java/io/netty/channel/aio/AioCompletionHandler.java index f38660a1d0..ceaa84eecc 100644 --- a/transport/src/main/java/io/netty/channel/aio/AioCompletionHandler.java +++ b/transport/src/main/java/io/netty/channel/aio/AioCompletionHandler.java @@ -60,6 +60,15 @@ public abstract class AioCompletionHandler implements Comp } finally { STACK_DEPTH.set(d); } + } else { + // schedule it with a special runnable to make sure we keep the right + // order and exist the recursive call to prevent stackoverflow + loop.execute(new AioEventLoop.RecursionBreakingRunnable() { + @Override + public void run() { + completed0(result, channel); + } + }); } } else { loop.execute(new Runnable() { @@ -83,6 +92,15 @@ public abstract class AioCompletionHandler implements Comp } finally { STACK_DEPTH.set(d); } + } else { + // schedule it with a special runnable to make sure we keep the right + // order and exist the recursive call to prevent stackoverflow + loop.execute(new AioEventLoop.RecursionBreakingRunnable() { + @Override + public void run() { + failed0(exc, channel); + } + }); } } else { loop.execute(new Runnable() { diff --git a/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java b/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java index 9d0e15f34f..b024d6ef01 100644 --- a/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java @@ -26,7 +26,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.IdentityHashMap; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadFactory; /** @@ -35,6 +37,7 @@ import java.util.concurrent.ThreadFactory; final class AioEventLoop extends SingleThreadEventLoop { private final Set channels = Collections.newSetFromMap(new IdentityHashMap()); + private LinkedBlockingDeque taskQueue; private final ChannelFutureListener registrationListener = new ChannelFutureListener() { @Override @@ -100,4 +103,37 @@ final class AioEventLoop extends SingleThreadEventLoop { ch.unsafe().close(ch.unsafe().voidFuture()); } } + + @Override + protected Queue newTaskQueue() { + // use a Deque as we need to be able to also add tasks on the first position. + taskQueue = new LinkedBlockingDeque(); + return taskQueue; + } + + @Override + protected void addTask(Runnable task) { + if (task instanceof RecursionBreakingRunnable) { + if (task == null) { + throw new NullPointerException("task"); + } + if (isTerminated()) { + reject(); + } + // put the task at the first postion of the queue as we just schedule it to + // break the recursive operation + taskQueue.addFirst(task); + } else { + super.addTask(task); + } + } + + /** + * Special Runnable which is used by {@link AioCompletionHandler} to break a recursive call and so prevent + * from StackOverFlowError. When a task is executed that implement it needs to put on the first position of + * the queue to guaranteer execution order and break the recursive call. + */ + interface RecursionBreakingRunnable extends Runnable { + // Marker interface + } }