diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 2744a7bddf..d48031d46d 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -45,7 +45,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private final DefaultChannelPipeline pipeline; private final String name; private final ChannelHandler handler; - private boolean needsLazyBufInit; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. @@ -182,7 +181,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements executor = null; inByteBuf = null; inMsgBuf = null; - needsLazyBufInit = true; } DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, TailHandler handler) { @@ -302,30 +300,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } - private void lazyInitHeadHandler() { - if (needsLazyBufInit) { - EventExecutor exec = executor(); - if (exec.inEventLoop()) { - if (needsLazyBufInit) { - needsLazyBufInit = false; - HeadHandler headHandler = (HeadHandler) handler; - headHandler.init(this); - outByteBuf = headHandler.byteSink; - outMsgBuf = headHandler.msgSink; - } - } else { - try { - getFromFuture(exec.submit(new Runnable() { - @Override - public void run() { - lazyInitHeadHandler(); - } - })); - } catch (Exception e) { - throw new ChannelPipelineException("failed to initialize an outbound buffer lazily", e); - } - } + void initHeadHandler() { + // Must be called for the head handler. + HeadHandler h = (HeadHandler) handler; + if (h.initialized) { + return; } + + assert executor().inEventLoop(); + + h.init(this); + h.initialized = true; + outByteBuf = h.byteSink; + outMsgBuf = h.msgSink; } private void fillInboundBridge() { @@ -563,41 +550,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements waitForFuture(executor().submit(r)); } - /** - * Waits for a future to finish and gets the result. If the task is interrupted, then the current thread - * will be interrupted and this will return {@code null}. It is expected that the task performs any - * appropriate locking. - *

- * If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error}, - * {@link RuntimeException}, or {@link Exception}, then it is wrapped inside an {@link AssertionError} - * and that is thrown instead.

- * - * @param future wait for this future - * @param the return value type - * @return the task's return value, or {@code null} if the task was interrupted. - * @see Future#get() - * @throws Error if the task threw this. - * @throws RuntimeException if the task threw this. - * @throws Exception if the task threw this. - * @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of - * {@link Throwable}. - */ - private static T getFromFuture(Future future) throws Exception { - try { - return future.get(); - } catch (ExecutionException ex) { - // In the arbitrary case, we can throw Error, RuntimeException, and Exception - - PlatformDependent.throwException(ex.getCause()); - } catch (InterruptedException ex) { - // Interrupt the calling thread (note that this method is not called from the event loop) - - Thread.currentThread().interrupt(); - } - - return null; - } - /** * Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted. * It is expected that the task performs any appropriate locking. @@ -740,7 +692,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelHandlerContext fireChannelRegistered() { - lazyInitHeadHandler(); final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -793,7 +744,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelHandlerContext fireChannelActive() { - lazyInitHeadHandler(); final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 4faed5641b..33e8a61532 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -908,6 +908,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline fireChannelRegistered() { + head.initHeadHandler(); head.fireChannelRegistered(); return this; } @@ -926,6 +927,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline fireChannelActive() { firedChannelActive = true; + head.initHeadHandler(); head.fireChannelActive(); if (channel.config().isAutoRead()) { @@ -1200,12 +1202,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { protected final Unsafe unsafe; ByteBuf byteSink; MessageBuf msgSink; + boolean initialized; protected HeadHandler(Unsafe unsafe) { this.unsafe = unsafe; } void init(ChannelHandlerContext ctx) { + assert !initialized; switch (ctx.channel().metadata().bufferType()) { case BYTE: byteSink = ctx.alloc().ioBuffer();