[#4906] Ensure addLast(...) works as expected in EmbeddedChannel
Motivation: If the user will use addLast(...) on the ChannelPipeline of EmbeddedChannel after its constructor was run it will break the EmbeddedChannel as it will not be able to collect inbound messages and exceptions. Modifications: Ensure addLast(...) work as expected by move the logic of handling messages and exceptions ti protected methods of DefaultChannelPipeline and use a custom implementation for EmbeddedChannel Result: addLast(...) works as expected when using EmbeddedChannel.
This commit is contained in:
parent
09c97df642
commit
f2577f7361
@ -80,7 +80,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
protected AbstractChannel(Channel parent) {
|
protected AbstractChannel(Channel parent) {
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
unsafe = newUnsafe();
|
unsafe = newUnsafe();
|
||||||
pipeline = new DefaultChannelPipeline(this);
|
pipeline = newChannelPipeline();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new {@link DefaultChannelPipeline} instance.
|
||||||
|
*/
|
||||||
|
protected DefaultChannelPipeline newChannelPipeline() {
|
||||||
|
return new DefaultChannelPipeline(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -20,6 +20,7 @@ import io.netty.util.ReferenceCountUtil;
|
|||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
import io.netty.util.concurrent.EventExecutorGroup;
|
import io.netty.util.concurrent.EventExecutorGroup;
|
||||||
import io.netty.util.concurrent.FastThreadLocal;
|
import io.netty.util.concurrent.FastThreadLocal;
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
import io.netty.util.internal.OneTimeTask;
|
import io.netty.util.internal.OneTimeTask;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
@ -40,10 +41,13 @@ import java.util.concurrent.RejectedExecutionException;
|
|||||||
* The default {@link ChannelPipeline} implementation. It is usually created
|
* The default {@link ChannelPipeline} implementation. It is usually created
|
||||||
* by a {@link Channel} implementation when the {@link Channel} is created.
|
* by a {@link Channel} implementation when the {@link Channel} is created.
|
||||||
*/
|
*/
|
||||||
final class DefaultChannelPipeline implements ChannelPipeline {
|
public class DefaultChannelPipeline implements ChannelPipeline {
|
||||||
|
|
||||||
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
|
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
|
||||||
|
|
||||||
|
private static final String HEAD_NAME = generateName0(HeadContext.class);
|
||||||
|
private static final String TAIL_NAME = generateName0(TailContext.class);
|
||||||
|
|
||||||
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
|
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
|
||||||
new FastThreadLocal<Map<Class<?>, String>>() {
|
new FastThreadLocal<Map<Class<?>, String>>() {
|
||||||
@Override
|
@Override
|
||||||
@ -52,7 +56,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final AbstractChannel channel;
|
private final Channel channel;
|
||||||
|
|
||||||
final AbstractChannelHandlerContext head;
|
final AbstractChannelHandlerContext head;
|
||||||
final AbstractChannelHandlerContext tail;
|
final AbstractChannelHandlerContext tail;
|
||||||
@ -75,11 +79,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
*/
|
*/
|
||||||
private boolean registered;
|
private boolean registered;
|
||||||
|
|
||||||
public DefaultChannelPipeline(AbstractChannel channel) {
|
// - protected as this should only be called from within the same package or if someone extends
|
||||||
if (channel == null) {
|
// DefaultChannelPipeline.
|
||||||
throw new NullPointerException("channel");
|
// - Tied to AbstractChannel as we need to ensure that callHandlerAddedForAllHandlers() is correctly called.
|
||||||
}
|
protected DefaultChannelPipeline(AbstractChannel channel) {
|
||||||
this.channel = channel;
|
this.channel = ObjectUtil.checkNotNull(channel, "channel");
|
||||||
|
|
||||||
tail = new TailContext(this);
|
tail = new TailContext(this);
|
||||||
head = new HeadContext(this);
|
head = new HeadContext(this);
|
||||||
@ -112,17 +116,17 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Channel channel() {
|
public final Channel channel() {
|
||||||
return channel;
|
return channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addFirst(String name, ChannelHandler handler) {
|
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
|
||||||
return addFirst(null, name, handler);
|
return addFirst(null, name, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
|
||||||
final AbstractChannelHandlerContext newCtx;
|
final AbstractChannelHandlerContext newCtx;
|
||||||
final EventExecutor executor;
|
final EventExecutor executor;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
@ -165,12 +169,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addLast(String name, ChannelHandler handler) {
|
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
|
||||||
return addLast(null, name, handler);
|
return addLast(null, name, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
|
||||||
final EventExecutor executor;
|
final EventExecutor executor;
|
||||||
final AbstractChannelHandlerContext newCtx;
|
final AbstractChannelHandlerContext newCtx;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
@ -212,13 +216,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
|
public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
|
||||||
return addBefore(null, baseName, name, handler);
|
return addBefore(null, baseName, name, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addBefore(
|
public final ChannelPipeline addBefore(
|
||||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
||||||
final EventExecutor executor;
|
final EventExecutor executor;
|
||||||
final AbstractChannelHandlerContext newCtx;
|
final AbstractChannelHandlerContext newCtx;
|
||||||
final AbstractChannelHandlerContext ctx;
|
final AbstractChannelHandlerContext ctx;
|
||||||
@ -262,12 +266,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
|
public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
|
||||||
return addAfter(null, baseName, name, handler);
|
return addAfter(null, baseName, name, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addAfter(
|
public final ChannelPipeline addAfter(
|
||||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||||
final EventExecutor executor;
|
final EventExecutor executor;
|
||||||
final AbstractChannelHandlerContext newCtx;
|
final AbstractChannelHandlerContext newCtx;
|
||||||
@ -312,12 +316,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addFirst(ChannelHandler... handlers) {
|
public final ChannelPipeline addFirst(ChannelHandler... handlers) {
|
||||||
return addFirst(null, handlers);
|
return addFirst(null, handlers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
|
public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
|
||||||
if (handlers == null) {
|
if (handlers == null) {
|
||||||
throw new NullPointerException("handlers");
|
throw new NullPointerException("handlers");
|
||||||
}
|
}
|
||||||
@ -341,12 +345,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addLast(ChannelHandler... handlers) {
|
public final ChannelPipeline addLast(ChannelHandler... handlers) {
|
||||||
return addLast(null, handlers);
|
return addLast(null, handlers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
|
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
|
||||||
if (handlers == null) {
|
if (handlers == null) {
|
||||||
throw new NullPointerException("handlers");
|
throw new NullPointerException("handlers");
|
||||||
}
|
}
|
||||||
@ -390,19 +394,19 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline remove(ChannelHandler handler) {
|
public final ChannelPipeline remove(ChannelHandler handler) {
|
||||||
remove(getContextOrDie(handler));
|
remove(getContextOrDie(handler));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler remove(String name) {
|
public final ChannelHandler remove(String name) {
|
||||||
return remove(getContextOrDie(name)).handler();
|
return remove(getContextOrDie(name)).handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public <T extends ChannelHandler> T remove(Class<T> handlerType) {
|
public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
|
||||||
return (T) remove(getContextOrDie(handlerType)).handler();
|
return (T) remove(getContextOrDie(handlerType)).handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -445,7 +449,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler removeFirst() {
|
public final ChannelHandler removeFirst() {
|
||||||
if (head.next == tail) {
|
if (head.next == tail) {
|
||||||
throw new NoSuchElementException();
|
throw new NoSuchElementException();
|
||||||
}
|
}
|
||||||
@ -453,7 +457,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler removeLast() {
|
public final ChannelHandler removeLast() {
|
||||||
if (head.next == tail) {
|
if (head.next == tail) {
|
||||||
throw new NoSuchElementException();
|
throw new NoSuchElementException();
|
||||||
}
|
}
|
||||||
@ -461,19 +465,19 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
|
public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
|
||||||
replace(getContextOrDie(oldHandler), newName, newHandler);
|
replace(getContextOrDie(oldHandler), newName, newHandler);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
|
public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
|
||||||
return replace(getContextOrDie(oldName), newName, newHandler);
|
return replace(getContextOrDie(oldName), newName, newHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <T extends ChannelHandler> T replace(
|
public final <T extends ChannelHandler> T replace(
|
||||||
Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
|
Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
|
||||||
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
|
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
|
||||||
}
|
}
|
||||||
@ -604,7 +608,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler first() {
|
public final ChannelHandler first() {
|
||||||
ChannelHandlerContext first = firstContext();
|
ChannelHandlerContext first = firstContext();
|
||||||
if (first == null) {
|
if (first == null) {
|
||||||
return null;
|
return null;
|
||||||
@ -613,7 +617,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext firstContext() {
|
public final ChannelHandlerContext firstContext() {
|
||||||
AbstractChannelHandlerContext first = head.next;
|
AbstractChannelHandlerContext first = head.next;
|
||||||
if (first == tail) {
|
if (first == tail) {
|
||||||
return null;
|
return null;
|
||||||
@ -622,7 +626,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler last() {
|
public final ChannelHandler last() {
|
||||||
AbstractChannelHandlerContext last = tail.prev;
|
AbstractChannelHandlerContext last = tail.prev;
|
||||||
if (last == head) {
|
if (last == head) {
|
||||||
return null;
|
return null;
|
||||||
@ -631,7 +635,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext lastContext() {
|
public final ChannelHandlerContext lastContext() {
|
||||||
AbstractChannelHandlerContext last = tail.prev;
|
AbstractChannelHandlerContext last = tail.prev;
|
||||||
if (last == head) {
|
if (last == head) {
|
||||||
return null;
|
return null;
|
||||||
@ -640,7 +644,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler get(String name) {
|
public final ChannelHandler get(String name) {
|
||||||
ChannelHandlerContext ctx = context(name);
|
ChannelHandlerContext ctx = context(name);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
return null;
|
return null;
|
||||||
@ -651,7 +655,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public <T extends ChannelHandler> T get(Class<T> handlerType) {
|
public final <T extends ChannelHandler> T get(Class<T> handlerType) {
|
||||||
ChannelHandlerContext ctx = context(handlerType);
|
ChannelHandlerContext ctx = context(handlerType);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
return null;
|
return null;
|
||||||
@ -661,7 +665,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext context(String name) {
|
public final ChannelHandlerContext context(String name) {
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
throw new NullPointerException("name");
|
throw new NullPointerException("name");
|
||||||
}
|
}
|
||||||
@ -670,7 +674,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext context(ChannelHandler handler) {
|
public final ChannelHandlerContext context(ChannelHandler handler) {
|
||||||
if (handler == null) {
|
if (handler == null) {
|
||||||
throw new NullPointerException("handler");
|
throw new NullPointerException("handler");
|
||||||
}
|
}
|
||||||
@ -691,7 +695,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
|
public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
|
||||||
if (handlerType == null) {
|
if (handlerType == null) {
|
||||||
throw new NullPointerException("handlerType");
|
throw new NullPointerException("handlerType");
|
||||||
}
|
}
|
||||||
@ -709,7 +713,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> names() {
|
public final List<String> names() {
|
||||||
List<String> list = new ArrayList<String>();
|
List<String> list = new ArrayList<String>();
|
||||||
AbstractChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
@ -722,7 +726,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, ChannelHandler> toMap() {
|
public final Map<String, ChannelHandler> toMap() {
|
||||||
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
|
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
|
||||||
AbstractChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
@ -735,7 +739,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
|
public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
|
||||||
return toMap().entrySet().iterator();
|
return toMap().entrySet().iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -743,7 +747,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
* Returns the {@link String} representation of this pipeline.
|
* Returns the {@link String} representation of this pipeline.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public final String toString() {
|
||||||
StringBuilder buf = new StringBuilder()
|
StringBuilder buf = new StringBuilder()
|
||||||
.append(StringUtil.simpleClassName(this))
|
.append(StringUtil.simpleClassName(this))
|
||||||
.append('{');
|
.append('{');
|
||||||
@ -771,13 +775,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelRegistered() {
|
public final ChannelPipeline fireChannelRegistered() {
|
||||||
head.fireChannelRegistered();
|
head.fireChannelRegistered();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelUnregistered() {
|
public final ChannelPipeline fireChannelUnregistered() {
|
||||||
head.fireChannelUnregistered();
|
head.fireChannelUnregistered();
|
||||||
|
|
||||||
// Remove all handlers sequentially if channel is closed and unregistered.
|
// Remove all handlers sequentially if channel is closed and unregistered.
|
||||||
@ -858,7 +862,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelActive() {
|
public final ChannelPipeline fireChannelActive() {
|
||||||
head.fireChannelActive();
|
head.fireChannelActive();
|
||||||
|
|
||||||
if (channel.config().isAutoRead()) {
|
if (channel.config().isAutoRead()) {
|
||||||
@ -869,31 +873,31 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelInactive() {
|
public final ChannelPipeline fireChannelInactive() {
|
||||||
head.fireChannelInactive();
|
head.fireChannelInactive();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireExceptionCaught(Throwable cause) {
|
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
|
||||||
head.fireExceptionCaught(cause);
|
head.fireExceptionCaught(cause);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireUserEventTriggered(Object event) {
|
public final ChannelPipeline fireUserEventTriggered(Object event) {
|
||||||
head.fireUserEventTriggered(event);
|
head.fireUserEventTriggered(event);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelRead(Object msg) {
|
public final ChannelPipeline fireChannelRead(Object msg) {
|
||||||
head.fireChannelRead(msg);
|
head.fireChannelRead(msg);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelReadComplete() {
|
public final ChannelPipeline fireChannelReadComplete() {
|
||||||
head.fireChannelReadComplete();
|
head.fireChannelReadComplete();
|
||||||
if (channel.config().isAutoRead()) {
|
if (channel.config().isAutoRead()) {
|
||||||
read();
|
read();
|
||||||
@ -902,100 +906,101 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelWritabilityChanged() {
|
public final ChannelPipeline fireChannelWritabilityChanged() {
|
||||||
head.fireChannelWritabilityChanged();
|
head.fireChannelWritabilityChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture bind(SocketAddress localAddress) {
|
public final ChannelFuture bind(SocketAddress localAddress) {
|
||||||
return tail.bind(localAddress);
|
return tail.bind(localAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress) {
|
public final ChannelFuture connect(SocketAddress remoteAddress) {
|
||||||
return tail.connect(remoteAddress);
|
return tail.connect(remoteAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
||||||
return tail.connect(remoteAddress, localAddress);
|
return tail.connect(remoteAddress, localAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture disconnect() {
|
public final ChannelFuture disconnect() {
|
||||||
return tail.disconnect();
|
return tail.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture close() {
|
public final ChannelFuture close() {
|
||||||
return tail.close();
|
return tail.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture deregister() {
|
public final ChannelFuture deregister() {
|
||||||
return tail.deregister();
|
return tail.deregister();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline flush() {
|
public final ChannelPipeline flush() {
|
||||||
tail.flush();
|
tail.flush();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
||||||
return tail.bind(localAddress, promise);
|
return tail.bind(localAddress, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
||||||
return tail.connect(remoteAddress, promise);
|
return tail.connect(remoteAddress, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
public final ChannelFuture connect(
|
||||||
|
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||||
return tail.connect(remoteAddress, localAddress, promise);
|
return tail.connect(remoteAddress, localAddress, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture disconnect(ChannelPromise promise) {
|
public final ChannelFuture disconnect(ChannelPromise promise) {
|
||||||
return tail.disconnect(promise);
|
return tail.disconnect(promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture close(ChannelPromise promise) {
|
public final ChannelFuture close(ChannelPromise promise) {
|
||||||
return tail.close(promise);
|
return tail.close(promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture deregister(final ChannelPromise promise) {
|
public final ChannelFuture deregister(final ChannelPromise promise) {
|
||||||
return tail.deregister(promise);
|
return tail.deregister(promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline read() {
|
public final ChannelPipeline read() {
|
||||||
tail.read();
|
tail.read();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture write(Object msg) {
|
public final ChannelFuture write(Object msg) {
|
||||||
return tail.write(msg);
|
return tail.write(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
public final ChannelFuture write(Object msg, ChannelPromise promise) {
|
||||||
return tail.write(msg, promise);
|
return tail.write(msg, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||||
return tail.writeAndFlush(msg, promise);
|
return tail.writeAndFlush(msg, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture writeAndFlush(Object msg) {
|
public final ChannelFuture writeAndFlush(Object msg) {
|
||||||
return tail.writeAndFlush(msg);
|
return tail.writeAndFlush(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1044,9 +1049,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should be called before {@link #fireChannelRegistered()} is called the first time.
|
* Must be called before {@link #fireChannelRegistered()} is called the first time.
|
||||||
*/
|
*/
|
||||||
void callHandlerAddedForAllHandlers() {
|
final void callHandlerAddedForAllHandlers() {
|
||||||
// This should only called from within the EventLoop.
|
// This should only called from within the EventLoop.
|
||||||
assert channel.eventLoop().inEventLoop();
|
assert channel.eventLoop().inEventLoop();
|
||||||
|
|
||||||
@ -1098,10 +1103,38 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return eventExecutor;
|
return eventExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
// A special catch-all handler that handles both bytes and messages.
|
/**
|
||||||
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
|
* Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
|
||||||
|
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
|
||||||
|
*/
|
||||||
|
protected void onUnhandledInboundException(Throwable cause) {
|
||||||
|
try {
|
||||||
|
logger.warn(
|
||||||
|
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
|
||||||
|
"It usually means the last handler in the pipeline did not handle the exception.",
|
||||||
|
cause);
|
||||||
|
} finally {
|
||||||
|
ReferenceCountUtil.release(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static final String TAIL_NAME = generateName0(TailContext.class);
|
/**
|
||||||
|
* Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
|
||||||
|
* in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
|
||||||
|
* to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
|
||||||
|
*/
|
||||||
|
protected void onUnhandledInboundMessage(Object msg) {
|
||||||
|
try {
|
||||||
|
logger.debug(
|
||||||
|
"Discarded inbound message {} that reached at the tail of the pipeline. " +
|
||||||
|
"Please check your pipeline configuration.", msg);
|
||||||
|
} finally {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A special catch-all handler that handles both bytes and messages.
|
||||||
|
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
|
||||||
|
|
||||||
TailContext(DefaultChannelPipeline pipeline) {
|
TailContext(DefaultChannelPipeline pipeline) {
|
||||||
super(pipeline, null, TAIL_NAME, true, false);
|
super(pipeline, null, TAIL_NAME, true, false);
|
||||||
@ -1143,36 +1176,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
try {
|
onUnhandledInboundException(cause);
|
||||||
logger.warn(
|
|
||||||
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
|
|
||||||
"It usually means the last handler in the pipeline did not handle the exception.",
|
|
||||||
cause);
|
|
||||||
} finally {
|
|
||||||
ReferenceCountUtil.release(cause);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
try {
|
onUnhandledInboundMessage(msg);
|
||||||
logger.debug(
|
|
||||||
"Discarded inbound message {} that reached at the tail of the pipeline. " +
|
|
||||||
"Please check your pipeline configuration.", msg);
|
|
||||||
} finally {
|
|
||||||
ReferenceCountUtil.release(msg);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
|
static final class HeadContext extends AbstractChannelHandlerContext
|
||||||
|
implements ChannelOutboundHandler {
|
||||||
|
|
||||||
private static final String HEAD_NAME = generateName0(HeadContext.class);
|
private final Unsafe unsafe;
|
||||||
|
|
||||||
protected final Unsafe unsafe;
|
|
||||||
|
|
||||||
HeadContext(DefaultChannelPipeline pipeline) {
|
HeadContext(DefaultChannelPipeline pipeline) {
|
||||||
super(pipeline, null, HEAD_NAME, false, true);
|
super(pipeline, null, HEAD_NAME, false, true);
|
||||||
|
@ -21,14 +21,13 @@ import io.netty.channel.ChannelConfig;
|
|||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.ChannelMetadata;
|
import io.netty.channel.ChannelMetadata;
|
||||||
import io.netty.channel.ChannelOutboundBuffer;
|
import io.netty.channel.ChannelOutboundBuffer;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.DefaultChannelConfig;
|
import io.netty.channel.DefaultChannelConfig;
|
||||||
|
import io.netty.channel.DefaultChannelPipeline;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.internal.ObjectUtil;
|
import io.netty.util.internal.ObjectUtil;
|
||||||
@ -109,7 +108,11 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
|
|
||||||
ChannelFuture future = loop.register(this);
|
ChannelFuture future = loop.register(this);
|
||||||
assert future.isDone();
|
assert future.isDone();
|
||||||
p.addLast(new LastInboundHandler());
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final DefaultChannelPipeline newChannelPipeline() {
|
||||||
|
return new EmbeddedChannelPipeline(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -501,15 +504,19 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class LastInboundHandler extends ChannelInboundHandlerAdapter {
|
private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
|
||||||
@Override
|
public EmbeddedChannelPipeline(EmbeddedChannel channel) {
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
super(channel);
|
||||||
inboundMessages().add(msg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
protected void onUnhandledInboundException(Throwable cause) {
|
||||||
recordException(cause);
|
recordException(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void onUnhandledInboundMessage(Object msg) {
|
||||||
|
inboundMessages().add(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -197,9 +197,14 @@ public class PendingWriteQueueTest {
|
|||||||
assertNull(channel.readOutbound());
|
assertNull(channel.readOutbound());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static EmbeddedChannel newChannel() {
|
||||||
|
// Add a handler so we can access a ChannelHandlerContext via the ChannelPipeline.
|
||||||
|
return new EmbeddedChannel(new ChannelHandlerAdapter() { });
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveAndFailAllReentrantFailAll() {
|
public void testRemoveAndFailAllReentrantFailAll() {
|
||||||
EmbeddedChannel channel = new EmbeddedChannel();
|
EmbeddedChannel channel = newChannel();
|
||||||
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
||||||
|
|
||||||
ChannelPromise promise = channel.newPromise();
|
ChannelPromise promise = channel.newPromise();
|
||||||
@ -224,7 +229,7 @@ public class PendingWriteQueueTest {
|
|||||||
@Test
|
@Test
|
||||||
public void testRemoveAndFailAllReentrantWrite() {
|
public void testRemoveAndFailAllReentrantWrite() {
|
||||||
final List<Integer> failOrder = Collections.synchronizedList(new ArrayList<Integer>());
|
final List<Integer> failOrder = Collections.synchronizedList(new ArrayList<Integer>());
|
||||||
EmbeddedChannel channel = new EmbeddedChannel();
|
EmbeddedChannel channel = newChannel();
|
||||||
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
||||||
|
|
||||||
ChannelPromise promise = channel.newPromise();
|
ChannelPromise promise = channel.newPromise();
|
||||||
@ -267,7 +272,7 @@ public class PendingWriteQueueTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveAndWriteAllReentrance() {
|
public void testRemoveAndWriteAllReentrance() {
|
||||||
EmbeddedChannel channel = new EmbeddedChannel();
|
EmbeddedChannel channel = newChannel();
|
||||||
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
||||||
|
|
||||||
ChannelPromise promise = channel.newPromise();
|
ChannelPromise promise = channel.newPromise();
|
||||||
@ -296,7 +301,7 @@ public class PendingWriteQueueTest {
|
|||||||
// See https://github.com/netty/netty/issues/3967
|
// See https://github.com/netty/netty/issues/3967
|
||||||
@Test
|
@Test
|
||||||
public void testCloseChannelOnCreation() {
|
public void testCloseChannelOnCreation() {
|
||||||
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
EmbeddedChannel channel = newChannel();
|
||||||
ChannelHandlerContext context = channel.pipeline().firstContext();
|
ChannelHandlerContext context = channel.pipeline().firstContext();
|
||||||
channel.close().syncUninterruptibly();
|
channel.close().syncUninterruptibly();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user