From f2577f736194e590c938350815918622dd03b432 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 20 May 2016 09:07:53 +0200 Subject: [PATCH] [#4906] Ensure addLast(...) works as expected in EmbeddedChannel Motivation: If the user will use addLast(...) on the ChannelPipeline of EmbeddedChannel after its constructor was run it will break the EmbeddedChannel as it will not be able to collect inbound messages and exceptions. Modifications: Ensure addLast(...) work as expected by move the logic of handling messages and exceptions ti protected methods of DefaultChannelPipeline and use a custom implementation for EmbeddedChannel Result: addLast(...) works as expected when using EmbeddedChannel. --- .../io/netty/channel/AbstractChannel.java | 9 +- .../netty/channel/DefaultChannelPipeline.java | 205 ++++++++++-------- .../channel/embedded/EmbeddedChannel.java | 23 +- .../netty/channel/PendingWriteQueueTest.java | 13 +- 4 files changed, 144 insertions(+), 106 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index bd876750b4..6749487606 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -80,7 +80,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); - pipeline = new DefaultChannelPipeline(this); + pipeline = newChannelPipeline(); + } + + /** + * Returns a new {@link DefaultChannelPipeline} instance. + */ + protected DefaultChannelPipeline newChannelPipeline() { + return new DefaultChannelPipeline(this); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index fa45b3c2d9..0e5056bfe7 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -20,6 +20,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -40,10 +41,13 @@ import java.util.concurrent.RejectedExecutionException; * The default {@link ChannelPipeline} implementation. It is usually created * by a {@link Channel} implementation when the {@link Channel} is created. */ -final class DefaultChannelPipeline implements ChannelPipeline { +public class DefaultChannelPipeline implements ChannelPipeline { static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); + private static final String HEAD_NAME = generateName0(HeadContext.class); + private static final String TAIL_NAME = generateName0(TailContext.class); + private static final FastThreadLocal, String>> nameCaches = new FastThreadLocal, String>>() { @Override @@ -52,7 +56,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } }; - final AbstractChannel channel; + private final Channel channel; final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; @@ -75,11 +79,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { */ private boolean registered; - public DefaultChannelPipeline(AbstractChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - this.channel = channel; + // - protected as this should only be called from within the same package or if someone extends + // DefaultChannelPipeline. + // - Tied to AbstractChannel as we need to ensure that callHandlerAddedForAllHandlers() is correctly called. + protected DefaultChannelPipeline(AbstractChannel channel) { + this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); @@ -112,17 +116,17 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public Channel channel() { + public final Channel channel() { return channel; } @Override - public ChannelPipeline addFirst(String name, ChannelHandler handler) { + public final ChannelPipeline addFirst(String name, ChannelHandler handler) { return addFirst(null, name, handler); } @Override - public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { + public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final EventExecutor executor; synchronized (this) { @@ -165,12 +169,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addLast(String name, ChannelHandler handler) { + public final ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); } @Override - public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { + public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; synchronized (this) { @@ -212,13 +216,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { + public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { return addBefore(null, baseName, name, handler); } @Override - public ChannelPipeline addBefore( - EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { + public final ChannelPipeline addBefore( + EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; @@ -262,12 +266,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { + public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { return addAfter(null, baseName, name, handler); } @Override - public ChannelPipeline addAfter( + public final ChannelPipeline addAfter( EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; @@ -312,12 +316,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addFirst(ChannelHandler... handlers) { + public final ChannelPipeline addFirst(ChannelHandler... handlers) { return addFirst(null, handlers); } @Override - public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { + public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -341,12 +345,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addLast(ChannelHandler... handlers) { + public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override - public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { + public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -390,19 +394,19 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline remove(ChannelHandler handler) { + public final ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); return this; } @Override - public ChannelHandler remove(String name) { + public final ChannelHandler remove(String name) { return remove(getContextOrDie(name)).handler(); } @SuppressWarnings("unchecked") @Override - public T remove(Class handlerType) { + public final T remove(Class handlerType) { return (T) remove(getContextOrDie(handlerType)).handler(); } @@ -445,7 +449,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler removeFirst() { + public final ChannelHandler removeFirst() { if (head.next == tail) { throw new NoSuchElementException(); } @@ -453,7 +457,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler removeLast() { + public final ChannelHandler removeLast() { if (head.next == tail) { throw new NoSuchElementException(); } @@ -461,19 +465,19 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { + public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { replace(getContextOrDie(oldHandler), newName, newHandler); return this; } @Override - public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { + public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { return replace(getContextOrDie(oldName), newName, newHandler); } @Override @SuppressWarnings("unchecked") - public T replace( + public final T replace( Class oldHandlerType, String newName, ChannelHandler newHandler) { return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); } @@ -604,7 +608,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler first() { + public final ChannelHandler first() { ChannelHandlerContext first = firstContext(); if (first == null) { return null; @@ -613,7 +617,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext firstContext() { + public final ChannelHandlerContext firstContext() { AbstractChannelHandlerContext first = head.next; if (first == tail) { return null; @@ -622,7 +626,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler last() { + public final ChannelHandler last() { AbstractChannelHandlerContext last = tail.prev; if (last == head) { return null; @@ -631,7 +635,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext lastContext() { + public final ChannelHandlerContext lastContext() { AbstractChannelHandlerContext last = tail.prev; if (last == head) { return null; @@ -640,7 +644,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler get(String name) { + public final ChannelHandler get(String name) { ChannelHandlerContext ctx = context(name); if (ctx == null) { return null; @@ -651,7 +655,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @SuppressWarnings("unchecked") @Override - public T get(Class handlerType) { + public final T get(Class handlerType) { ChannelHandlerContext ctx = context(handlerType); if (ctx == null) { return null; @@ -661,7 +665,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext context(String name) { + public final ChannelHandlerContext context(String name) { if (name == null) { throw new NullPointerException("name"); } @@ -670,7 +674,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext context(ChannelHandler handler) { + public final ChannelHandlerContext context(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } @@ -691,7 +695,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext context(Class handlerType) { + public final ChannelHandlerContext context(Class handlerType) { if (handlerType == null) { throw new NullPointerException("handlerType"); } @@ -709,7 +713,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public List names() { + public final List names() { List list = new ArrayList(); AbstractChannelHandlerContext ctx = head.next; for (;;) { @@ -722,7 +726,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public Map toMap() { + public final Map toMap() { Map map = new LinkedHashMap(); AbstractChannelHandlerContext ctx = head.next; for (;;) { @@ -735,7 +739,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public Iterator> iterator() { + public final Iterator> iterator() { return toMap().entrySet().iterator(); } @@ -743,7 +747,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { * Returns the {@link String} representation of this pipeline. */ @Override - public String toString() { + public final String toString() { StringBuilder buf = new StringBuilder() .append(StringUtil.simpleClassName(this)) .append('{'); @@ -771,13 +775,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelRegistered() { + public final ChannelPipeline fireChannelRegistered() { head.fireChannelRegistered(); return this; } @Override - public ChannelPipeline fireChannelUnregistered() { + public final ChannelPipeline fireChannelUnregistered() { head.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. @@ -858,7 +862,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelActive() { + public final ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { @@ -869,31 +873,31 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelInactive() { + public final ChannelPipeline fireChannelInactive() { head.fireChannelInactive(); return this; } @Override - public ChannelPipeline fireExceptionCaught(Throwable cause) { + public final ChannelPipeline fireExceptionCaught(Throwable cause) { head.fireExceptionCaught(cause); return this; } @Override - public ChannelPipeline fireUserEventTriggered(Object event) { + public final ChannelPipeline fireUserEventTriggered(Object event) { head.fireUserEventTriggered(event); return this; } @Override - public ChannelPipeline fireChannelRead(Object msg) { + public final ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this; } @Override - public ChannelPipeline fireChannelReadComplete() { + public final ChannelPipeline fireChannelReadComplete() { head.fireChannelReadComplete(); if (channel.config().isAutoRead()) { read(); @@ -902,100 +906,101 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelWritabilityChanged() { + public final ChannelPipeline fireChannelWritabilityChanged() { head.fireChannelWritabilityChanged(); return this; } @Override - public ChannelFuture bind(SocketAddress localAddress) { + public final ChannelFuture bind(SocketAddress localAddress) { return tail.bind(localAddress); } @Override - public ChannelFuture connect(SocketAddress remoteAddress) { + public final ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); } @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return tail.connect(remoteAddress, localAddress); } @Override - public ChannelFuture disconnect() { + public final ChannelFuture disconnect() { return tail.disconnect(); } @Override - public ChannelFuture close() { + public final ChannelFuture close() { return tail.close(); } @Override - public ChannelFuture deregister() { + public final ChannelFuture deregister() { return tail.deregister(); } @Override - public ChannelPipeline flush() { + public final ChannelPipeline flush() { tail.flush(); return this; } @Override - public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } @Override - public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); } @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + public final ChannelFuture connect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { return tail.connect(remoteAddress, localAddress, promise); } @Override - public ChannelFuture disconnect(ChannelPromise promise) { + public final ChannelFuture disconnect(ChannelPromise promise) { return tail.disconnect(promise); } @Override - public ChannelFuture close(ChannelPromise promise) { + public final ChannelFuture close(ChannelPromise promise) { return tail.close(promise); } @Override - public ChannelFuture deregister(final ChannelPromise promise) { + public final ChannelFuture deregister(final ChannelPromise promise) { return tail.deregister(promise); } @Override - public ChannelPipeline read() { + public final ChannelPipeline read() { tail.read(); return this; } @Override - public ChannelFuture write(Object msg) { + public final ChannelFuture write(Object msg) { return tail.write(msg); } @Override - public ChannelFuture write(Object msg, ChannelPromise promise) { + public final ChannelFuture write(Object msg, ChannelPromise promise) { return tail.write(msg, promise); } @Override - public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return tail.writeAndFlush(msg, promise); } @Override - public ChannelFuture writeAndFlush(Object msg) { + public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); } @@ -1044,9 +1049,9 @@ final class DefaultChannelPipeline implements ChannelPipeline { } /** - * Should be called before {@link #fireChannelRegistered()} is called the first time. + * Must be called before {@link #fireChannelRegistered()} is called the first time. */ - void callHandlerAddedForAllHandlers() { + final void callHandlerAddedForAllHandlers() { // This should only called from within the EventLoop. assert channel.eventLoop().inEventLoop(); @@ -1098,10 +1103,38 @@ final class DefaultChannelPipeline implements ChannelPipeline { return eventExecutor; } - // A special catch-all handler that handles both bytes and messages. - static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { + /** + * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user + * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. + */ + protected void onUnhandledInboundException(Throwable cause) { + try { + logger.warn( + "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + + "It usually means the last handler in the pipeline did not handle the exception.", + cause); + } finally { + ReferenceCountUtil.release(cause); + } + } - private static final String TAIL_NAME = generateName0(TailContext.class); + /** + * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user + * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible + * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point. + */ + protected void onUnhandledInboundMessage(Object msg) { + try { + logger.debug( + "Discarded inbound message {} that reached at the tail of the pipeline. " + + "Please check your pipeline configuration.", msg); + } finally { + ReferenceCountUtil.release(msg); + } + } + + // A special catch-all handler that handles both bytes and messages. + final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); @@ -1143,36 +1176,22 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - try { - logger.warn( - "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + - "It usually means the last handler in the pipeline did not handle the exception.", - cause); - } finally { - ReferenceCountUtil.release(cause); - } + onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - try { - logger.debug( - "Discarded inbound message {} that reached at the tail of the pipeline. " + - "Please check your pipeline configuration.", msg); - } finally { - ReferenceCountUtil.release(msg); - } + onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } } - static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler { + static final class HeadContext extends AbstractChannelHandlerContext + implements ChannelOutboundHandler { - private static final String HEAD_NAME = generateName0(HeadContext.class); - - protected final Unsafe unsafe; + private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 54a0abeae1..91783cdb9c 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -21,14 +21,13 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.DefaultChannelPipeline; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.ObjectUtil; @@ -109,7 +108,11 @@ public class EmbeddedChannel extends AbstractChannel { ChannelFuture future = loop.register(this); assert future.isDone(); - p.addLast(new LastInboundHandler()); + } + + @Override + protected final DefaultChannelPipeline newChannelPipeline() { + return new EmbeddedChannelPipeline(this); } @Override @@ -501,15 +504,19 @@ public class EmbeddedChannel extends AbstractChannel { } } - private final class LastInboundHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundMessages().add(msg); + private final class EmbeddedChannelPipeline extends DefaultChannelPipeline { + public EmbeddedChannelPipeline(EmbeddedChannel channel) { + super(channel); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + protected void onUnhandledInboundException(Throwable cause) { recordException(cause); } + + @Override + protected void onUnhandledInboundMessage(Object msg) { + inboundMessages().add(msg); + } } } diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index 92bdaadc1b..69517603c1 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -197,9 +197,14 @@ public class PendingWriteQueueTest { assertNull(channel.readOutbound()); } + private static EmbeddedChannel newChannel() { + // Add a handler so we can access a ChannelHandlerContext via the ChannelPipeline. + return new EmbeddedChannel(new ChannelHandlerAdapter() { }); + } + @Test public void testRemoveAndFailAllReentrantFailAll() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); @@ -224,7 +229,7 @@ public class PendingWriteQueueTest { @Test public void testRemoveAndFailAllReentrantWrite() { final List failOrder = Collections.synchronizedList(new ArrayList()); - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); @@ -267,7 +272,7 @@ public class PendingWriteQueueTest { @Test public void testRemoveAndWriteAllReentrance() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); @@ -296,7 +301,7 @@ public class PendingWriteQueueTest { // See https://github.com/netty/netty/issues/3967 @Test public void testCloseChannelOnCreation() { - EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); + EmbeddedChannel channel = newChannel(); ChannelHandlerContext context = channel.pipeline().firstContext(); channel.close().syncUninterruptibly();