Optimize DefaultChannelPipeline in terms of memory usage and initialization time
Motivation: Each of DefaultChannelPipeline instance creates an head and tail that wraps a handler. These are used to chain together other DefaultChannelHandlerContext that are created once a new ChannelHandler is added. There are a few things here that can be improved in terms of memory usage and initialization time. Modification: - Only generate the name for the tail and head one time as it will never change anyway - Rename DefaultChannelHandlerContext to AbstractChannelHandlerContext and make it abstract - Create a new DefaultChannelHandlerContext that is used when a ChannelHandler is added to the DefaultChannelPipeline - Rename TailHandler to TailContext and HeadHandler to HeadContext and let them extend AbstractChannelHandlerContext. This way we can save 2 object creations per DefaultChannelPipeline Result: - Less memory usage because we have 2 less objects per DefaultChannelPipeline - Faster creation of DefaultChannelPipeline as we not need to generate the name for the head and tail
This commit is contained in:
parent
6b69ccb585
commit
9b468bc275
@ -0,0 +1,998 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel;
|
||||||
|
|
||||||
|
import static io.netty.channel.DefaultChannelPipeline.logger;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import io.netty.util.DefaultAttributeMap;
|
||||||
|
import io.netty.util.Recycler;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
|
import io.netty.util.concurrent.EventExecutorGroup;
|
||||||
|
import io.netty.util.internal.OneTimeTask;
|
||||||
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
|
||||||
|
|
||||||
|
volatile AbstractChannelHandlerContext next;
|
||||||
|
volatile AbstractChannelHandlerContext prev;
|
||||||
|
|
||||||
|
private final boolean inbound;
|
||||||
|
private final boolean outbound;
|
||||||
|
private final AbstractChannel channel;
|
||||||
|
private final DefaultChannelPipeline pipeline;
|
||||||
|
private final String name;
|
||||||
|
private boolean removed;
|
||||||
|
|
||||||
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
|
// child executor.
|
||||||
|
final EventExecutor executor;
|
||||||
|
private ChannelFuture succeededFuture;
|
||||||
|
|
||||||
|
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
||||||
|
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
|
||||||
|
// See the JMM for more details
|
||||||
|
private volatile Runnable invokeChannelReadCompleteTask;
|
||||||
|
private volatile Runnable invokeReadTask;
|
||||||
|
private volatile Runnable invokeChannelWritableStateChangedTask;
|
||||||
|
private volatile Runnable invokeFlushTask;
|
||||||
|
|
||||||
|
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name,
|
||||||
|
boolean inbound, boolean outbound) {
|
||||||
|
|
||||||
|
if (name == null) {
|
||||||
|
throw new NullPointerException("name");
|
||||||
|
}
|
||||||
|
|
||||||
|
channel = pipeline.channel;
|
||||||
|
this.pipeline = pipeline;
|
||||||
|
this.name = name;
|
||||||
|
|
||||||
|
if (group != null) {
|
||||||
|
// Pin one of the child executors once and remember it so that the same child executor
|
||||||
|
// is used to fire events for the same channel.
|
||||||
|
EventExecutor childExecutor = pipeline.childExecutors.get(group);
|
||||||
|
if (childExecutor == null) {
|
||||||
|
childExecutor = group.next();
|
||||||
|
pipeline.childExecutors.put(group, childExecutor);
|
||||||
|
}
|
||||||
|
executor = childExecutor;
|
||||||
|
} else {
|
||||||
|
executor = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.inbound = inbound;
|
||||||
|
this.outbound = outbound;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
|
||||||
|
void teardown() {
|
||||||
|
EventExecutor executor = executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
teardown0();
|
||||||
|
} else {
|
||||||
|
executor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
teardown0();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void teardown0() {
|
||||||
|
AbstractChannelHandlerContext prev = this.prev;
|
||||||
|
if (prev != null) {
|
||||||
|
synchronized (pipeline) {
|
||||||
|
pipeline.remove0(this);
|
||||||
|
}
|
||||||
|
prev.teardown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel channel() {
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPipeline pipeline() {
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBufAllocator alloc() {
|
||||||
|
return channel().config().getAllocator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventExecutor executor() {
|
||||||
|
if (executor == null) {
|
||||||
|
return channel().eventLoop();
|
||||||
|
} else {
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelRegistered() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelRegistered();
|
||||||
|
} else {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelRegistered();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelRegistered() {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelRegistered(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelUnregistered() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelUnregistered();
|
||||||
|
} else {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelUnregistered();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelUnregistered() {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelUnregistered(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelActive() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelActive();
|
||||||
|
} else {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelActive();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelActive() {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelActive(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelInactive() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelInactive();
|
||||||
|
} else {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelInactive();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelInactive() {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelInactive(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
|
||||||
|
if (cause == null) {
|
||||||
|
throw new NullPointerException("cause");
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = this.next;
|
||||||
|
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeExceptionCaught(cause);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeExceptionCaught(cause);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Throwable t) {
|
||||||
|
if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn("Failed to submit an exceptionCaught() event.", t);
|
||||||
|
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeExceptionCaught(final Throwable cause) {
|
||||||
|
try {
|
||||||
|
handler().exceptionCaught(this, cause);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn(
|
||||||
|
"An exception was thrown by a user handler's " +
|
||||||
|
"exceptionCaught() method while handling the following exception:", cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
|
||||||
|
if (event == null) {
|
||||||
|
throw new NullPointerException("event");
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeUserEventTriggered(event);
|
||||||
|
} else {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeUserEventTriggered(event);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeUserEventTriggered(Object event) {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelRead(final Object msg) {
|
||||||
|
if (msg == null) {
|
||||||
|
throw new NullPointerException("msg");
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelRead(msg);
|
||||||
|
} else {
|
||||||
|
executor.execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelRead(msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelRead(Object msg) {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelRead(this, msg);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelReadComplete() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelReadComplete();
|
||||||
|
} else {
|
||||||
|
Runnable task = next.invokeChannelReadCompleteTask;
|
||||||
|
if (task == null) {
|
||||||
|
next.invokeChannelReadCompleteTask = task = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelReadComplete();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
executor.execute(task);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelReadComplete() {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelReadComplete(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelWritabilityChanged();
|
||||||
|
} else {
|
||||||
|
Runnable task = next.invokeChannelWritableStateChangedTask;
|
||||||
|
if (task == null) {
|
||||||
|
next.invokeChannelWritableStateChangedTask = task = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelWritabilityChanged();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
executor.execute(task);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeChannelWritabilityChanged() {
|
||||||
|
try {
|
||||||
|
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(SocketAddress localAddress) {
|
||||||
|
return bind(localAddress, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress) {
|
||||||
|
return connect(remoteAddress, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
||||||
|
return connect(remoteAddress, localAddress, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture disconnect() {
|
||||||
|
return disconnect(newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture close() {
|
||||||
|
return close(newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture deregister() {
|
||||||
|
return deregister(newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
|
if (localAddress == null) {
|
||||||
|
throw new NullPointerException("localAddress");
|
||||||
|
}
|
||||||
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeBind(localAddress, promise);
|
||||||
|
} else {
|
||||||
|
safeExecute(executor, new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeBind(localAddress, promise);
|
||||||
|
}
|
||||||
|
}, promise, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyOutboundHandlerException(t, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
||||||
|
return connect(remoteAddress, null, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture connect(
|
||||||
|
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
|
|
||||||
|
if (remoteAddress == null) {
|
||||||
|
throw new NullPointerException("remoteAddress");
|
||||||
|
}
|
||||||
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||||
|
} else {
|
||||||
|
safeExecute(executor, new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||||
|
}
|
||||||
|
}, promise, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyOutboundHandlerException(t, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture disconnect(final ChannelPromise promise) {
|
||||||
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
|
||||||
|
// So far, UDP/IP is the only transport that has such behavior.
|
||||||
|
if (!channel().metadata().hasDisconnect()) {
|
||||||
|
next.invokeClose(promise);
|
||||||
|
} else {
|
||||||
|
next.invokeDisconnect(promise);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
safeExecute(executor, new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (!channel().metadata().hasDisconnect()) {
|
||||||
|
next.invokeClose(promise);
|
||||||
|
} else {
|
||||||
|
next.invokeDisconnect(promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, promise, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeDisconnect(ChannelPromise promise) {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).disconnect(this, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyOutboundHandlerException(t, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture close(final ChannelPromise promise) {
|
||||||
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeClose(promise);
|
||||||
|
} else {
|
||||||
|
safeExecute(executor, new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeClose(promise);
|
||||||
|
}
|
||||||
|
}, promise, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeClose(ChannelPromise promise) {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).close(this, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyOutboundHandlerException(t, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture deregister(final ChannelPromise promise) {
|
||||||
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeDeregister(promise);
|
||||||
|
} else {
|
||||||
|
safeExecute(executor, new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeDeregister(promise);
|
||||||
|
}
|
||||||
|
}, promise, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeDeregister(ChannelPromise promise) {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).deregister(this, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyOutboundHandlerException(t, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext read() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeRead();
|
||||||
|
} else {
|
||||||
|
Runnable task = next.invokeReadTask;
|
||||||
|
if (task == null) {
|
||||||
|
next.invokeReadTask = task = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeRead();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
executor.execute(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeRead() {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).read(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture write(Object msg) {
|
||||||
|
return write(msg, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
|
||||||
|
if (msg == null) {
|
||||||
|
throw new NullPointerException("msg");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!validatePromise(promise, true)) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
write(msg, false, promise);
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeWrite(Object msg, ChannelPromise promise) {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).write(this, msg, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyOutboundHandlerException(t, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandlerContext flush() {
|
||||||
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeFlush();
|
||||||
|
} else {
|
||||||
|
Runnable task = next.invokeFlushTask;
|
||||||
|
if (task == null) {
|
||||||
|
next.invokeFlushTask = task = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeFlush();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
safeExecute(executor, task, channel.voidPromise(), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeFlush() {
|
||||||
|
try {
|
||||||
|
((ChannelOutboundHandler) handler()).flush(this);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
notifyHandlerException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||||
|
if (msg == null) {
|
||||||
|
throw new NullPointerException("msg");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!validatePromise(promise, true)) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
write(msg, true, promise);
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||||
|
|
||||||
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeWrite(msg, promise);
|
||||||
|
if (flush) {
|
||||||
|
next.invokeFlush();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int size = channel.estimatorHandle().size(msg);
|
||||||
|
if (size > 0) {
|
||||||
|
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
|
||||||
|
// Check for null as it may be set to null if the channel is closed already
|
||||||
|
if (buffer != null) {
|
||||||
|
buffer.incrementPendingOutboundBytes(size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Runnable task;
|
||||||
|
if (flush) {
|
||||||
|
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
|
||||||
|
} else {
|
||||||
|
task = WriteTask.newInstance(next, msg, size, promise);
|
||||||
|
}
|
||||||
|
safeExecute(executor, task, promise, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture writeAndFlush(Object msg) {
|
||||||
|
return writeAndFlush(msg, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
|
||||||
|
// only try to fail the promise if its not a VoidChannelPromise, as
|
||||||
|
// the VoidChannelPromise would also fire the cause through the pipeline
|
||||||
|
if (promise instanceof VoidChannelPromise) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!promise.tryFailure(cause)) {
|
||||||
|
if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyHandlerException(Throwable cause) {
|
||||||
|
if (inExceptionCaught(cause)) {
|
||||||
|
if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn(
|
||||||
|
"An exception was thrown by a user handler " +
|
||||||
|
"while handling an exceptionCaught event", cause);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
invokeExceptionCaught(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean inExceptionCaught(Throwable cause) {
|
||||||
|
do {
|
||||||
|
StackTraceElement[] trace = cause.getStackTrace();
|
||||||
|
if (trace != null) {
|
||||||
|
for (StackTraceElement t : trace) {
|
||||||
|
if (t == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if ("exceptionCaught".equals(t.getMethodName())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cause = cause.getCause();
|
||||||
|
} while (cause != null);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPromise newPromise() {
|
||||||
|
return new DefaultChannelPromise(channel(), executor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelProgressivePromise newProgressivePromise() {
|
||||||
|
return new DefaultChannelProgressivePromise(channel(), executor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture newSucceededFuture() {
|
||||||
|
ChannelFuture succeededFuture = this.succeededFuture;
|
||||||
|
if (succeededFuture == null) {
|
||||||
|
this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
|
||||||
|
}
|
||||||
|
return succeededFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
|
return new FailedChannelFuture(channel(), executor(), cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean validatePromise(ChannelPromise promise, boolean allowVoidPromise) {
|
||||||
|
if (promise == null) {
|
||||||
|
throw new NullPointerException("promise");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (promise.isDone()) {
|
||||||
|
// Check if the promise was cancelled and if so signal that the processing of the operation
|
||||||
|
// should not be performed.
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/2349
|
||||||
|
if (promise.isCancelled()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("promise already done: " + promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (promise.channel() != channel()) {
|
||||||
|
throw new IllegalArgumentException(String.format(
|
||||||
|
"promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (promise.getClass() == DefaultChannelPromise.class) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (promise instanceof AbstractChannel.CloseFuture) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbstractChannelHandlerContext findContextInbound() {
|
||||||
|
AbstractChannelHandlerContext ctx = this;
|
||||||
|
do {
|
||||||
|
ctx = ctx.next;
|
||||||
|
} while (!ctx.inbound);
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbstractChannelHandlerContext findContextOutbound() {
|
||||||
|
AbstractChannelHandlerContext ctx = this;
|
||||||
|
do {
|
||||||
|
ctx = ctx.prev;
|
||||||
|
} while (!ctx.outbound);
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPromise voidPromise() {
|
||||||
|
return channel.voidPromise();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setRemoved() {
|
||||||
|
removed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRemoved() {
|
||||||
|
return removed;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
|
||||||
|
try {
|
||||||
|
executor.execute(runnable);
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
try {
|
||||||
|
promise.setFailure(cause);
|
||||||
|
} finally {
|
||||||
|
if (msg != null) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract static class AbstractWriteTask extends OneTimeTask {
|
||||||
|
private final Recycler.Handle handle;
|
||||||
|
|
||||||
|
private AbstractChannelHandlerContext ctx;
|
||||||
|
private Object msg;
|
||||||
|
private ChannelPromise promise;
|
||||||
|
private int size;
|
||||||
|
|
||||||
|
private AbstractWriteTask(Recycler.Handle handle) {
|
||||||
|
this.handle = handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
|
||||||
|
Object msg, int size, ChannelPromise promise) {
|
||||||
|
task.ctx = ctx;
|
||||||
|
task.msg = msg;
|
||||||
|
task.promise = promise;
|
||||||
|
task.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void run() {
|
||||||
|
try {
|
||||||
|
if (size > 0) {
|
||||||
|
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
|
||||||
|
// Check for null as it may be set to null if the channel is closed already
|
||||||
|
if (buffer != null) {
|
||||||
|
buffer.decrementPendingOutboundBytes(size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
write(ctx, msg, promise);
|
||||||
|
} finally {
|
||||||
|
// Set to null so the GC can collect them directly
|
||||||
|
ctx = null;
|
||||||
|
msg = null;
|
||||||
|
promise = null;
|
||||||
|
recycle(handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||||
|
ctx.invokeWrite(msg, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void recycle(Recycler.Handle handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
|
||||||
|
|
||||||
|
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
|
||||||
|
@Override
|
||||||
|
protected WriteTask newObject(Handle handle) {
|
||||||
|
return new WriteTask(handle);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private static WriteTask newInstance(
|
||||||
|
AbstractChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
|
||||||
|
WriteTask task = RECYCLER.get();
|
||||||
|
init(task, ctx, msg, size, promise);
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WriteTask(Recycler.Handle handle) {
|
||||||
|
super(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void recycle(Recycler.Handle handle) {
|
||||||
|
RECYCLER.recycle(this, handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class WriteAndFlushTask extends AbstractWriteTask {
|
||||||
|
|
||||||
|
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
|
||||||
|
@Override
|
||||||
|
protected WriteAndFlushTask newObject(Handle handle) {
|
||||||
|
return new WriteAndFlushTask(handle);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private static WriteAndFlushTask newInstance(
|
||||||
|
AbstractChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
|
||||||
|
WriteAndFlushTask task = RECYCLER.get();
|
||||||
|
init(task, ctx, msg, size, promise);
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WriteAndFlushTask(Recycler.Handle handle) {
|
||||||
|
super(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||||
|
super.write(ctx, msg, promise);
|
||||||
|
ctx.invokeFlush();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void recycle(Recycler.Handle handle) {
|
||||||
|
RECYCLER.recycle(this, handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
@ -57,11 +57,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
final AbstractChannel channel;
|
final AbstractChannel channel;
|
||||||
|
|
||||||
final DefaultChannelHandlerContext head;
|
final AbstractChannelHandlerContext head;
|
||||||
final DefaultChannelHandlerContext tail;
|
final AbstractChannelHandlerContext tail;
|
||||||
|
|
||||||
private final Map<String, DefaultChannelHandlerContext> name2ctx =
|
private final Map<String, AbstractChannelHandlerContext> name2ctx =
|
||||||
new HashMap<String, DefaultChannelHandlerContext>(4);
|
new HashMap<String, AbstractChannelHandlerContext>(4);
|
||||||
|
|
||||||
final Map<EventExecutorGroup, EventExecutor> childExecutors =
|
final Map<EventExecutorGroup, EventExecutor> childExecutors =
|
||||||
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
|
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
|
||||||
@ -72,11 +72,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
|
||||||
TailHandler tailHandler = new TailHandler();
|
tail = new TailContext(this);
|
||||||
tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
|
head = new HeadContext(this);
|
||||||
|
|
||||||
HeadHandler headHandler = new HeadHandler(channel.unsafe());
|
|
||||||
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
|
|
||||||
|
|
||||||
head.next = tail;
|
head.next = tail;
|
||||||
tail.prev = head;
|
tail.prev = head;
|
||||||
@ -96,17 +93,17 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
||||||
addFirst0(name, newCtx);
|
addFirst0(name, newCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addFirst0(String name, DefaultChannelHandlerContext newCtx) {
|
private void addFirst0(String name, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
DefaultChannelHandlerContext nextCtx = head.next;
|
AbstractChannelHandlerContext nextCtx = head.next;
|
||||||
newCtx.prev = head;
|
newCtx.prev = head;
|
||||||
newCtx.next = nextCtx;
|
newCtx.next = nextCtx;
|
||||||
head.next = newCtx;
|
head.next = newCtx;
|
||||||
@ -127,17 +124,17 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
|
|
||||||
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
||||||
addLast0(name, newCtx);
|
addLast0(name, newCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addLast0(final String name, DefaultChannelHandlerContext newCtx) {
|
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
DefaultChannelHandlerContext prev = tail.prev;
|
AbstractChannelHandlerContext prev = tail.prev;
|
||||||
newCtx.prev = prev;
|
newCtx.prev = prev;
|
||||||
newCtx.next = tail;
|
newCtx.next = tail;
|
||||||
prev.next = newCtx;
|
prev.next = newCtx;
|
||||||
@ -157,15 +154,16 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addBefore(
|
public ChannelPipeline addBefore(
|
||||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
||||||
addBefore0(name, ctx, newCtx);
|
addBefore0(name, ctx, newCtx);
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
|
private void addBefore0(
|
||||||
|
final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
newCtx.prev = ctx.prev;
|
newCtx.prev = ctx.prev;
|
||||||
@ -187,9 +185,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addAfter(
|
public ChannelPipeline addAfter(
|
||||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
|
||||||
|
|
||||||
addAfter0(name, ctx, newCtx);
|
addAfter0(name, ctx, newCtx);
|
||||||
}
|
}
|
||||||
@ -197,7 +195,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
|
private void addAfter0(final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||||
checkDuplicateName(name);
|
checkDuplicateName(name);
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
@ -268,7 +266,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (cache) {
|
synchronized (cache) {
|
||||||
name = cache.get(handlerType);
|
name = cache.get(handlerType);
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
name = StringUtil.simpleClassName(handlerType) + "#0";
|
name = generateName0(handlerType);
|
||||||
cache.put(handlerType, name);
|
cache.put(handlerType, name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -291,6 +289,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String generateName0(Class<?> handlerType) {
|
||||||
|
return StringUtil.simpleClassName(handlerType) + "#0";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline remove(ChannelHandler handler) {
|
public ChannelPipeline remove(ChannelHandler handler) {
|
||||||
remove(getContextOrDie(handler));
|
remove(getContextOrDie(handler));
|
||||||
@ -308,10 +310,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return (T) remove(getContextOrDie(handlerType)).handler();
|
return (T) remove(getContextOrDie(handlerType)).handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
|
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
|
||||||
assert ctx != head && ctx != tail;
|
assert ctx != head && ctx != tail;
|
||||||
|
|
||||||
DefaultChannelHandlerContext context;
|
AbstractChannelHandlerContext context;
|
||||||
Future<?> future;
|
Future<?> future;
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
@ -339,9 +341,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove0(DefaultChannelHandlerContext ctx) {
|
void remove0(AbstractChannelHandlerContext ctx) {
|
||||||
DefaultChannelHandlerContext prev = ctx.prev;
|
AbstractChannelHandlerContext prev = ctx.prev;
|
||||||
DefaultChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
prev.next = next;
|
prev.next = next;
|
||||||
next.prev = prev;
|
next.prev = prev;
|
||||||
name2ctx.remove(ctx.name());
|
name2ctx.remove(ctx.name());
|
||||||
@ -383,7 +385,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelHandler replace(
|
private ChannelHandler replace(
|
||||||
final DefaultChannelHandlerContext ctx, final String newName,
|
final AbstractChannelHandlerContext ctx, final String newName,
|
||||||
ChannelHandler newHandler) {
|
ChannelHandler newHandler) {
|
||||||
|
|
||||||
assert ctx != head && ctx != tail;
|
assert ctx != head && ctx != tail;
|
||||||
@ -395,7 +397,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
checkDuplicateName(newName);
|
checkDuplicateName(newName);
|
||||||
}
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext newCtx =
|
final AbstractChannelHandlerContext newCtx =
|
||||||
new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler);
|
new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler);
|
||||||
|
|
||||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||||
@ -421,12 +423,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return ctx.handler();
|
return ctx.handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void replace0(DefaultChannelHandlerContext oldCtx, String newName,
|
private void replace0(AbstractChannelHandlerContext oldCtx, String newName,
|
||||||
DefaultChannelHandlerContext newCtx) {
|
AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
DefaultChannelHandlerContext prev = oldCtx.prev;
|
AbstractChannelHandlerContext prev = oldCtx.prev;
|
||||||
DefaultChannelHandlerContext next = oldCtx.next;
|
AbstractChannelHandlerContext next = oldCtx.next;
|
||||||
newCtx.prev = prev;
|
newCtx.prev = prev;
|
||||||
newCtx.next = next;
|
newCtx.next = next;
|
||||||
|
|
||||||
@ -485,7 +487,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
try {
|
try {
|
||||||
remove((DefaultChannelHandlerContext) ctx);
|
remove((AbstractChannelHandlerContext) ctx);
|
||||||
removed = true;
|
removed = true;
|
||||||
} catch (Throwable t2) {
|
} catch (Throwable t2) {
|
||||||
if (logger.isWarnEnabled()) {
|
if (logger.isWarnEnabled()) {
|
||||||
@ -505,7 +507,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerRemoved(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
|
||||||
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
|
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
|
||||||
ctx.executor().execute(new Runnable() {
|
ctx.executor().execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
@ -518,7 +520,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
callHandlerRemoved0(ctx);
|
callHandlerRemoved0(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
|
||||||
// Notify the complete removal.
|
// Notify the complete removal.
|
||||||
try {
|
try {
|
||||||
ctx.handler().handlerRemoved(ctx);
|
ctx.handler().handlerRemoved(ctx);
|
||||||
@ -567,7 +569,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext firstContext() {
|
public ChannelHandlerContext firstContext() {
|
||||||
DefaultChannelHandlerContext first = head.next;
|
AbstractChannelHandlerContext first = head.next;
|
||||||
if (first == tail) {
|
if (first == tail) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -576,7 +578,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandler last() {
|
public ChannelHandler last() {
|
||||||
DefaultChannelHandlerContext last = tail.prev;
|
AbstractChannelHandlerContext last = tail.prev;
|
||||||
if (last == head) {
|
if (last == head) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -585,7 +587,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext lastContext() {
|
public ChannelHandlerContext lastContext() {
|
||||||
DefaultChannelHandlerContext last = tail.prev;
|
AbstractChannelHandlerContext last = tail.prev;
|
||||||
if (last == head) {
|
if (last == head) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -630,7 +632,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
throw new NullPointerException("handler");
|
throw new NullPointerException("handler");
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
@ -651,7 +653,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
throw new NullPointerException("handlerType");
|
throw new NullPointerException("handlerType");
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
return null;
|
return null;
|
||||||
@ -666,7 +668,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public List<String> names() {
|
public List<String> names() {
|
||||||
List<String> list = new ArrayList<String>();
|
List<String> list = new ArrayList<String>();
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
return list;
|
return list;
|
||||||
@ -679,7 +681,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public Map<String, ChannelHandler> toMap() {
|
public Map<String, ChannelHandler> toMap() {
|
||||||
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
|
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == tail) {
|
if (ctx == tail) {
|
||||||
return map;
|
return map;
|
||||||
@ -702,7 +704,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
StringBuilder buf = new StringBuilder();
|
StringBuilder buf = new StringBuilder();
|
||||||
buf.append(StringUtil.simpleClassName(this));
|
buf.append(StringUtil.simpleClassName(this));
|
||||||
buf.append('{');
|
buf.append('{');
|
||||||
DefaultChannelHandlerContext ctx = head.next;
|
AbstractChannelHandlerContext ctx = head.next;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (ctx == tail) {
|
if (ctx == tail) {
|
||||||
break;
|
break;
|
||||||
@ -899,8 +901,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext getContextOrDie(String name) {
|
private AbstractChannelHandlerContext getContextOrDie(String name) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(name);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
throw new NoSuchElementException(name);
|
throw new NoSuchElementException(name);
|
||||||
} else {
|
} else {
|
||||||
@ -908,8 +910,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
|
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handler);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
throw new NoSuchElementException(handler.getClass().getName());
|
throw new NoSuchElementException(handler.getClass().getName());
|
||||||
} else {
|
} else {
|
||||||
@ -917,8 +919,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
|
private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handlerType);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
throw new NoSuchElementException(handlerType.getName());
|
throw new NoSuchElementException(handlerType.getName());
|
||||||
} else {
|
} else {
|
||||||
@ -927,7 +929,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// A special catch-all handler that handles both bytes and messages.
|
// A special catch-all handler that handles both bytes and messages.
|
||||||
static final class TailHandler implements ChannelInboundHandler {
|
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
|
||||||
|
|
||||||
|
private static final String TAIL_NAME = generateName0(TailContext.class);
|
||||||
|
|
||||||
|
TailContext(DefaultChannelPipeline pipeline) {
|
||||||
|
super(pipeline, null, TAIL_NAME, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandler handler() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
|
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
|
||||||
@ -975,12 +988,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class HeadHandler implements ChannelOutboundHandler {
|
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
|
||||||
|
|
||||||
|
private static final String HEAD_NAME = generateName0(HeadContext.class);
|
||||||
|
|
||||||
protected final Unsafe unsafe;
|
protected final Unsafe unsafe;
|
||||||
|
|
||||||
protected HeadHandler(Unsafe unsafe) {
|
HeadContext(DefaultChannelPipeline pipeline) {
|
||||||
this.unsafe = unsafe;
|
super(pipeline, null, HEAD_NAME, false, true);
|
||||||
|
unsafe = pipeline.channel().unsafe();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelHandler handler() {
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -255,7 +255,7 @@ public class DefaultChannelPipelineTest {
|
|||||||
pipeline.addBefore("1", "0", newHandler());
|
pipeline.addBefore("1", "0", newHandler());
|
||||||
pipeline.addAfter("10", "11", newHandler());
|
pipeline.addAfter("10", "11", newHandler());
|
||||||
|
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
|
||||||
assertNotNull(ctx);
|
assertNotNull(ctx);
|
||||||
while (ctx != null) {
|
while (ctx != null) {
|
||||||
int i = toInt(ctx.name());
|
int i = toInt(ctx.name());
|
||||||
@ -535,8 +535,8 @@ public class DefaultChannelPipelineTest {
|
|||||||
assertNull(pipeline.last());
|
assertNull(pipeline.last());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int next(DefaultChannelHandlerContext ctx) {
|
private static int next(AbstractChannelHandlerContext ctx) {
|
||||||
DefaultChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
@ -553,7 +553,7 @@ public class DefaultChannelPipelineTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
|
private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
|
||||||
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
|
||||||
int handlerNumber = 0;
|
int handlerNumber = 0;
|
||||||
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
|
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
|
||||||
handlerNumber++;
|
handlerNumber++;
|
||||||
|
Loading…
Reference in New Issue
Block a user