[#858] Merge ChannelPipeline.replaceAndForward into replace and removeAndForward into remove

This commit is contained in:
Norman Maurer 2013-04-13 18:19:33 +02:00
parent 4a792151b0
commit d8387fa4c3
7 changed files with 48 additions and 206 deletions

View File

@ -212,12 +212,12 @@ public abstract class WebSocketClientHandshaker {
throw new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec");
}
p.replaceAndForward(ctx.name(), "ws-decoder", newWebsocketDecoder());
p.replace(ctx.name(), "ws-decoder", newWebsocketDecoder());
} else {
if (p.get(HttpRequestEncoder.class) != null) {
p.remove(HttpRequestEncoder.class);
}
p.replaceAndForward(ctx.name(),
p.replace(ctx.name(),
"ws-decoder", newWebsocketDecoder());
}
}

View File

@ -50,7 +50,7 @@ class WebSocketClientProtocolHandshakeHandler extends ChannelInboundMessageHandl
handshaker.finishHandshake(ctx.channel(), msg);
ctx.fireUserEventTriggered(
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.pipeline().removeAndForward(this);
ctx.pipeline().remove(this);
return;
}
throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");

View File

@ -167,9 +167,9 @@ public abstract class WebSocketServerHandshaker {
return;
}
p.addBefore(ctx.name(), "wsencoder", newWebsocketDecoder());
p.replaceAndForward(ctx.name(), "wsdecoder", newWebSocketEncoder());
p.replace(ctx.name(), "wsdecoder", newWebSocketEncoder());
} else {
p.replaceAndForward(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.replace(HttpResponseEncoder.class, "wsencoder", newWebSocketEncoder());
}

View File

@ -117,7 +117,7 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt
p.addLast("ssl", new SslHandler(engine));
p.addLast("unificationA", new PortUnificationServerHandler(false, detectGzip));
p.removeAndForward(this);
p.remove(this);
}
private void enableGzip(ChannelHandlerContext ctx) {
@ -125,7 +125,7 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(detectSsl, false));
p.removeAndForward(this);
p.remove(this);
}
private void switchToHttp(ChannelHandlerContext ctx) {
@ -134,7 +134,7 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt
p.addLast("encoder", new HttpResponseEncoder());
p.addLast("deflater", new HttpContentCompressor());
p.addLast("handler", new HttpSnoopServerHandler());
p.removeAndForward(this);
p.remove(this);
}
private void switchToFactorial(ChannelHandlerContext ctx) {
@ -142,6 +142,6 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt
p.addLast("decoder", new BigIntegerDecoder());
p.addLast("encoder", new NumberEncoder());
p.addLast("handler", new FactorialServerHandler());
p.removeAndForward(this);
p.remove(this);
}
}

View File

