[#4906] Ensure addLast(...) works as expected in EmbeddedChannel

Motivation:

If the user will use addLast(...) on the ChannelPipeline of EmbeddedChannel after its constructor was run it will break the EmbeddedChannel as it will not be able to collect inbound messages and exceptions.

Modifications:

Ensure addLast(...) work as expected by move the logic of handling messages and exceptions ti protected methods of DefaultChannelPipeline and use a custom implementation for EmbeddedChannel

Result:

addLast(...) works as expected when using EmbeddedChannel.
This commit is contained in:
Norman Maurer 2016-05-20 09:07:53 +02:00
parent 2a14f74979
commit 7547a448e0
4 changed files with 150 additions and 112 deletions

View File

@ -78,7 +78,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent; this.parent = parent;
id = newId(); id = newId();
unsafe = newUnsafe(); unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this); pipeline = newChannelPipeline();
} }
/** /**
@ -91,7 +91,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent; this.parent = parent;
this.id = id; this.id = id;
unsafe = newUnsafe(); unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this); pipeline = newChannelPipeline();
} }
@Override @Override
@ -107,6 +107,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return DefaultChannelId.newInstance(); return DefaultChannelId.newInstance();
} }
/**
* Returns a new {@link DefaultChannelPipeline} instance.
*/
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
@Override @Override
public boolean isWritable() { public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer(); ChannelOutboundBuffer buf = unsafe.outboundBuffer();

View File

@ -21,6 +21,7 @@ import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -41,10 +42,13 @@ import java.util.concurrent.RejectedExecutionException;
* The default {@link ChannelPipeline} implementation. It is usually created * The default {@link ChannelPipeline} implementation. It is usually created
* by a {@link Channel} implementation when the {@link Channel} is created. * by a {@link Channel} implementation when the {@link Channel} is created.
*/ */
final class DefaultChannelPipeline implements ChannelPipeline { public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() { new FastThreadLocal<Map<Class<?>, String>>() {
@Override @Override
@ -53,7 +57,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
}; };
final AbstractChannel channel; private final Channel channel;
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail; final AbstractChannelHandlerContext tail;
@ -80,11 +84,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
*/ */
private boolean registered; private boolean registered;
public DefaultChannelPipeline(AbstractChannel channel) { // - protected as this should only be called from within the same package or if someone extends
if (channel == null) { // DefaultChannelPipeline.
throw new NullPointerException("channel"); // - Tied to AbstractChannel as we need to ensure that callHandlerAddedForAllHandlers() is correctly called.
} protected DefaultChannelPipeline(AbstractChannel channel) {
this.channel = channel; this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null); succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true); voidPromise = new VoidChannelPromise(channel, true);
@ -95,7 +99,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
tail.prev = head; tail.prev = head;
} }
Object touch(Object msg, AbstractChannelHandlerContext next) { final Object touch(Object msg, AbstractChannelHandlerContext next) {
return touch ? ReferenceCountUtil.touch(msg, next) : msg; return touch ? ReferenceCountUtil.touch(msg, next) : msg;
} }
@ -123,17 +127,17 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public Channel channel() { public final Channel channel() {
return channel; return channel;
} }
@Override @Override
public ChannelPipeline addFirst(String name, ChannelHandler handler) { public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler); return addFirst(null, name, handler);
} }
@Override @Override
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
final EventExecutor executor; final EventExecutor executor;
synchronized (this) { synchronized (this) {
@ -180,12 +184,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline addLast(String name, ChannelHandler handler) { public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler); return addLast(null, name, handler);
} }
@Override @Override
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final EventExecutor executor; final EventExecutor executor;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
synchronized (this) { synchronized (this) {
@ -231,12 +235,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
return addBefore(null, baseName, name, handler); return addBefore(null, baseName, name, handler);
} }
@Override @Override
public ChannelPipeline addBefore( public final ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final EventExecutor executor; final EventExecutor executor;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
@ -285,12 +289,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
return addAfter(null, baseName, name, handler); return addAfter(null, baseName, name, handler);
} }
@Override @Override
public ChannelPipeline addAfter( public final ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
final EventExecutor executor; final EventExecutor executor;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
@ -335,12 +339,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline addFirst(ChannelHandler... handlers) { public final ChannelPipeline addFirst(ChannelHandler... handlers) {
return addFirst(null, handlers); return addFirst(null, handlers);
} }
@Override @Override
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) { if (handlers == null) {
throw new NullPointerException("handlers"); throw new NullPointerException("handlers");
} }
@ -364,12 +368,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline addLast(ChannelHandler... handlers) { public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers); return addLast(null, handlers);
} }
@Override @Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) { if (handlers == null) {
throw new NullPointerException("handlers"); throw new NullPointerException("handlers");
} }
@ -413,19 +417,19 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline remove(ChannelHandler handler) { public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler)); remove(getContextOrDie(handler));
return this; return this;
} }
@Override @Override
public ChannelHandler remove(String name) { public final ChannelHandler remove(String name) {
return remove(getContextOrDie(name)).handler(); return remove(getContextOrDie(name)).handler();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T extends ChannelHandler> T remove(Class<T> handlerType) { public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
return (T) remove(getContextOrDie(handlerType)).handler(); return (T) remove(getContextOrDie(handlerType)).handler();
} }
@ -468,7 +472,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandler removeFirst() { public final ChannelHandler removeFirst() {
if (head.next == tail) { if (head.next == tail) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
@ -476,7 +480,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandler removeLast() { public final ChannelHandler removeLast() {
if (head.next == tail) { if (head.next == tail) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
@ -484,19 +488,19 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
replace(getContextOrDie(oldHandler), newName, newHandler); replace(getContextOrDie(oldHandler), newName, newHandler);
return this; return this;
} }
@Override @Override
public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
return replace(getContextOrDie(oldName), newName, newHandler); return replace(getContextOrDie(oldName), newName, newHandler);
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends ChannelHandler> T replace( public final <T extends ChannelHandler> T replace(
Class<T> oldHandlerType, String newName, ChannelHandler newHandler) { Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
} }
@ -631,7 +635,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandler first() { public final ChannelHandler first() {
ChannelHandlerContext first = firstContext(); ChannelHandlerContext first = firstContext();
if (first == null) { if (first == null) {
return null; return null;
@ -640,7 +644,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandlerContext firstContext() { public final ChannelHandlerContext firstContext() {
AbstractChannelHandlerContext first = head.next; AbstractChannelHandlerContext first = head.next;
if (first == tail) { if (first == tail) {
return null; return null;
@ -649,7 +653,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandler last() { public final ChannelHandler last() {
AbstractChannelHandlerContext last = tail.prev; AbstractChannelHandlerContext last = tail.prev;
if (last == head) { if (last == head) {
return null; return null;
@ -658,7 +662,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandlerContext lastContext() { public final ChannelHandlerContext lastContext() {
AbstractChannelHandlerContext last = tail.prev; AbstractChannelHandlerContext last = tail.prev;
if (last == head) { if (last == head) {
return null; return null;
@ -667,7 +671,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandler get(String name) { public final ChannelHandler get(String name) {
ChannelHandlerContext ctx = context(name); ChannelHandlerContext ctx = context(name);
if (ctx == null) { if (ctx == null) {
return null; return null;
@ -678,7 +682,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T extends ChannelHandler> T get(Class<T> handlerType) { public final <T extends ChannelHandler> T get(Class<T> handlerType) {
ChannelHandlerContext ctx = context(handlerType); ChannelHandlerContext ctx = context(handlerType);
if (ctx == null) { if (ctx == null) {
return null; return null;
@ -688,7 +692,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandlerContext context(String name) { public final ChannelHandlerContext context(String name) {
if (name == null) { if (name == null) {
throw new NullPointerException("name"); throw new NullPointerException("name");
} }
@ -697,7 +701,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandlerContext context(ChannelHandler handler) { public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) { if (handler == null) {
throw new NullPointerException("handler"); throw new NullPointerException("handler");
} }
@ -718,7 +722,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) { public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
if (handlerType == null) { if (handlerType == null) {
throw new NullPointerException("handlerType"); throw new NullPointerException("handlerType");
} }
@ -736,7 +740,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public List<String> names() { public final List<String> names() {
List<String> list = new ArrayList<String>(); List<String> list = new ArrayList<String>();
AbstractChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
@ -749,7 +753,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public Map<String, ChannelHandler> toMap() { public final Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>(); Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
AbstractChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
@ -762,7 +766,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public Iterator<Map.Entry<String, ChannelHandler>> iterator() { public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
return toMap().entrySet().iterator(); return toMap().entrySet().iterator();
} }
@ -770,7 +774,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
* Returns the {@link String} representation of this pipeline. * Returns the {@link String} representation of this pipeline.
*/ */
@Override @Override
public String toString() { public final String toString() {
StringBuilder buf = new StringBuilder() StringBuilder buf = new StringBuilder()
.append(StringUtil.simpleClassName(this)) .append(StringUtil.simpleClassName(this))
.append('{'); .append('{');
@ -798,13 +802,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline fireChannelRegistered() { public final ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered(); head.fireChannelRegistered();
return this; return this;
} }
@Override @Override
public ChannelPipeline fireChannelUnregistered() { public final ChannelPipeline fireChannelUnregistered() {
head.fireChannelUnregistered(); head.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered. // Remove all handlers sequentially if channel is closed and unregistered.
@ -885,7 +889,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline fireChannelActive() { public final ChannelPipeline fireChannelActive() {
head.fireChannelActive(); head.fireChannelActive();
if (channel.config().isAutoRead()) { if (channel.config().isAutoRead()) {
@ -896,31 +900,31 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline fireChannelInactive() { public final ChannelPipeline fireChannelInactive() {
head.fireChannelInactive(); head.fireChannelInactive();
return this; return this;
} }
@Override @Override
public ChannelPipeline fireExceptionCaught(Throwable cause) { public final ChannelPipeline fireExceptionCaught(Throwable cause) {
head.fireExceptionCaught(cause); head.fireExceptionCaught(cause);
return this; return this;
} }
@Override @Override
public ChannelPipeline fireUserEventTriggered(Object event) { public final ChannelPipeline fireUserEventTriggered(Object event) {
head.fireUserEventTriggered(event); head.fireUserEventTriggered(event);
return this; return this;
} }
@Override @Override
public ChannelPipeline fireChannelRead(Object msg) { public final ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg); head.fireChannelRead(msg);
return this; return this;
} }
@Override @Override
public ChannelPipeline fireChannelReadComplete() { public final ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete(); head.fireChannelReadComplete();
if (channel.config().isAutoRead()) { if (channel.config().isAutoRead()) {
read(); read();
@ -929,125 +933,126 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public ChannelPipeline fireChannelWritabilityChanged() { public final ChannelPipeline fireChannelWritabilityChanged() {
head.fireChannelWritabilityChanged(); head.fireChannelWritabilityChanged();
return this; return this;
} }
@Override @Override
public ChannelFuture bind(SocketAddress localAddress) { public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress); return tail.bind(localAddress);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress) { public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress); return tail.connect(remoteAddress);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return tail.connect(remoteAddress, localAddress); return tail.connect(remoteAddress, localAddress);
} }
@Override @Override
public ChannelFuture disconnect() { public final ChannelFuture disconnect() {
return tail.disconnect(); return tail.disconnect();
} }
@Override @Override
public ChannelFuture close() { public final ChannelFuture close() {
return tail.close(); return tail.close();
} }
@Override @Override
public ChannelFuture deregister() { public final ChannelFuture deregister() {
return tail.deregister(); return tail.deregister();
} }
@Override @Override
public ChannelPipeline flush() { public final ChannelPipeline flush() {
tail.flush(); tail.flush();
return this; return this;
} }
@Override @Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise); return tail.bind(localAddress, promise);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise); return tail.connect(remoteAddress, promise);
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise); return tail.connect(remoteAddress, localAddress, promise);
} }
@Override @Override
public ChannelFuture disconnect(ChannelPromise promise) { public final ChannelFuture disconnect(ChannelPromise promise) {
return tail.disconnect(promise); return tail.disconnect(promise);
} }
@Override @Override
public ChannelFuture close(ChannelPromise promise) { public final ChannelFuture close(ChannelPromise promise) {
return tail.close(promise); return tail.close(promise);
} }
@Override @Override
public ChannelFuture deregister(final ChannelPromise promise) { public final ChannelFuture deregister(final ChannelPromise promise) {
return tail.deregister(promise); return tail.deregister(promise);
} }
@Override @Override
public ChannelPipeline read() { public final ChannelPipeline read() {
tail.read(); tail.read();
return this; return this;
} }
@Override @Override
public ChannelFuture write(Object msg) { public final ChannelFuture write(Object msg) {
return tail.write(msg); return tail.write(msg);
} }
@Override @Override
public ChannelFuture write(Object msg, ChannelPromise promise) { public final ChannelFuture write(Object msg, ChannelPromise promise) {
return tail.write(msg, promise); return tail.write(msg, promise);
} }
@Override @Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise); return tail.writeAndFlush(msg, promise);
} }
@Override @Override
public ChannelFuture writeAndFlush(Object msg) { public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg); return tail.writeAndFlush(msg);
} }
@Override @Override
public ChannelPromise newPromise() { public final ChannelPromise newPromise() {
return new DefaultChannelPromise(channel); return new DefaultChannelPromise(channel);
} }
@Override @Override
public ChannelProgressivePromise newProgressivePromise() { public final ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(channel); return new DefaultChannelProgressivePromise(channel);
} }
@Override @Override
public ChannelFuture newSucceededFuture() { public final ChannelFuture newSucceededFuture() {
return succeededFuture; return succeededFuture;
} }
@Override @Override
public ChannelFuture newFailedFuture(Throwable cause) { public final ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(channel, null, cause); return new FailedChannelFuture(channel, null, cause);
} }
@Override @Override
public ChannelPromise voidPromise() { public final ChannelPromise voidPromise() {
return voidPromise; return voidPromise;
} }
@ -1096,9 +1101,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
/** /**
* Should be called before {@link #fireChannelRegistered()} is called the first time. * Must be called before {@link #fireChannelRegistered()} is called the first time.
*/ */
void callHandlerAddedForAllHandlers() { final void callHandlerAddedForAllHandlers() {
// This should only called from within the EventLoop. // This should only called from within the EventLoop.
assert channel.eventLoop().inEventLoop(); assert channel.eventLoop().inEventLoop();
@ -1150,10 +1155,38 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return eventExecutor; return eventExecutor;
} }
// A special catch-all handler that handles both bytes and messages. /**
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
*/
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
private static final String TAIL_NAME = generateName0(TailContext.class); /**
* Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
* in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
* to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
*/
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
// A special catch-all handler that handles both bytes and messages.
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) { TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false); super(pipeline, null, TAIL_NAME, true, false);
@ -1195,36 +1228,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
try { onUnhandledInboundException(cause);
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try { onUnhandledInboundMessage(msg);
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
} }
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
} }
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler { static final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler {
private static final String HEAD_NAME = generateName0(HeadContext.class); private final Unsafe unsafe;
protected final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) { HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true); super(pipeline, null, HEAD_NAME, false, true);

View File

@ -21,15 +21,14 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId; import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.DefaultChannelPipeline;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
@ -154,7 +153,11 @@ public class EmbeddedChannel extends AbstractChannel {
ChannelFuture future = loop.register(this); ChannelFuture future = loop.register(this);
assert future.isDone(); assert future.isDone();
p.addLast(new LastInboundHandler()); }
@Override
protected final DefaultChannelPipeline newChannelPipeline() {
return new EmbeddedChannelPipeline(this);
} }
@Override @Override
@ -548,15 +551,19 @@ public class EmbeddedChannel extends AbstractChannel {
} }
} }
private final class LastInboundHandler extends ChannelInboundHandlerAdapter { private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
@Override public EmbeddedChannelPipeline(EmbeddedChannel channel) {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super(channel);
inboundMessages().add(msg);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { protected void onUnhandledInboundException(Throwable cause) {
recordException(cause); recordException(cause);
} }
@Override
protected void onUnhandledInboundMessage(Object msg) {
inboundMessages().add(msg);
}
} }
} }

View File

@ -197,9 +197,14 @@ public class PendingWriteQueueTest {
assertNull(channel.readOutbound()); assertNull(channel.readOutbound());
} }
private static EmbeddedChannel newChannel() {
// Add a handler so we can access a ChannelHandlerContext via the ChannelPipeline.
return new EmbeddedChannel(new ChannelHandlerAdapter() { });
}
@Test @Test
public void testRemoveAndFailAllReentrantFailAll() { public void testRemoveAndFailAllReentrantFailAll() {
EmbeddedChannel channel = new EmbeddedChannel(); EmbeddedChannel channel = newChannel();
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise(); ChannelPromise promise = channel.newPromise();
@ -224,7 +229,7 @@ public class PendingWriteQueueTest {
@Test @Test
public void testRemoveAndFailAllReentrantWrite() { public void testRemoveAndFailAllReentrantWrite() {
final List<Integer> failOrder = Collections.synchronizedList(new ArrayList<Integer>()); final List<Integer> failOrder = Collections.synchronizedList(new ArrayList<Integer>());
EmbeddedChannel channel = new EmbeddedChannel(); EmbeddedChannel channel = newChannel();
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise(); ChannelPromise promise = channel.newPromise();
@ -267,7 +272,7 @@ public class PendingWriteQueueTest {
@Test @Test
public void testRemoveAndWriteAllReentrance() { public void testRemoveAndWriteAllReentrance() {
EmbeddedChannel channel = new EmbeddedChannel(); EmbeddedChannel channel = newChannel();
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise(); ChannelPromise promise = channel.newPromise();
@ -296,7 +301,7 @@ public class PendingWriteQueueTest {
// See https://github.com/netty/netty/issues/3967 // See https://github.com/netty/netty/issues/3967
@Test @Test
public void testCloseChannelOnCreation() { public void testCloseChannelOnCreation() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); EmbeddedChannel channel = newChannel();
ChannelHandlerContext context = channel.pipeline().firstContext(); ChannelHandlerContext context = channel.pipeline().firstContext();
channel.close().syncUninterruptibly(); channel.close().syncUninterruptibly();