Cleanup DefaultChannelPipeline implementation (#8811)

Motivation:

The DefaultChannelPipeline implementation can be cleaned up a bit and so we can remove the need for AbstractChannelHandlerContext all together.

Modifications:

- Merge DefautChannelHandlerContext and AbstractChannelHandlerContext
- Remove some unnecessary fields
- Some other minor cleanup

Result:

Cleaner code.
This commit is contained in:
Norman Maurer 2019-01-31 07:19:00 +01:00 committed by GitHub
parent b7ceeb1797
commit c193001696
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1270 additions and 1396 deletions

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel;
import io.netty.channel.Channel.Unsafe;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.EventExecutor;
@ -45,10 +44,11 @@ import java.util.function.Predicate;
*/
public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
private static final String HEAD_NAME = generateName0(HeadHandler.class);
private static final String TAIL_NAME = generateName0(TailHandler.class);
private static final ChannelHandler HEAD_HANDLER = new HeadHandler();
private static final ChannelHandler TAIL_HANDLER = new TailHandler();
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@ -61,14 +61,13 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final DefaultChannelHandlerContext head;
private final DefaultChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private final List<AbstractChannelHandlerContext> handlers = new ArrayList<>(4);
private final List<DefaultChannelHandlerContext> handlers = new ArrayList<>(4);
private volatile MessageSizeEstimator.Handle estimatorHandle;
@ -77,11 +76,13 @@ public class DefaultChannelPipeline implements ChannelPipeline {
succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop());
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER);
head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER);
head.next = tail;
tail.prev = head;
head.setAddComplete();
tail.setAddComplete();
}
final MessageSizeEstimator.Handle estimatorHandle() {
@ -95,11 +96,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return handle;
}
final Object touch(Object msg, AbstractChannelHandlerContext next) {
final Object touch(Object msg, DefaultChannelHandlerContext next) {
return touch ? ReferenceCountUtil.touch(msg, next) : msg;
}
private AbstractChannelHandlerContext newContext(String name, ChannelHandler handler) {
private DefaultChannelHandlerContext newContext(String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, name, handler);
}
@ -120,7 +121,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
name = generateName(handler);
}
AbstractChannelHandlerContext newCtx = newContext(name, handler);
DefaultChannelHandlerContext newCtx = newContext(name, handler);
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -143,8 +144,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
private void addFirst0(DefaultChannelHandlerContext newCtx) {
DefaultChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
@ -159,7 +160,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
name = generateName(handler);
}
AbstractChannelHandlerContext newCtx = newContext(name, handler);
DefaultChannelHandlerContext newCtx = newContext(name, handler);
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (this) {
@ -182,8 +183,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
private void addLast0(DefaultChannelHandlerContext newCtx) {
DefaultChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
@ -193,14 +194,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext ctx;
final DefaultChannelHandlerContext ctx;
checkMultiplicity(handler);
if (name == null) {
name = generateName(handler);
}
AbstractChannelHandlerContext newCtx = newContext(name, handler);
DefaultChannelHandlerContext newCtx = newContext(name, handler);
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -230,7 +231,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
private void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
private void addBefore0(DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
newCtx.prev = ctx.prev;
newCtx.next = ctx;
ctx.prev.next = newCtx;
@ -240,14 +241,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext ctx;
final DefaultChannelHandlerContext ctx;
checkMultiplicity(handler);
if (name == null) {
name = generateName(handler);
}
AbstractChannelHandlerContext newCtx = newContext(name, handler);
DefaultChannelHandlerContext newCtx = newContext(name, handler);
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -277,7 +278,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
private void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
private void addAfter0(DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
newCtx.prev = ctx;
newCtx.next = ctx.next;
ctx.next.prev = newCtx;
@ -364,7 +365,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return StringUtil.simpleClassName(handlerType) + "#0";
}
private int findCtxIdx(Predicate<AbstractChannelHandlerContext> predicate) {
private int findCtxIdx(Predicate<DefaultChannelHandlerContext> predicate) {
for (int i = 0; i < handlers.size(); i++) {
if (predicate.test(handlers.get(i))) {
return i;
@ -375,7 +376,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
final AbstractChannelHandlerContext ctx;
final DefaultChannelHandlerContext ctx;
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -403,7 +404,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelHandler remove(String name) {
final AbstractChannelHandlerContext ctx;
final DefaultChannelHandlerContext ctx;
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -432,7 +433,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@SuppressWarnings("unchecked")
@Override
public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
final AbstractChannelHandlerContext ctx;
final DefaultChannelHandlerContext ctx;
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -473,7 +474,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@SuppressWarnings("unchecked")
private <T extends ChannelHandler> T removeIfExists(IntSupplier idxSupplier) {
final AbstractChannelHandlerContext ctx;
final DefaultChannelHandlerContext ctx;
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -498,15 +499,15 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return (T) ctx.handler();
}
private void unlink(AbstractChannelHandlerContext ctx) {
private void unlink(DefaultChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
DefaultChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
private void remove0(AbstractChannelHandlerContext ctx) {
private void remove0(DefaultChannelHandlerContext ctx) {
unlink(ctx);
callHandlerRemoved0(ctx);
}
@ -530,14 +531,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
private ChannelHandler replace(
Predicate<AbstractChannelHandlerContext> predicate, String newName, ChannelHandler newHandler) {
Predicate<DefaultChannelHandlerContext> predicate, String newName, ChannelHandler newHandler) {
checkMultiplicity(newHandler);
if (newName == null) {
newName = generateName(newHandler);
}
AbstractChannelHandlerContext oldCtx;
AbstractChannelHandlerContext newCtx = newContext(newName, newHandler);
DefaultChannelHandlerContext oldCtx;
DefaultChannelHandlerContext newCtx = newContext(newName, newHandler);
EventExecutor executor = executor();
boolean inEventLoop = executor.inEventLoop();
synchronized (handlers) {
@ -553,7 +554,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new IllegalArgumentException("Duplicate handler name: " + newName);
}
}
AbstractChannelHandlerContext removed = handlers.set(idx, newCtx);
DefaultChannelHandlerContext removed = handlers.set(idx, newCtx);
assert removed != null;
if (!inEventLoop) {
@ -571,9 +572,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return oldCtx.handler();
}
private void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = oldCtx.prev;
AbstractChannelHandlerContext next = oldCtx.next;
private void replace0(DefaultChannelHandlerContext oldCtx, DefaultChannelHandlerContext newCtx) {
DefaultChannelHandlerContext prev = oldCtx.prev;
DefaultChannelHandlerContext next = oldCtx.next;
newCtx.prev = prev;
newCtx.next = next;
@ -607,7 +608,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
private void callHandlerAdded0(final DefaultChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
@ -639,7 +640,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) {
// Notify the complete removal.
try {
ctx.callHandlerRemoved();
@ -662,9 +663,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return ctx == null ? null : (T) ctx.handler();
}
private AbstractChannelHandlerContext findCtx(Predicate<AbstractChannelHandlerContext> predicate) {
private DefaultChannelHandlerContext findCtx(Predicate<DefaultChannelHandlerContext> predicate) {
for (int i = 0; i < handlers.size(); i++) {
AbstractChannelHandlerContext ctx = handlers.get(i);
DefaultChannelHandlerContext ctx = handlers.get(i);
if (predicate.test(ctx)) {
return ctx;
}
@ -726,7 +727,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
synchronized (handlers) {
if (!handlers.isEmpty()) {
for (int i = 0; i < handlers.size(); i++) {
AbstractChannelHandlerContext ctx = handlers.get(i);
DefaultChannelHandlerContext ctx = handlers.get(i);
buf.append('(')
.append(ctx.name())
@ -835,7 +836,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private void destroy0() {
assert executor().inEventLoop();
AbstractChannelHandlerContext ctx = this.tail.prev;
DefaultChannelHandlerContext ctx = this.tail.prev;
while (ctx != head) {
synchronized (handlers) {
handlers.remove(ctx);
@ -1102,208 +1103,115 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
// A special catch-all handler that handles both bytes and messages.
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
private static final class TailHandler implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, TAIL_NAME, TailContext.class);
setAddComplete();
@Override
public void channelRegistered(ChannelHandlerContext ctx) { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) { }
@Override
public void channelActive(ChannelHandlerContext ctx) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledInboundChannelActive();
}
@Override
public EventExecutor executor() {
return pipeline().executor();
public void channelInactive(ChannelHandlerContext ctx) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledInboundChannelInactive();
}
@Override
public ChannelHandler handler() {
return this;
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledChannelWritabilityChanged();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
public void handlerAdded(ChannelHandlerContext ctx) { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
public void handlerRemoved(ChannelHandlerContext ctx) { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
onUnhandledInboundChannelActive();
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledInboundUserEventTriggered(evt);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
onUnhandledInboundChannelInactive();
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledInboundException(cause);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
onUnhandledChannelWritabilityChanged();
public void channelRead(ChannelHandlerContext ctx, Object msg) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledInboundMessage(msg);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
onUnhandledInboundUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
onUnhandledInboundChannelReadComplete();
public void channelReadComplete(ChannelHandlerContext ctx) {
((DefaultChannelPipeline) ctx.pipeline()).onUnhandledInboundChannelReadComplete();
}
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public EventExecutor executor() {
return pipeline().executor();
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
private static final class HeadHandler extends ChannelDuplexHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
ctx.channel().unsafe().bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
ChannelPromise promise) {
ctx.channel().unsafe().connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.channel().unsafe().disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.channel().unsafe().close(promise);
}
@Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.register(promise);
public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.channel().unsafe().register(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.channel().unsafe().deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
ctx.channel().unsafe().beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.channel().unsafe().write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
@Skip
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
public void flush(ChannelHandlerContext ctx) {
ctx.channel().unsafe().flush();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
public void channelUnregistered(ChannelHandlerContext ctx) {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
if (!ctx.channel().isOpen()) {
((DefaultChannelPipeline) ctx.pipeline()).destroy();
}
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
}
}

View File

@ -331,7 +331,7 @@ public class DefaultChannelPipelineTest {
pipeline.addBefore("1", "0", newHandler());
pipeline.addAfter("10", "11", newHandler());
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
assertNotNull(ctx);
while (ctx != null) {
int i = toInt(ctx.name());
@ -1583,8 +1583,8 @@ public class DefaultChannelPipelineTest {
}
}
private static int next(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext next = ctx.next;
private static int next(DefaultChannelHandlerContext ctx) {
DefaultChannelHandlerContext next = ctx.next;
if (next == null) {
return Integer.MAX_VALUE;
}
@ -1607,11 +1607,16 @@ public class DefaultChannelPipelineTest {
pipeline.executor().submit(new Runnable() {
@Override
public void run() {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
int handlerNumber = 0;
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
handlerNumber++;
ctx = ctx.next;
if (ctx != null) {
for (;;) {
handlerNumber++;
if (ctx == pipeline.lastContext()) {
break;
}
ctx = ctx.next;
}
}
assertEquals(expectedNumber, handlerNumber);
}