@ -382,18 +382,6 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.
* All the remaining content in the {@link Buf) (if any) of the
* {@link ChannelHandler} will be discarded.
*
* @throws NoSuchElementException
* if there's no such handler in this pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline remove(ChannelHandler handler);
/**
* Removes the specified {@link ChannelHandler} from this pipeline
* and transfer the content of its {@link Buf} to the next
@ -406,23 +394,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline removeAndForward(ChannelHandler handler);
/**
* Removes the {@link ChannelHandler} with the specified name from this
* pipeline. All the remaining content in the {@link Buf) (if any) of the
* {@link ChannelHandler} will be discarded.
*
* @param name the name under which the {@link ChannelHandler} was stored.
*
* @return the removed handler
*
* @throws NoSuchElementException
* if there's no such handler with the specified name in this pipeline
* @throws NullPointerException
* if the specified name is {@code null}
*/
ChannelHandler remove(String name);
ChannelPipeline remove(ChannelHandler handler);
/**
* Removes the {@link ChannelHandler} with the specified name from this
@ -438,25 +410,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified name is {@code null}
*/
ChannelHandler removeAndForward(String name);
/**
* Removes the {@link ChannelHandler} of the specified type from this
* pipeline. All the remaining content in the {@link Buf) (if any) of the {@link ChannelHandler}
* will be discarded.
*
*
* @param <T> the type of the handler
* @param handlerType the type of the handler
*
* @return the removed handler
*
* @throws NoSuchElementException
* if there's no such handler of the specified type in this pipeline
* @throws NullPointerException
* if the specified handler type is {@code null}
*/
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler remove(String name);
/**
* Removes the {@link ChannelHandler} of the specified type from this
@ -473,7 +427,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified handler type is {@code null}
*/
<T extends ChannelHandler> T removeAndForward(Class<T> handlerType);
<T extends ChannelHandler> T remove(Class<T> handlerType);
/**
* Removes the first {@link ChannelHandler} in this pipeline.
@ -501,30 +455,6 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelHandler removeLast();
/**
* Replaces the specified {@link ChannelHandler} with a new handler in
* this pipeline.
*
* All the remaining content in the {@link Buf) (if any) of the {@link ChannelHandler}
* will be discarded.
*
* @param oldHandler the {@link ChannelHandler} to be replaced
* @param newName the name under which the replacement should be added
* @param newHandler the {@link ChannelHandler} which is used as replacement
*
* @return itself
*
* @throws NoSuchElementException
* if the specified old handler does not exist in this pipeline
* @throws IllegalArgumentException
* if a handler with the specified new name already exists in this
* pipeline, except for the handler to be replaced
* @throws NullPointerException
* if the specified old handler, new name, or new handler is
* {@code null}
*/
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
/**
* Replaces the specified {@link ChannelHandler} with a new handler in
* this pipeline and transfer the content of its {@link Buf} to the next
@ -545,31 +475,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* if the specified old handler, new name, or new handler is
* {@code null}
*/
ChannelPipeline replaceAndForward(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
/**
* Replaces the {@link ChannelHandler} of the specified name with a new
* handler in this pipeline.
*
* All the remaining content of the {@link Buf) (if any) of the to be replaced
* {@link ChannelHandler} will be discarded.
*
* @param oldHandler the {@link ChannelHandler} to be replaced
* @param newName the name under which the replacement should be added
* @param newHandler the {@link ChannelHandler} which is used as replacement
*
* @return the removed handler
*
* @throws NoSuchElementException
* if the handler with the specified old name does not exist in this pipeline
* @throws IllegalArgumentException
* if a handler with the specified new name already exists in this
* pipeline, except for the handler to be replaced
* @throws NullPointerException
* if the specified old handler, new name, or new handler is
* {@code null}
*/
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
/**
* Replaces the {@link ChannelHandler} of the specified name with a new
@ -591,32 +497,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* if the specified old handler, new name, or new handler is
* {@code null}
*/
ChannelHandler replaceAndForward(String oldName, String newName, ChannelHandler newHandler);
/**
* Replaces the {@link ChannelHandler} of the specified type with a new
* handler in this pipeline.
*
* All the remaining content of the {@link Buf) (if any) of the to be replaced
* {@link ChannelHandler} will be discarded.
*
* @param oldHandlerType the type of the handler to be removed
* @param newName the name under which the replacement should be added
* @param newHandler the {@link ChannelHandler} which is used as replacement
*
* @return the removed handler
*
* @throws NoSuchElementException
* if the handler of the specified old handler type does not exist
* in this pipeline
* @throws IllegalArgumentException
* if a handler with the specified new name already exists in this
* pipeline, except for the handler to be replaced
* @throws NullPointerException
* if the specified old handler, new name, or new handler is
* {@code null}
*/
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
/**
* Replaces the {@link ChannelHandler} of the specified type with a new
@ -639,7 +520,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* if the specified old handler, new name, or new handler is
* {@code null}
*/
<T extends ChannelHandler> T replaceAndForward(Class<T> oldHandlerType, String newName,
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);
/**

View File

@ -401,33 +401,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler), false);
return this;
}
@Override
public ChannelPipeline removeAndForward(ChannelHandler handler) {
remove(getContextOrDie(handler), true);
remove(getContextOrDie(handler));
return this;
}
@Override
public ChannelHandler remove(String name) {
return remove(getContextOrDie(name), false).handler();
}
@Override
public ChannelHandler removeAndForward(String name) {
return remove(getContextOrDie(name), true).handler();
return remove(getContextOrDie(name)).handler();
}
@SuppressWarnings("unchecked")
@Override
public <T extends ChannelHandler> T remove(Class<T> handlerType) {
return (T) remove(getContextOrDie(handlerType), false).handler();
return (T) remove(getContextOrDie(handlerType)).handler();
}
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx, final boolean forward) {
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
DefaultChannelHandlerContext context;
@ -435,14 +424,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
synchronized (this) {
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
remove0(ctx, forward);
remove0(ctx);
return ctx;
} else {
future = ctx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx, forward);
remove0(ctx);
}
}
});
@ -458,13 +447,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return context;
}
@SuppressWarnings("unchecked")
@Override
public <T extends ChannelHandler> T removeAndForward(Class<T> handlerType) {
return (T) remove(getContextOrDie(handlerType), true).handler();
}
private void remove0(DefaultChannelHandlerContext ctx, boolean forward) {
private void remove0(DefaultChannelHandlerContext ctx) {
callBeforeRemove(ctx);
DefaultChannelHandlerContext prev = ctx.prev;
@ -473,7 +456,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
next.prev = prev;
name2ctx.remove(ctx.name());
callAfterRemove(ctx, prev, next, forward);
callAfterRemove(ctx, prev, next);
}
@Override
@ -481,7 +464,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (head.next == tail) {
throw new NoSuchElementException();
}
return remove(head.next, false).handler();
return remove(head.next).handler();
}
@Override
@ -489,41 +472,30 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (head.next == tail) {
throw new NoSuchElementException();
}
return remove(tail.prev, false).handler();
return remove(tail.prev).handler();
}
@Override
public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
replace(getContextOrDie(oldHandler), newName, newHandler, false);
return this;
}
@Override
public ChannelPipeline replaceAndForward(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
replace(getContextOrDie(oldHandler), newName, newHandler, true);
replace(getContextOrDie(oldHandler), newName, newHandler);
return this;
}
@Override
public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
return replace(getContextOrDie(oldName), newName, newHandler, false);
}
@Override
public ChannelHandler replaceAndForward(String oldName, String newName, ChannelHandler newHandler) {
return replace(getContextOrDie(oldName), newName, newHandler, true);
return replace(getContextOrDie(oldName), newName, newHandler);
}
@Override
@SuppressWarnings("unchecked")
public <T extends ChannelHandler> T replace(
Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler, false);
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
}
private ChannelHandler replace(
final DefaultChannelHandlerContext ctx, final String newName,
ChannelHandler newHandler, final boolean forward) {
ChannelHandler newHandler) {
assert ctx != head && ctx != tail;
@ -538,7 +510,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
replace0(ctx, newName, newCtx, forward);
replace0(ctx, newName, newCtx);
return ctx.handler();
} else {
@ -546,7 +518,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
replace0(ctx, newName, newCtx, forward);
replace0(ctx, newName, newCtx);
}
}
});
@ -561,15 +533,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return ctx.handler();
}
@SuppressWarnings("unchecked")
@Override
public <T extends ChannelHandler> T replaceAndForward(
Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler, true);
}
private void replace0(DefaultChannelHandlerContext ctx, String newName,
DefaultChannelHandlerContext newCtx, boolean forward) {
DefaultChannelHandlerContext newCtx) {
boolean sameName = ctx.name().equals(newName);
DefaultChannelHandlerContext prev = ctx.prev;
@ -592,7 +557,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
ChannelPipelineException addException = null;
boolean removed = false;
try {
callAfterRemove(ctx, newCtx, newCtx, forward);
callAfterRemove(ctx, newCtx, newCtx);
removed = true;
} catch (ChannelPipelineException e) {
removeException = e;
@ -646,7 +611,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
boolean removed = false;
try {
remove((DefaultChannelHandlerContext) ctx, false);
remove((DefaultChannelHandlerContext) ctx);
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
@ -678,7 +643,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static void callAfterRemove(
final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev,
DefaultChannelHandlerContext ctxNext, boolean forward) {
DefaultChannelHandlerContext ctxNext) {
final ChannelHandler handler = ctx.handler();
@ -691,11 +656,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
".afterRemove() has thrown an exception.", t);
}
if (forward) {
ctx.forwardBufferContent(ctxPrev, ctxNext);
} else {
ctx.clearBuffer();
}
ctx.forwardBufferContent(ctxPrev, ctxNext);
ctx.setRemoved();
}

View File

@ -329,7 +329,7 @@ public class DefaultChannelPipelineTest {
pipeline.context(handler1).inboundByteBuffer().writeLong(8);
assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes());
assertEquals(0, pipeline.context(handler2).inboundByteBuffer().readableBytes());
pipeline.removeAndForward(handler1);
pipeline.remove(handler1);
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes());
latch.countDown();
}
@ -355,7 +355,7 @@ public class DefaultChannelPipelineTest {
public void run() {
pipeline.context(handler1).inboundByteBuffer().writeLong(8);
assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes());
pipeline.replaceAndForward(handler1, "handler2", handler2);
pipeline.replace(handler1, "handler2", handler2);
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes());
latch.countDown();
}
@ -383,7 +383,7 @@ public class DefaultChannelPipelineTest {
pipeline.context(handler2).outboundByteBuffer().writeLong(8);
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes());
assertEquals(0, pipeline.context(handler1).outboundByteBuffer().readableBytes());
pipeline.removeAndForward(handler2);
pipeline.remove(handler2);
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes());
latch.countDown();
}
@ -409,7 +409,7 @@ public class DefaultChannelPipelineTest {
public void run() {
pipeline.context(handler1).outboundByteBuffer().writeLong(8);
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes());
pipeline.replaceAndForward(handler1, "handler2", handler2);
pipeline.replace(handler1, "handler2", handler2);
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes());
latch.countDown();
}
@ -439,7 +439,7 @@ public class DefaultChannelPipelineTest {
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes());
pipeline.replaceAndForward(handler1, "handler2", handler2);
pipeline.replace(handler1, "handler2", handler2);
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes());
@ -479,7 +479,7 @@ public class DefaultChannelPipelineTest {
assertEquals(0, pipeline.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(0, pipeline.context(handler3).inboundByteBuffer().readableBytes());
pipeline.removeAndForward(handler2);
pipeline.remove(handler2);
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler3).inboundByteBuffer().readableBytes());
latch.countDown();
@ -509,7 +509,7 @@ public class DefaultChannelPipelineTest {
pipeline.context(handler1).inboundMessageBuffer().add(new Object());
assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size());
assertEquals(0, pipeline.context(handler2).inboundMessageBuffer().size());
pipeline.removeAndForward(handler1);
pipeline.remove(handler1);
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size());
latch.countDown();
}
@ -535,7 +535,7 @@ public class DefaultChannelPipelineTest {
public void run() {
pipeline.context(handler1).inboundMessageBuffer().add(new Object());
assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size());
pipeline.replaceAndForward(handler1, "handler2", handler2);
pipeline.replace(handler1, "handler2", handler2);
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size());
latch.countDown();
}
@ -563,7 +563,7 @@ public class DefaultChannelPipelineTest {
pipeline.context(handler2).outboundMessageBuffer().add(new Object());
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size());
assertEquals(0, pipeline.context(handler1).outboundMessageBuffer().size());
pipeline.removeAndForward(handler2);
pipeline.remove(handler2);
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size());
latch.countDown();
}
@ -589,7 +589,7 @@ public class DefaultChannelPipelineTest {
public void run() {
pipeline.context(handler1).outboundMessageBuffer().add(new Object());
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size());
pipeline.replaceAndForward(handler1, "handler2", handler2);
pipeline.replace(handler1, "handler2", handler2);
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size());
latch.countDown();
}
@ -619,7 +619,7 @@ public class DefaultChannelPipelineTest {
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size());
pipeline.replaceAndForward(handler1, "handler2", handler2);
pipeline.replace(handler1, "handler2", handler2);
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size());
@ -692,7 +692,7 @@ public class DefaultChannelPipelineTest {
assertEquals(0, pipeline.context(handler1).outboundMessageBuffer().size());
assertEquals(0, pipeline.context(handler3).inboundMessageBuffer().size());
pipeline.removeAndForward(handler2);
pipeline.remove(handler2);
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler3).inboundMessageBuffer().size());
latch.countDown();