diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 65c7a515d7..818bff5008 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -78,7 +78,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; id = newId(); unsafe = newUnsafe(); - pipeline = new DefaultChannelPipeline(this); + pipeline = newChannelPipeline(); } /** @@ -91,7 +91,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.id = id; unsafe = newUnsafe(); - pipeline = new DefaultChannelPipeline(this); + pipeline = newChannelPipeline(); } @Override @@ -107,6 +107,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return DefaultChannelId.newInstance(); } + /** + * Returns a new {@link DefaultChannelPipeline} instance. + */ + protected DefaultChannelPipeline newChannelPipeline() { + return new DefaultChannelPipeline(this); + } + @Override public boolean isWritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 3babb42754..011600f14f 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -21,6 +21,7 @@ import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -41,10 +42,13 @@ import java.util.concurrent.RejectedExecutionException; * The default {@link ChannelPipeline} implementation. It is usually created * by a {@link Channel} implementation when the {@link Channel} is created. */ -final class DefaultChannelPipeline implements ChannelPipeline { +public class DefaultChannelPipeline implements ChannelPipeline { static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); + private static final String HEAD_NAME = generateName0(HeadContext.class); + private static final String TAIL_NAME = generateName0(TailContext.class); + private static final FastThreadLocal, String>> nameCaches = new FastThreadLocal, String>>() { @Override @@ -53,7 +57,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } }; - final AbstractChannel channel; + private final Channel channel; final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; @@ -80,11 +84,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { */ private boolean registered; - public DefaultChannelPipeline(AbstractChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - this.channel = channel; + // - protected as this should only be called from within the same package or if someone extends + // DefaultChannelPipeline. + // - Tied to AbstractChannel as we need to ensure that callHandlerAddedForAllHandlers() is correctly called. + protected DefaultChannelPipeline(AbstractChannel channel) { + this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); @@ -95,7 +99,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { tail.prev = head; } - Object touch(Object msg, AbstractChannelHandlerContext next) { + final Object touch(Object msg, AbstractChannelHandlerContext next) { return touch ? ReferenceCountUtil.touch(msg, next) : msg; } @@ -123,17 +127,17 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public Channel channel() { + public final Channel channel() { return channel; } @Override - public ChannelPipeline addFirst(String name, ChannelHandler handler) { + public final ChannelPipeline addFirst(String name, ChannelHandler handler) { return addFirst(null, name, handler); } @Override - public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { + public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final EventExecutor executor; synchronized (this) { @@ -180,12 +184,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addLast(String name, ChannelHandler handler) { + public final ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); } @Override - public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { + public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; synchronized (this) { @@ -231,12 +235,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { + public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { return addBefore(null, baseName, name, handler); } @Override - public ChannelPipeline addBefore( + public final ChannelPipeline addBefore( EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; @@ -285,12 +289,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { + public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { return addAfter(null, baseName, name, handler); } @Override - public ChannelPipeline addAfter( + public final ChannelPipeline addAfter( EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; @@ -335,12 +339,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addFirst(ChannelHandler... handlers) { + public final ChannelPipeline addFirst(ChannelHandler... handlers) { return addFirst(null, handlers); } @Override - public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { + public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -364,12 +368,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addLast(ChannelHandler... handlers) { + public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override - public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { + public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -413,19 +417,19 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline remove(ChannelHandler handler) { + public final ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); return this; } @Override - public ChannelHandler remove(String name) { + public final ChannelHandler remove(String name) { return remove(getContextOrDie(name)).handler(); } @SuppressWarnings("unchecked") @Override - public T remove(Class handlerType) { + public final T remove(Class handlerType) { return (T) remove(getContextOrDie(handlerType)).handler(); } @@ -468,7 +472,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler removeFirst() { + public final ChannelHandler removeFirst() { if (head.next == tail) { throw new NoSuchElementException(); } @@ -476,7 +480,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler removeLast() { + public final ChannelHandler removeLast() { if (head.next == tail) { throw new NoSuchElementException(); } @@ -484,19 +488,19 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { + public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { replace(getContextOrDie(oldHandler), newName, newHandler); return this; } @Override - public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { + public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { return replace(getContextOrDie(oldName), newName, newHandler); } @Override @SuppressWarnings("unchecked") - public T replace( + public final T replace( Class oldHandlerType, String newName, ChannelHandler newHandler) { return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); } @@ -631,7 +635,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler first() { + public final ChannelHandler first() { ChannelHandlerContext first = firstContext(); if (first == null) { return null; @@ -640,7 +644,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext firstContext() { + public final ChannelHandlerContext firstContext() { AbstractChannelHandlerContext first = head.next; if (first == tail) { return null; @@ -649,7 +653,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler last() { + public final ChannelHandler last() { AbstractChannelHandlerContext last = tail.prev; if (last == head) { return null; @@ -658,7 +662,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext lastContext() { + public final ChannelHandlerContext lastContext() { AbstractChannelHandlerContext last = tail.prev; if (last == head) { return null; @@ -667,7 +671,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandler get(String name) { + public final ChannelHandler get(String name) { ChannelHandlerContext ctx = context(name); if (ctx == null) { return null; @@ -678,7 +682,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @SuppressWarnings("unchecked") @Override - public T get(Class handlerType) { + public final T get(Class handlerType) { ChannelHandlerContext ctx = context(handlerType); if (ctx == null) { return null; @@ -688,7 +692,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext context(String name) { + public final ChannelHandlerContext context(String name) { if (name == null) { throw new NullPointerException("name"); } @@ -697,7 +701,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext context(ChannelHandler handler) { + public final ChannelHandlerContext context(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } @@ -718,7 +722,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelHandlerContext context(Class handlerType) { + public final ChannelHandlerContext context(Class handlerType) { if (handlerType == null) { throw new NullPointerException("handlerType"); } @@ -736,7 +740,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public List names() { + public final List names() { List list = new ArrayList(); AbstractChannelHandlerContext ctx = head.next; for (;;) { @@ -749,7 +753,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public Map toMap() { + public final Map toMap() { Map map = new LinkedHashMap(); AbstractChannelHandlerContext ctx = head.next; for (;;) { @@ -762,7 +766,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public Iterator> iterator() { + public final Iterator> iterator() { return toMap().entrySet().iterator(); } @@ -770,7 +774,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { * Returns the {@link String} representation of this pipeline. */ @Override - public String toString() { + public final String toString() { StringBuilder buf = new StringBuilder() .append(StringUtil.simpleClassName(this)) .append('{'); @@ -798,13 +802,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelRegistered() { + public final ChannelPipeline fireChannelRegistered() { head.fireChannelRegistered(); return this; } @Override - public ChannelPipeline fireChannelUnregistered() { + public final ChannelPipeline fireChannelUnregistered() { head.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. @@ -885,7 +889,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelActive() { + public final ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { @@ -896,31 +900,31 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelInactive() { + public final ChannelPipeline fireChannelInactive() { head.fireChannelInactive(); return this; } @Override - public ChannelPipeline fireExceptionCaught(Throwable cause) { + public final ChannelPipeline fireExceptionCaught(Throwable cause) { head.fireExceptionCaught(cause); return this; } @Override - public ChannelPipeline fireUserEventTriggered(Object event) { + public final ChannelPipeline fireUserEventTriggered(Object event) { head.fireUserEventTriggered(event); return this; } @Override - public ChannelPipeline fireChannelRead(Object msg) { + public final ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this; } @Override - public ChannelPipeline fireChannelReadComplete() { + public final ChannelPipeline fireChannelReadComplete() { head.fireChannelReadComplete(); if (channel.config().isAutoRead()) { read(); @@ -929,125 +933,126 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline fireChannelWritabilityChanged() { + public final ChannelPipeline fireChannelWritabilityChanged() { head.fireChannelWritabilityChanged(); return this; } @Override - public ChannelFuture bind(SocketAddress localAddress) { + public final ChannelFuture bind(SocketAddress localAddress) { return tail.bind(localAddress); } @Override - public ChannelFuture connect(SocketAddress remoteAddress) { + public final ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); } @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return tail.connect(remoteAddress, localAddress); } @Override - public ChannelFuture disconnect() { + public final ChannelFuture disconnect() { return tail.disconnect(); } @Override - public ChannelFuture close() { + public final ChannelFuture close() { return tail.close(); } @Override - public ChannelFuture deregister() { + public final ChannelFuture deregister() { return tail.deregister(); } @Override - public ChannelPipeline flush() { + public final ChannelPipeline flush() { tail.flush(); return this; } @Override - public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } @Override - public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); } @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + public final ChannelFuture connect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { return tail.connect(remoteAddress, localAddress, promise); } @Override - public ChannelFuture disconnect(ChannelPromise promise) { + public final ChannelFuture disconnect(ChannelPromise promise) { return tail.disconnect(promise); } @Override - public ChannelFuture close(ChannelPromise promise) { + public final ChannelFuture close(ChannelPromise promise) { return tail.close(promise); } @Override - public ChannelFuture deregister(final ChannelPromise promise) { + public final ChannelFuture deregister(final ChannelPromise promise) { return tail.deregister(promise); } @Override - public ChannelPipeline read() { + public final ChannelPipeline read() { tail.read(); return this; } @Override - public ChannelFuture write(Object msg) { + public final ChannelFuture write(Object msg) { return tail.write(msg); } @Override - public ChannelFuture write(Object msg, ChannelPromise promise) { + public final ChannelFuture write(Object msg, ChannelPromise promise) { return tail.write(msg, promise); } @Override - public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return tail.writeAndFlush(msg, promise); } @Override - public ChannelFuture writeAndFlush(Object msg) { + public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); } @Override - public ChannelPromise newPromise() { + public final ChannelPromise newPromise() { return new DefaultChannelPromise(channel); } @Override - public ChannelProgressivePromise newProgressivePromise() { + public final ChannelProgressivePromise newProgressivePromise() { return new DefaultChannelProgressivePromise(channel); } @Override - public ChannelFuture newSucceededFuture() { + public final ChannelFuture newSucceededFuture() { return succeededFuture; } @Override - public ChannelFuture newFailedFuture(Throwable cause) { + public final ChannelFuture newFailedFuture(Throwable cause) { return new FailedChannelFuture(channel, null, cause); } @Override - public ChannelPromise voidPromise() { + public final ChannelPromise voidPromise() { return voidPromise; } @@ -1096,9 +1101,9 @@ final class DefaultChannelPipeline implements ChannelPipeline { } /** - * Should be called before {@link #fireChannelRegistered()} is called the first time. + * Must be called before {@link #fireChannelRegistered()} is called the first time. */ - void callHandlerAddedForAllHandlers() { + final void callHandlerAddedForAllHandlers() { // This should only called from within the EventLoop. assert channel.eventLoop().inEventLoop(); @@ -1150,10 +1155,38 @@ final class DefaultChannelPipeline implements ChannelPipeline { return eventExecutor; } - // A special catch-all handler that handles both bytes and messages. - static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { + /** + * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user + * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. + */ + protected void onUnhandledInboundException(Throwable cause) { + try { + logger.warn( + "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + + "It usually means the last handler in the pipeline did not handle the exception.", + cause); + } finally { + ReferenceCountUtil.release(cause); + } + } - private static final String TAIL_NAME = generateName0(TailContext.class); + /** + * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user + * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible + * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point. + */ + protected void onUnhandledInboundMessage(Object msg) { + try { + logger.debug( + "Discarded inbound message {} that reached at the tail of the pipeline. " + + "Please check your pipeline configuration.", msg); + } finally { + ReferenceCountUtil.release(msg); + } + } + + // A special catch-all handler that handles both bytes and messages. + final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); @@ -1195,36 +1228,22 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - try { - logger.warn( - "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + - "It usually means the last handler in the pipeline did not handle the exception.", - cause); - } finally { - ReferenceCountUtil.release(cause); - } + onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - try { - logger.debug( - "Discarded inbound message {} that reached at the tail of the pipeline. " + - "Please check your pipeline configuration.", msg); - } finally { - ReferenceCountUtil.release(msg); - } + onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } } - static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler { + static final class HeadContext extends AbstractChannelHandlerContext + implements ChannelOutboundHandler { - private static final String HEAD_NAME = generateName0(HeadContext.class); - - protected final Unsafe unsafe; + private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 9d2cecc9fd..1dee69e8ed 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -21,15 +21,14 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.DefaultChannelPipeline; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.ObjectUtil; @@ -154,7 +153,11 @@ public class EmbeddedChannel extends AbstractChannel { ChannelFuture future = loop.register(this); assert future.isDone(); - p.addLast(new LastInboundHandler()); + } + + @Override + protected final DefaultChannelPipeline newChannelPipeline() { + return new EmbeddedChannelPipeline(this); } @Override @@ -548,15 +551,19 @@ public class EmbeddedChannel extends AbstractChannel { } } - private final class LastInboundHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundMessages().add(msg); + private final class EmbeddedChannelPipeline extends DefaultChannelPipeline { + public EmbeddedChannelPipeline(EmbeddedChannel channel) { + super(channel); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + protected void onUnhandledInboundException(Throwable cause) { recordException(cause); } + + @Override + protected void onUnhandledInboundMessage(Object msg) { + inboundMessages().add(msg); + } } } diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index 1f592bf920..03fbd1c37d 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -197,9 +197,14 @@ public class PendingWriteQueueTest { assertNull(channel.readOutbound()); } + private static EmbeddedChannel newChannel() { + // Add a handler so we can access a ChannelHandlerContext via the ChannelPipeline. + return new EmbeddedChannel(new ChannelHandlerAdapter() { }); + } + @Test public void testRemoveAndFailAllReentrantFailAll() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); @@ -224,7 +229,7 @@ public class PendingWriteQueueTest { @Test public void testRemoveAndFailAllReentrantWrite() { final List failOrder = Collections.synchronizedList(new ArrayList()); - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); @@ -267,7 +272,7 @@ public class PendingWriteQueueTest { @Test public void testRemoveAndWriteAllReentrance() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); @@ -296,7 +301,7 @@ public class PendingWriteQueueTest { // See https://github.com/netty/netty/issues/3967 @Test public void testCloseChannelOnCreation() { - EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); + EmbeddedChannel channel = newChannel(); ChannelHandlerContext context = channel.pipeline().firstContext(); channel.close().syncUninterruptibly();