Optimize DefaultChannelPipeline in terms of memory usage and initialization time
Motivation: Each of DefaultChannelPipeline instance creates an head and tail that wraps a handler. These are used to chain together other DefaultChannelHandlerContext that are created once a new ChannelHandler is added. There are a few things here that can be improved in terms of memory usage and initialization time. Modification: - Only generate the name for the tail and head one time as it will never change anyway - Only compute the skipFlags for the tail and head one time as it will never change - Rename DefaultChannelHandlerContext to AbstractChannelHandlerContext and make it abstract - Create a new DefaultChannelHandlerContext that is used when a ChannelHandler is added to the DefaultChannelPipeline - Rename TailHandler to TailContext and HeadHandler to HeadContext and let them extend AbstractChannelHandlerContext. This way we can save 2 object creations per DefaultChannelPipeline Result: - Less memory usage because we have 2 less objects per DefaultChannelPipeline - Faster creation of DefaultChannelPipeline as we not need to compute the skipFlags and not need to generate the name for the head and tail
This commit is contained in:
parent
805ba157e4
commit
6b17aaad71
@ -0,0 +1,546 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import io.netty.channel.ChannelHandler.Skip;
|
||||||
|
import io.netty.util.Attribute;
|
||||||
|
import io.netty.util.AttributeKey;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import io.netty.util.ResourceLeakHint;
|
||||||
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.util.WeakHashMap;
|
||||||
|
|
||||||
|
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
|
||||||
|
|
||||||
|
// This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method
|
||||||
|
// is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached.
|
||||||
|
// The following constants signify which bit of 'skipFlags' corresponds to which handler method:
|
||||||
|
|
||||||
|
static final int MASK_HANDLER_ADDED = 1;
|
||||||
|
static final int MASK_HANDLER_REMOVED = 1 << 1;
|
||||||
|
|
||||||
|
private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
|
||||||
|
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
|
||||||
|
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
|
||||||
|
private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
|
||||||
|
private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
|
||||||
|
private static final int MASK_CHANNEL_READ = 1 << 7;
|
||||||
|
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
|
||||||
|
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
|
||||||
|
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;
|
||||||
|
|
||||||
|
private static final int MASK_BIND = 1 << 11;
|
||||||
|
private static final int MASK_CONNECT = 1 << 12;
|
||||||
|
private static final int MASK_DISCONNECT = 1 << 13;
|
||||||
|
private static final int MASK_CLOSE = 1 << 14;
|
||||||
|
private static final int MASK_DEREGISTER = 1 << 15;
|
||||||
|
private static final int MASK_READ = 1 << 16;
|
||||||
|
private static final int MASK_WRITE = 1 << 17;
|
||||||
|
private static final int MASK_FLUSH = 1 << 18;
|
||||||
|
|
||||||
|
private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
|
||||||
|
MASK_CHANNEL_REGISTERED |
|
||||||
|
MASK_CHANNEL_UNREGISTERED |
|
||||||
|
MASK_CHANNEL_ACTIVE |
|
||||||
|
MASK_CHANNEL_INACTIVE |
|
||||||
|
MASK_CHANNEL_READ |
|
||||||
|
MASK_CHANNEL_READ_COMPLETE |
|
||||||
|
MASK_CHANNEL_WRITABILITY_CHANGED |
|
||||||
|
MASK_USER_EVENT_TRIGGERED;
|
||||||
|
|
||||||
|
private static final int MASKGROUP_OUTBOUND = MASK_BIND |
|
||||||
|
MASK_CONNECT |
|
||||||
|
MASK_DISCONNECT |
|
||||||
|
MASK_CLOSE |
|
||||||
|
MASK_DEREGISTER |
|
||||||
|
MASK_READ |
|
||||||
|
MASK_WRITE |
|
||||||
|
MASK_FLUSH;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized
|
||||||
|
* {@link WeakHashMap}.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static final WeakHashMap<Class<?>, Integer>[] skipFlagsCache =
|
||||||
|
new WeakHashMap[Runtime.getRuntime().availableProcessors()];
|
||||||
|
|
||||||
|
static {
|
||||||
|
for (int i = 0; i < skipFlagsCache.length; i ++) {
|
||||||
|
skipFlagsCache[i] = new WeakHashMap<Class<?>, Integer>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an integer bitset that tells which handler methods were annotated with {@link Skip}.
|
||||||
|
* It gets the value from {@link #skipFlagsCache} if an handler of the same type were queried before.
|
||||||
|
* Otherwise, it delegates to {@link #skipFlags0(Class)} to get it.
|
||||||
|
*/
|
||||||
|
static int skipFlags(ChannelHandler handler) {
|
||||||
|
WeakHashMap<Class<?>, Integer> cache =
|
||||||
|
skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)];
|
||||||
|
Class<? extends ChannelHandler> handlerType = handler.getClass();
|
||||||
|
int flagsVal;
|
||||||
|
synchronized (cache) {
|
||||||
|
Integer flags = cache.get(handlerType);
|
||||||
|
if (flags != null) {
|
||||||
|
flagsVal = flags;
|
||||||
|
} else {
|
||||||
|
flagsVal = skipFlags0(handlerType);
|
||||||
|
cache.put(handlerType, Integer.valueOf(flagsVal));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return flagsVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API.
|
||||||
|
*/
|
||||||
|
static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
|
||||||
|
int flags = 0;
|
||||||
|
try {
|
||||||
|
if (isSkippable(handlerType, "handlerAdded")) {
|
||||||
|
flags |= MASK_HANDLER_ADDED;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "handlerRemoved")) {
|
||||||
|
flags |= MASK_HANDLER_REMOVED;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
|
||||||
|
flags |= MASK_EXCEPTION_CAUGHT;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelRegistered")) {
|
||||||
|
flags |= MASK_CHANNEL_REGISTERED;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelUnregistered")) {
|
||||||
|
flags |= MASK_CHANNEL_UNREGISTERED;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelActive")) {
|
||||||
|
flags |= MASK_CHANNEL_ACTIVE;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelInactive")) {
|
||||||
|
flags |= MASK_CHANNEL_INACTIVE;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelRead", Object.class)) {
|
||||||
|
flags |= MASK_CHANNEL_READ;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelReadComplete")) {
|
||||||
|
flags |= MASK_CHANNEL_READ_COMPLETE;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "channelWritabilityChanged")) {
|
||||||
|
flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
|
||||||
|
flags |= MASK_USER_EVENT_TRIGGERED;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
|
||||||
|
flags |= MASK_BIND;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
|
||||||
|
flags |= MASK_CONNECT;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
|
||||||
|
flags |= MASK_DISCONNECT;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "close", ChannelPromise.class)) {
|
||||||
|
flags |= MASK_CLOSE;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
|
||||||
|
flags |= MASK_DEREGISTER;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "read")) {
|
||||||
|
flags |= MASK_READ;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
|
||||||
|
flags |= MASK_WRITE;
|
||||||
|
}
|
||||||
|
if (isSkippable(handlerType, "flush")) {
|
||||||
|
flags |= MASK_FLUSH;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Should never reach here.
|
||||||
|
PlatformDependent.throwException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return flags;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private static boolean isSkippable(
|
||||||
|
Class<?> handlerType, String methodName, Class<?>... paramTypes) throws Exception {
|
||||||
|
|
||||||
|
Class[] newParamTypes = new Class[paramTypes.length + 1];
|
||||||
|
newParamTypes[0] = ChannelHandlerContext.class;
|
||||||
|
System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length);
|
||||||
|
|
||||||
|
return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
volatile AbstractChannelHandlerContext next;
|
||||||
|
volatile AbstractChannelHandlerContext prev;
|
||||||
|
|
||||||
|
private final AbstractChannel channel;
|
||||||
|
private final DefaultChannelPipeline pipeline;
|
||||||
|
private final String name;
|
||||||
|
private boolean removed;
|
||||||
|
|
||||||
|
final int skipFlags;
|
||||||
|
|
||||||
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
|
// child executor.
|
||||||
|
final ChannelHandlerInvoker invoker;
|
||||||
|
private ChannelFuture succeededFuture;
|
||||||
|
|
||||||
|
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
||||||
|
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
|
||||||
|
// See the JMM for more details
|
||||||
|
volatile Runnable invokeChannelReadCompleteTask;
|
||||||
|
volatile Runnable invokeReadTask;
|
||||||
|
volatile Runnable invokeFlushTask;
|
||||||
|
volatile Runnable invokeChannelWritableStateChangedTask;
|
||||||
|
|
||||||
|
AbstractChannelHandlerContext(
|
||||||
|
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, int skipFlags) {
|
||||||
|
|
||||||
|
if (name == null) {
|
||||||
|
throw new NullPointerException("name");
|
||||||
|
}
|
||||||
|
|
||||||
|
channel = pipeline.channel;
|
||||||
|
this.pipeline = pipeline;
|
||||||
|
this.name = name;
|
||||||
|
this.invoker = invoker;
|
||||||
|
this.skipFlags = skipFlags;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
|
||||||
|
void teardown() {
|
||||||
|
EventExecutor executor = executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
teardown0();
|
||||||
|
} else {
|
||||||
|
executor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
teardown0();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void teardown0() {
|
||||||
|
AbstractChannelHandlerContext prev = this.prev;
|
||||||
|
if (prev != null) {
|
||||||
|
synchronized (pipeline) {
|
||||||
|
pipeline.remove0(this);
|
||||||
|
}
|
||||||
|
prev.teardown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel channel() {
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPipeline pipeline() {
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBufAllocator alloc() {
|
||||||
|
return channel().config().getAllocator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventExecutor executor() {
|
||||||
|
return invoker().executor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerInvoker invoker() {
|
||||||
|
if (invoker == null) {
|
||||||
|
return channel.unsafe().invoker();
|
||||||
|
}
|
||||||
|
return invoker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Attribute<T> attr(AttributeKey<T> key) {
|
||||||
|
return channel.attr(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> boolean hasAttr(AttributeKey<T> key) {
|
||||||
|
return channel.hasAttr(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelRegistered() {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeChannelRegistered(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelUnregistered() {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeChannelUnregistered(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelActive() {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeChannelActive(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelInactive() {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeChannelInactive(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeExceptionCaught(next, cause);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireUserEventTriggered(Object event) {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeUserEventTriggered(next, event);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelRead(Object msg) {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
ReferenceCountUtil.touch(msg, next);
|
||||||
|
next.invoker().invokeChannelRead(next, msg);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelReadComplete() {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeChannelReadComplete(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||||
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
next.invoker().invokeChannelWritabilityChanged(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(SocketAddress localAddress) {
|
||||||
|
return bind(localAddress, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress) {
|
||||||
|
return connect(remoteAddress, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
||||||
|
return connect(remoteAddress, localAddress, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture disconnect() {
|
||||||
|
return disconnect(newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture close() {
|
||||||
|
return close(newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture deregister() {
|
||||||
|
return deregister(newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeBind(next, localAddress, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
||||||
|
return connect(remoteAddress, null, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture disconnect(ChannelPromise promise) {
|
||||||
|
if (!channel().metadata().hasDisconnect()) {
|
||||||
|
return close(promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeDisconnect(next, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture close(ChannelPromise promise) {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeClose(next, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture deregister(ChannelPromise promise) {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeDeregister(next, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext read() {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeRead(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture write(Object msg) {
|
||||||
|
return write(msg, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
ReferenceCountUtil.touch(msg, next);
|
||||||
|
next.invoker().invokeWrite(next, msg, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext flush() {
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
next.invoker().invokeFlush(next);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||||
|
AbstractChannelHandlerContext next;
|
||||||
|
next = findContextOutbound();
|
||||||
|
ReferenceCountUtil.touch(msg, next);
|
||||||
|
next.invoker().invokeWrite(next, msg, promise);
|
||||||
|
next = findContextOutbound();
|
||||||
|
next.invoker().invokeFlush(next);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture writeAndFlush(Object msg) {
|
||||||
|
return writeAndFlush(msg, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPromise newPromise() {
|
||||||
|
return new DefaultChannelPromise(channel(), executor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelProgressivePromise newProgressivePromise() {
|
||||||
|
return new DefaultChannelProgressivePromise(channel(), executor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture newSucceededFuture() {
|
||||||
|
ChannelFuture succeededFuture = this.succeededFuture;
|
||||||
|
if (succeededFuture == null) {
|
||||||
|
this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
|
||||||
|
}
|
||||||
|
return succeededFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
|
return new FailedChannelFuture(channel(), executor(), cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbstractChannelHandlerContext findContextInbound() {
|
||||||
|
AbstractChannelHandlerContext ctx = this;
|
||||||
|
do {
|
||||||
|
ctx = ctx.next;
|
||||||
|
} while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbstractChannelHandlerContext findContextOutbound() {
|
||||||
|
AbstractChannelHandlerContext ctx = this;
|
||||||
|
do {
|
||||||
|
ctx = ctx.prev;
|
||||||
|
} while ((ctx.skipFlags & MASKGROUP_OUTBOUND) == MASKGROUP_OUTBOUND);
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPromise voidPromise() {
|
||||||
|
return channel.voidPromise();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setRemoved() {
|
||||||
|
removed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRemoved() {
|
||||||
|
return removed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toHintString() {
|
||||||
|
return '\'' + name + "' will handle the message from this point.";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
|
||||||
|
}
|
||||||
|
}
|
@ -181,7 +181,7 @@ public class ChannelHandlerAppender extends ChannelHandlerAdapter {
|
|||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||||
added = true;
|
added = true;
|
||||||
|
|
||||||
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
|
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
|
||||||
DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline();
|
DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline();
|
||||||
String name = dctx.name();
|
String name = dctx.name();
|
||||||
try {
|
try {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2012 The Netty Project
|
* Copyright 2014 The Netty Project
|
||||||
*
|
*
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
@ -15,543 +15,24 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.channel;
|
package io.netty.channel;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
|
||||||
import io.netty.channel.ChannelHandler.Skip;
|
|
||||||
import io.netty.util.Attribute;
|
|
||||||
import io.netty.util.AttributeKey;
|
|
||||||
import io.netty.util.ReferenceCountUtil;
|
|
||||||
import io.netty.util.ResourceLeakHint;
|
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
|
||||||
import io.netty.util.internal.StringUtil;
|
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.util.WeakHashMap;
|
|
||||||
|
|
||||||
final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
|
|
||||||
|
|
||||||
// This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method
|
|
||||||
// is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached.
|
|
||||||
// The following constants signify which bit of 'skipFlags' corresponds to which handler method:
|
|
||||||
|
|
||||||
static final int MASK_HANDLER_ADDED = 1;
|
|
||||||
static final int MASK_HANDLER_REMOVED = 1 << 1;
|
|
||||||
|
|
||||||
private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
|
|
||||||
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
|
|
||||||
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
|
|
||||||
private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
|
|
||||||
private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
|
|
||||||
private static final int MASK_CHANNEL_READ = 1 << 7;
|
|
||||||
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
|
|
||||||
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
|
|
||||||
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;
|
|
||||||
|
|
||||||
private static final int MASK_BIND = 1 << 11;
|
|
||||||
private static final int MASK_CONNECT = 1 << 12;
|
|
||||||
private static final int MASK_DISCONNECT = 1 << 13;
|
|
||||||
private static final int MASK_CLOSE = 1 << 14;
|
|
||||||
private static final int MASK_DEREGISTER = 1 << 15;
|
|
||||||
private static final int MASK_READ = 1 << 16;
|
|
||||||
private static final int MASK_WRITE = 1 << 17;
|
|
||||||
private static final int MASK_FLUSH = 1 << 18;
|
|
||||||
|
|
||||||
private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
|
|
||||||
MASK_CHANNEL_REGISTERED |
|
|
||||||
MASK_CHANNEL_UNREGISTERED |
|
|
||||||
MASK_CHANNEL_ACTIVE |
|
|
||||||
MASK_CHANNEL_INACTIVE |
|
|
||||||
MASK_CHANNEL_READ |
|
|
||||||
MASK_CHANNEL_READ_COMPLETE |
|
|
||||||
MASK_CHANNEL_WRITABILITY_CHANGED |
|
|
||||||
MASK_USER_EVENT_TRIGGERED;
|
|
||||||
|
|
||||||
private static final int MASKGROUP_OUTBOUND = MASK_BIND |
|
|
||||||
MASK_CONNECT |
|
|
||||||
MASK_DISCONNECT |
|
|
||||||
MASK_CLOSE |
|
|
||||||
MASK_DEREGISTER |
|
|
||||||
MASK_READ |
|
|
||||||
MASK_WRITE |
|
|
||||||
MASK_FLUSH;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized
|
|
||||||
* {@link WeakHashMap}.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static final WeakHashMap<Class<?>, Integer>[] skipFlagsCache =
|
|
||||||
new WeakHashMap[Runtime.getRuntime().availableProcessors()];
|
|
||||||
|
|
||||||
static {
|
|
||||||
for (int i = 0; i < skipFlagsCache.length; i ++) {
|
|
||||||
skipFlagsCache[i] = new WeakHashMap<Class<?>, Integer>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an integer bitset that tells which handler methods were annotated with {@link Skip}.
|
|
||||||
* It gets the value from {@link #skipFlagsCache} if an handler of the same type were queried before.
|
|
||||||
* Otherwise, it delegates to {@link #skipFlags0(Class)} to get it.
|
|
||||||
*/
|
|
||||||
private static int skipFlags(ChannelHandler handler) {
|
|
||||||
WeakHashMap<Class<?>, Integer> cache =
|
|
||||||
skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)];
|
|
||||||
Class<? extends ChannelHandler> handlerType = handler.getClass();
|
|
||||||
int flagsVal;
|
|
||||||
synchronized (cache) {
|
|
||||||
Integer flags = cache.get(handlerType);
|
|
||||||
if (flags != null) {
|
|
||||||
flagsVal = flags;
|
|
||||||
} else {
|
|
||||||
flagsVal = skipFlags0(handlerType);
|
|
||||||
cache.put(handlerType, Integer.valueOf(flagsVal));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return flagsVal;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API.
|
|
||||||
*/
|
|
||||||
private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
|
|
||||||
int flags = 0;
|
|
||||||
try {
|
|
||||||
if (isSkippable(handlerType, "handlerAdded")) {
|
|
||||||
flags |= MASK_HANDLER_ADDED;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "handlerRemoved")) {
|
|
||||||
flags |= MASK_HANDLER_REMOVED;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
|
|
||||||
flags |= MASK_EXCEPTION_CAUGHT;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelRegistered")) {
|
|
||||||
flags |= MASK_CHANNEL_REGISTERED;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelUnregistered")) {
|
|
||||||
flags |= MASK_CHANNEL_UNREGISTERED;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelActive")) {
|
|
||||||
flags |= MASK_CHANNEL_ACTIVE;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelInactive")) {
|
|
||||||
flags |= MASK_CHANNEL_INACTIVE;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelRead", Object.class)) {
|
|
||||||
flags |= MASK_CHANNEL_READ;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelReadComplete")) {
|
|
||||||
flags |= MASK_CHANNEL_READ_COMPLETE;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "channelWritabilityChanged")) {
|
|
||||||
flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
|
|
||||||
flags |= MASK_USER_EVENT_TRIGGERED;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
|
|
||||||
flags |= MASK_BIND;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
|
|
||||||
flags |= MASK_CONNECT;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
|
|
||||||
flags |= MASK_DISCONNECT;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "close", ChannelPromise.class)) {
|
|
||||||
flags |= MASK_CLOSE;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
|
|
||||||
flags |= MASK_DEREGISTER;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "read")) {
|
|
||||||
flags |= MASK_READ;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
|
|
||||||
flags |= MASK_WRITE;
|
|
||||||
}
|
|
||||||
if (isSkippable(handlerType, "flush")) {
|
|
||||||
flags |= MASK_FLUSH;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
// Should never reach here.
|
|
||||||
PlatformDependent.throwException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return flags;
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
private static boolean isSkippable(
|
|
||||||
Class<?> handlerType, String methodName, Class<?>... paramTypes) throws Exception {
|
|
||||||
|
|
||||||
Class[] newParamTypes = new Class[paramTypes.length + 1];
|
|
||||||
newParamTypes[0] = ChannelHandlerContext.class;
|
|
||||||
System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length);
|
|
||||||
|
|
||||||
return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
volatile DefaultChannelHandlerContext next;
|
|
||||||
volatile DefaultChannelHandlerContext prev;
|
|
||||||
|
|
||||||
private final AbstractChannel channel;
|
|
||||||
private final DefaultChannelPipeline pipeline;
|
|
||||||
private final String name;
|
|
||||||
private final ChannelHandler handler;
|
private final ChannelHandler handler;
|
||||||
private boolean removed;
|
|
||||||
|
|
||||||
final int skipFlags;
|
|
||||||
|
|
||||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
|
||||||
// child executor.
|
|
||||||
final ChannelHandlerInvoker invoker;
|
|
||||||
private ChannelFuture succeededFuture;
|
|
||||||
|
|
||||||
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
|
||||||
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
|
|
||||||
// See the JMM for more details
|
|
||||||
volatile Runnable invokeChannelReadCompleteTask;
|
|
||||||
volatile Runnable invokeReadTask;
|
|
||||||
volatile Runnable invokeFlushTask;
|
|
||||||
volatile Runnable invokeChannelWritableStateChangedTask;
|
|
||||||
|
|
||||||
DefaultChannelHandlerContext(
|
DefaultChannelHandlerContext(
|
||||||
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
|
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
|
||||||
|
super(pipeline, invoker, name, skipFlags(checkNull(handler)));
|
||||||
if (name == null) {
|
this.handler = handler;
|
||||||
throw new NullPointerException("name");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ChannelHandler checkNull(ChannelHandler handler) {
|
||||||
if (handler == null) {
|
if (handler == null) {
|
||||||
throw new NullPointerException("handler");
|
throw new NullPointerException("handler");
|
||||||
}
|
}
|
||||||
|
return handler;
|
||||||
channel = pipeline.channel;
|
|
||||||
this.pipeline = pipeline;
|
|
||||||
this.name = name;
|
|
||||||
this.handler = handler;
|
|
||||||
this.invoker = invoker;
|
|
||||||
|
|
||||||
skipFlags = skipFlags(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
|
|
||||||
void teardown() {
|
|
||||||
EventExecutor executor = executor();
|
|
||||||
if (executor.inEventLoop()) {
|
|
||||||
teardown0();
|
|
||||||
} else {
|
|
||||||
executor.execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
teardown0();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void teardown0() {
|
|
||||||
DefaultChannelHandlerContext prev = this.prev;
|
|
||||||
if (prev != null) {
|
|
||||||
synchronized (pipeline) {
|
|
||||||
pipeline.remove0(this);
|
|
||||||
}
|
|
||||||
prev.teardown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel channel() {
|
|
||||||
return channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPipeline pipeline() {
|
|
||||||
return pipeline;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBufAllocator alloc() {
|
|
||||||
return channel().config().getAllocator();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventExecutor executor() {
|
|
||||||
return invoker().executor();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerInvoker invoker() {
|
|
||||||
if (invoker == null) {
|
|
||||||
return channel.unsafe().invoker();
|
|
||||||
}
|
|
||||||
return invoker;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler handler() {
|
public ChannelHandler handler() {
|
||||||
return handler;
|
return handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String name() {
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> Attribute<T> attr(AttributeKey<T> key) {
|
|
||||||
return channel.attr(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> boolean hasAttr(AttributeKey<T> key) {
|
|
||||||
return channel.hasAttr(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelRegistered() {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeChannelRegistered(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelUnregistered() {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeChannelUnregistered(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelActive() {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeChannelActive(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelInactive() {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeChannelInactive(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeExceptionCaught(next, cause);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireUserEventTriggered(Object event) {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeUserEventTriggered(next, event);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelRead(Object msg) {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
ReferenceCountUtil.touch(msg, next);
|
|
||||||
next.invoker().invokeChannelRead(next, msg);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelReadComplete() {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeChannelReadComplete(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
|
||||||
DefaultChannelHandlerContext next = findContextInbound();
|
|
||||||
next.invoker().invokeChannelWritabilityChanged(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture bind(SocketAddress localAddress) {
|
|
||||||
return bind(localAddress, newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress) {
|
|
||||||
return connect(remoteAddress, newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
|
||||||
return connect(remoteAddress, localAddress, newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture disconnect() {
|
|
||||||
return disconnect(newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture close() {
|
|
||||||
return close(newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture deregister() {
|
|
||||||
return deregister(newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeBind(next, localAddress, promise);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
|
||||||
return connect(remoteAddress, null, promise);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture disconnect(ChannelPromise promise) {
|
|
||||||
if (!channel().metadata().hasDisconnect()) {
|
|
||||||
return close(promise);
|
|
||||||
}
|
|
||||||
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeDisconnect(next, promise);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture close(ChannelPromise promise) {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeClose(next, promise);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture deregister(ChannelPromise promise) {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeDeregister(next, promise);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext read() {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeRead(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture write(Object msg) {
|
|
||||||
return write(msg, newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
ReferenceCountUtil.touch(msg, next);
|
|
||||||
next.invoker().invokeWrite(next, msg, promise);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelHandlerContext flush() {
|
|
||||||
DefaultChannelHandlerContext next = findContextOutbound();
|
|
||||||
next.invoker().invokeFlush(next);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
|
||||||
DefaultChannelHandlerContext next;
|
|
||||||
next = findContextOutbound();
|
|
||||||
ReferenceCountUtil.touch(msg, next);
|
|
||||||
next.invoker().invokeWrite(next, msg, promise);
|
|
||||||
next = findContextOutbound();
|
|
||||||
next.invoker().invokeFlush(next);
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture writeAndFlush(Object msg) {
|
|
||||||
return writeAndFlush(msg, newPromise());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise newPromise() {
|
|
||||||
return new DefaultChannelPromise(channel(), executor());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelProgressivePromise newProgressivePromise() {
|
|
||||||
return new DefaultChannelProgressivePromise(channel(), executor());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture newSucceededFuture() {
|
|
||||||
ChannelFuture succeededFuture = this.succeededFuture;
|
|
||||||
if (succeededFuture == null) {
|
|
||||||
this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
|
|
||||||
}
|
|
||||||
return succeededFuture;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
|
||||||
return new FailedChannelFuture(channel(), executor(), cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DefaultChannelHandlerContext findContextInbound() {
|
|
||||||
DefaultChannelHandlerContext ctx = this;
|
|
||||||
do {
|
|
||||||
ctx = ctx.next;
|
|
||||||
} while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
private DefaultChannelHandlerContext findContextOutbound() {
|
|
||||||
DefaultChannelHandlerContext ctx = this;
|
|
||||||
do {
|
|
||||||
ctx = ctx.prev;
|
|
||||||
} while ((ctx.skipFlags & MASKGROUP_OUTBOUND) == MASKGROUP_OUTBOUND);
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return channel.voidPromise();
|
|
||||||
}
|
|
||||||
|
|
||||||
void setRemoved() {
|
|
||||||
removed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRemoved() {
|
|
||||||
return removed;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toHintString() {
|
|
||||||
return '\'' + name + "' will handle the message from this point.";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -165,7 +165,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
|||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeChannelReadCompleteNow(ctx);
|
invokeChannelReadCompleteNow(ctx);
|
||||||
} else {
|
} else {
|
||||||
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
|
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
|
||||||
Runnable task = dctx.invokeChannelReadCompleteTask;
|
Runnable task = dctx.invokeChannelReadCompleteTask;
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
dctx.invokeChannelReadCompleteTask = task = new Runnable() {
|
dctx.invokeChannelReadCompleteTask = task = new Runnable() {
|
||||||
@ -184,7 +184,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
|||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeChannelWritabilityChangedNow(ctx);
|
invokeChannelWritabilityChangedNow(ctx);
|
||||||
} else {
|
} else {
|
||||||
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
|
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
|
||||||
Runnable task = dctx.invokeChannelWritableStateChangedTask;
|
Runnable task = dctx.invokeChannelWritableStateChangedTask;
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
|
dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
|
||||||
@ -307,7 +307,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
|||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeReadNow(ctx);
|
invokeReadNow(ctx);
|
||||||
} else {
|
} else {
|
||||||
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
|
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
|
||||||
Runnable task = dctx.invokeReadTask;
|
Runnable task = dctx.invokeReadTask;
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
dctx.invokeReadTask = task = new Runnable() {
|
dctx.invokeReadTask = task = new Runnable() {
|
||||||
@ -353,7 +353,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
|||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeFlushNow(ctx);
|
invokeFlushNow(ctx);
|
||||||
} else {
|
} else {
|
||||||
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
|
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
|
||||||
Runnable task = dctx.invokeFlushTask;
|
Runnable task = dctx.invokeFlushTask;
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
dctx.invokeFlushTask = task = new Runnable() {
|
dctx.invokeFlushTask = task = new Runnable() {
|
||||||
|
@ -57,11 +57,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
final AbstractChannel channel;
|
final AbstractChannel channel;
|
||||||
|
|
||||||
final DefaultChannelHandlerContext head;
|
final AbstractChannelHandlerContext head;
|
||||||
final DefaultChannelHandlerContext tail;
|
final AbstractChannelHandlerContext tail;
|
||||||
|
|
||||||
private final Map<String, DefaultChannelHandlerContext> name2ctx =
|
private final Map<String, AbstractChannelHandlerContext> name2ctx =
|
||||||
new HashMap<String, DefaultChannelHandlerContext>(4);
|
new HashMap<String, AbstractChannelHandlerContext>(4);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see #findInvoker(EventExecutorGroup)
|
* @see #findInvoker(EventExecutorGroup)
|
||||||
@ -74,11 +74,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
|
||||||
TailHandler tailHandler = new TailHandler();
|
tail = new TailContext(this);
|
||||||
tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
|
head = new HeadContext(this);
|
||||||
|
|
||||||
HeadHandler headHandler = new HeadHandler(channel.unsafe());
|
|
||||||
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
|
|
||||||
|
|
||||||
head.next = tail;
|
head.next = tail;
|
||||||
tail.prev = head;
|
tail.prev = head;
|
||||||
@ -112,10 +109,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addFirst0(String name, DefaultChannelHandlerContext newCtx) {
|
private void addFirst0(String name, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
DefaultChannelHandlerContext nextCtx = head.next;
|
AbstractChannelHandlerContext nextCtx = head.next;
|
||||||
newCtx.prev = head;
|
newCtx.prev = head;
|
||||||
newCtx.next = nextCtx;
|
newCtx.next = nextCtx;
|
||||||
head.next = newCtx;
|
head.next = newCtx;
|
||||||
@ -149,10 +146,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addLast0(final String name, DefaultChannelHandlerContext newCtx) {
|
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
DefaultChannelHandlerContext prev = tail.prev;
|
AbstractChannelHandlerContext prev = tail.prev;
|
||||||
newCtx.prev = prev;
|
newCtx.prev = prev;
|
||||||
newCtx.next = tail;
|
newCtx.next = tail;
|
||||||
prev.next = newCtx;
|
prev.next = newCtx;
|
||||||
@ -171,7 +168,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||||
}
|
}
|
||||||
@ -182,14 +179,15 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addBefore(
|
public ChannelPipeline addBefore(
|
||||||
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
|
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
|
private void addBefore0(
|
||||||
|
final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
newCtx.prev = ctx.prev;
|
newCtx.prev = ctx.prev;
|
||||||
@ -210,7 +208,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||||
}
|
}
|
||||||
@ -221,14 +219,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addAfter(
|
public ChannelPipeline addAfter(
|
||||||
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
|
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
|
private void addAfter0(final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
@ -367,7 +365,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (cache) {
|
synchronized (cache) {
|
||||||
name = cache.get(handlerType);
|
name = cache.get(handlerType);
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
name = StringUtil.simpleClassName(handlerType) + "#0";
|
name = generateName0(handlerType);
|
||||||
cache.put(handlerType, name);
|
cache.put(handlerType, name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -390,6 +388,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String generateName0(Class<?> handlerType) {
|
||||||
|
return StringUtil.simpleClassName(handlerType) + "#0";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline remove(ChannelHandler handler) {
|
public ChannelPipeline remove(ChannelHandler handler) {
|
||||||
remove(getContextOrDie(handler));
|
remove(getContextOrDie(handler));
|
||||||
@ -407,10 +409,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return (T) remove(getContextOrDie(handlerType)).handler();
|
return (T) remove(getContextOrDie(handlerType)).handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
|
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
|
||||||
assert ctx != head && ctx != tail;
|
assert ctx != head && ctx != tail;
|
||||||
|
|
||||||
DefaultChannelHandlerContext context;
|
AbstractChannelHandlerContext context;
|
||||||
Future<?> future;
|
Future<?> future;
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
@ -438,9 +440,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove0(DefaultChannelHandlerContext ctx) {
|
void remove0(AbstractChannelHandlerContext ctx) {
|
||||||
DefaultChannelHandlerContext prev = ctx.prev;
|
AbstractChannelHandlerContext prev = ctx.prev;
|
||||||
DefaultChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
prev.next = next;
|
prev.next = next;
|
||||||
next.prev = prev;
|
next.prev = prev;
|
||||||
name2ctx.remove(ctx.name());
|
name2ctx.remove(ctx.name());
|
||||||
@ -482,7 +484,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelHandler replace(
|
private ChannelHandler replace(
|
||||||
final DefaultChannelHandlerContext ctx, final String newName,
|
final AbstractChannelHandlerContext ctx, final String newName,
|
||||||
ChannelHandler newHandler) {
|
ChannelHandler newHandler) {
|
||||||
|
|
||||||
assert ctx != head && ctx != tail;
|
assert ctx != head && ctx != tail;
|
||||||
@ -494,7 +496,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
checkDuplicateName(newName);
|
checkDuplicateName(newName);
|
||||||
}
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext newCtx =
|
final AbstractChannelHandlerContext newCtx =
|
||||||
new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
|
new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
|
||||||
|
|
||||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||||
@ -520,12 +522,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return ctx.handler();
|
return ctx.handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void replace0(DefaultChannelHandlerContext oldCtx, String newName,
|
private void replace0(AbstractChannelHandlerContext oldCtx, String newName,
|
||||||
DefaultChannelHandlerContext newCtx) {
|
AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
DefaultChannelHandlerContext prev = oldCtx.prev;
|
AbstractChannelHandlerContext prev = oldCtx.prev;
|
||||||
DefaultChannelHandlerContext next = oldCtx.next;
|
AbstractChannelHandlerContext next = oldCtx.next;
|
||||||
newCtx.prev = prev;
|
newCtx.prev = prev;
|
||||||
newCtx.next = next;
|
newCtx.next = next;
|
||||||
|
|
||||||
@ -565,8 +567,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerAdded(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
|
||||||
if ((ctx.skipFlags & DefaultChannelHandlerContext.MASK_HANDLER_ADDED) != 0) {
|
if ((ctx.skipFlags & AbstractChannelHandlerContext.MASK_HANDLER_ADDED) != 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -582,7 +584,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
callHandlerAdded0(ctx);
|
callHandlerAdded0(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerAdded0(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
||||||
try {
|
try {
|
||||||
ctx.handler().handlerAdded(ctx);
|
ctx.handler().handlerAdded(ctx);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
@ -608,8 +610,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerRemoved(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
|
||||||
if ((ctx.skipFlags & DefaultChannelHandlerContext.MASK_HANDLER_REMOVED) != 0) {
|
if ((ctx.skipFlags & AbstractChannelHandlerContext.MASK_HANDLER_REMOVED) != 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -625,7 +627,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
callHandlerRemoved0(ctx);
|
callHandlerRemoved0(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
|
||||||
// Notify the complete removal.
|
// Notify the complete removal.
|
||||||
try {
|
try {
|
||||||
ctx.handler().handlerRemoved(ctx);
|
ctx.handler().handlerRemoved(ctx);
|
||||||
@ -674,7 +676,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext firstContext() {
|
public ChannelHandlerContext firstContext() {
|
||||||
DefaultChannelHandlerContext first = head.next;
|
AbstractChannelHandlerContext first = head.next;
|
||||||
if (first == tail) {
|
if (first == tail) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -683,7 +685,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler last() {
|
public ChannelHandler last() {
|
||||||
DefaultChannelHandlerContext last = tail.prev;
|
AbstractChannelHandlerContext last = tail.prev;
|
||||||
if (last == head) {
|
if (last == head) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -692,7 +694,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext lastContext() {
|
public ChannelHandlerContext lastContext() {
|
||||||
DefaultChannelHandlerContext last = tail.prev;
|
AbstractChannelHandlerContext last = tail.prev;
|
||||||
if (last == head) {
|
if (last == head) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -737,7 +739,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
throw new NullPointerException("handler");
|
throw new NullPointerException("handler");
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
@ -758,7 +760,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
throw new NullPointerException("handlerType");
|
throw new NullPointerException("handlerType");
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
return null;
|
return null;
|
||||||
@ -773,7 +775,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public List<String> names() {
|
public List<String> names() {
|
||||||
List<String> list = new ArrayList<String>();
|
List<String> list = new ArrayList<String>();
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
return list;
|
return list;
|
||||||
@ -786,7 +788,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public Map<String, ChannelHandler> toMap() {
|
public Map<String, ChannelHandler> toMap() {
|
||||||
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
|
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == tail) {
|
if (ctx == tail) {
|
||||||
return map;
|
return map;
|
||||||
@ -809,7 +811,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
StringBuilder buf = new StringBuilder();
|
StringBuilder buf = new StringBuilder();
|
||||||
buf.append(StringUtil.simpleClassName(this));
|
buf.append(StringUtil.simpleClassName(this));
|
||||||
buf.append('{');
|
buf.append('{');
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == tail) {
|
if (ctx == tail) {
|
||||||
break;
|
break;
|
||||||
@ -1006,8 +1008,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext getContextOrDie(String name) {
|
private AbstractChannelHandlerContext getContextOrDie(String name) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(name);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
throw new NoSuchElementException(name);
|
throw new NoSuchElementException(name);
|
||||||
} else {
|
} else {
|
||||||
@ -1015,8 +1017,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
|
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handler);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
throw new NoSuchElementException(handler.getClass().getName());
|
throw new NoSuchElementException(handler.getClass().getName());
|
||||||
} else {
|
} else {
|
||||||
@ -1024,8 +1026,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
|
private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handlerType);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
throw new NoSuchElementException(handlerType.getName());
|
throw new NoSuchElementException(handlerType.getName());
|
||||||
} else {
|
} else {
|
||||||
@ -1033,8 +1035,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A special catch-all handler that handles both bytes and messages.
|
static final class TailContext extends AbstractChannelHandlerContext implements ChannelHandler {
|
||||||
static final class TailHandler extends ChannelHandlerAdapter {
|
private static final int SKIP_FLAGS = skipFlags0(TailContext.class);
|
||||||
|
private static final String TAIL_NAME = generateName0(TailContext.class);
|
||||||
|
|
||||||
|
TailContext(DefaultChannelPipeline pipeline) {
|
||||||
|
super(pipeline, null, TAIL_NAME, SKIP_FLAGS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandler handler() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
|
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
|
||||||
@ -1074,14 +1086,80 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
|
||||||
|
throws Exception {
|
||||||
|
ctx.bind(localAddress, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class HeadHandler extends ChannelHandlerAdapter {
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
|
||||||
|
SocketAddress localAddress, ChannelPromise promise) throws Exception {
|
||||||
|
ctx.connect(remoteAddress, localAddress, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||||
|
ctx.disconnect(promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||||
|
ctx.close(promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||||
|
ctx.deregister(promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void read(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
|
ctx.write(msg, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelHandler {
|
||||||
|
private static final int SKIP_FLAGS = skipFlags0(HeadContext.class);
|
||||||
|
private static final String HEAD_NAME = generateName0(HeadContext.class);
|
||||||
|
|
||||||
private final Unsafe unsafe;
|
private final Unsafe unsafe;
|
||||||
|
|
||||||
HeadHandler(Unsafe unsafe) {
|
HeadContext(DefaultChannelPipeline pipeline) {
|
||||||
this.unsafe = unsafe;
|
super(pipeline, null, HEAD_NAME, SKIP_FLAGS);
|
||||||
|
unsafe = pipeline.channel().unsafe();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandler handler() {
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1128,5 +1206,67 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public void flush(ChannelHandlerContext ctx) throws Exception {
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||||
unsafe.flush();
|
unsafe.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
ctx.fireExceptionCaught(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelRegistered();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelUnregistered();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelActive();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelInactive();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
|
ctx.fireUserEventTriggered(evt);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Skip
|
||||||
|
@Override
|
||||||
|
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.fireChannelWritabilityChanged();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ public class DefaultChannelPipelineTest {
|
|||||||
pipeline.addBefore("1", "0", newHandler());
|
pipeline.addBefore("1", "0", newHandler());
|
||||||
pipeline.addAfter("10", "11", newHandler());
|
pipeline.addAfter("10", "11", newHandler());
|
||||||
|
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
|
||||||
assertNotNull(ctx);
|
assertNotNull(ctx);
|
||||||
while (ctx != null) {
|
while (ctx != null) {
|
||||||
int i = toInt(ctx.name());
|
int i = toInt(ctx.name());
|
||||||
@ -548,8 +548,8 @@ public class DefaultChannelPipelineTest {
|
|||||||
assertNull(pipeline.last());
|
assertNull(pipeline.last());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int next(DefaultChannelHandlerContext ctx) {
|
private static int next(AbstractChannelHandlerContext ctx) {
|
||||||
DefaultChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
@ -566,7 +566,7 @@ public class DefaultChannelPipelineTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
|
private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
|
||||||
int handlerNumber = 0;
|
int handlerNumber = 0;
|
||||||
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
|
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
|
||||||
handlerNumber++;
|
handlerNumber++;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user