diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java index 31c420a8d0..b79b0fbe04 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Encoder.java @@ -18,10 +18,11 @@ package io.netty.handler.codec.base64; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; +import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; -import io.netty.handler.codec.MessageToMessageEncoder; /** * Encodes a {@link ByteBuf} into a Base64-encoded {@link ByteBuf}. @@ -38,7 +39,7 @@ import io.netty.handler.codec.MessageToMessageEncoder; * */ @Sharable -public class Base64Encoder extends MessageToMessageEncoder { +public class Base64Encoder extends ChannelOutboundMessageHandlerAdapter { private final boolean breakLines; private final Base64Dialect dialect; @@ -61,8 +62,9 @@ public class Base64Encoder extends MessageToMessageEncoder { } @Override - protected Object encode(ChannelHandlerContext ctx, + public void flush(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - return Base64.encode(msg, msg.readerIndex(), msg.readableBytes(), breakLines, dialect); + ByteBuf buf = Base64.encode(msg, msg.readerIndex(), msg.readableBytes(), breakLines, dialect); + ChannelHandlerUtil.addToNextOutboundBuffer(ctx, buf); } } diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java index e954f88dd1..14470abccf 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayEncoder.java @@ -15,7 +15,6 @@ */ package io.netty.handler.codec.bytes; -import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -50,22 +49,13 @@ import io.netty.handler.codec.LengthFieldPrepender; */ public class ByteArrayEncoder extends ChannelOutboundMessageHandlerAdapter { - private final BufType nextBufferType; - - public ByteArrayEncoder(BufType nextBufferType) { - if (nextBufferType == null) { - throw new NullPointerException("nextBufferType"); - } - this.nextBufferType = nextBufferType; - } - @Override public void flush(ChannelHandlerContext ctx, byte[] msg) throws Exception { if (msg.length == 0) { return; } - switch (nextBufferType) { + switch (ctx.nextOutboundBufferType()) { case BYTE: ctx.nextOutboundByteBuffer().writeBytes(msg); break; diff --git a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java index d1ed582cef..313f36686f 100644 --- a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java @@ -15,7 +15,6 @@ */ package io.netty.handler.codec.bytes; -import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.channel.embedded.EmbeddedMessageChannel; import org.junit.Before; @@ -35,7 +34,7 @@ public class ByteArrayEncoderTest { @Before public void setUp() { - ch = new EmbeddedMessageChannel(new ByteArrayEncoder(BufType.MESSAGE)); + ch = new EmbeddedMessageChannel(new ByteArrayEncoder()); } @Test diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 90fe10b1b9..2d2eb478fa 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -308,7 +309,7 @@ public class ChunkedWriteHandler }); } } else { - ctx.nextOutboundMessageBuffer().add(currentEvent); + ChannelHandlerUtil.addToNextOutboundBuffer(ctx, currentEvent); this.currentEvent = null; } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index 43438a3574..61ae81ddac 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -17,6 +17,7 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.BufUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -25,8 +26,13 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelOutboundMessageHandlerAdapter; +import io.netty.channel.ChannelPromise; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.ByteToByteEncoder; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.testsuite.util.BogusSslContextFactory; import org.junit.Test; @@ -54,6 +60,20 @@ public class SocketSslEchoTest extends AbstractSocketTest { } public void testSslEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSslEcho0(sb, cb, false); + } + + + @Test + public void testSslEchoWithChunkHandler() throws Throwable { + run(); + } + + public void testSslEchoWithChunkHandler(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSslEcho0(sb, cb, true); + } + + private void testSslEcho0(ServerBootstrap sb, Bootstrap cb, final boolean chunkWriteHandler) throws Throwable { final EchoHandler sh = new EchoHandler(true); final EchoHandler ch = new EchoHandler(false); @@ -66,6 +86,9 @@ public class SocketSslEchoTest extends AbstractSocketTest { @Override public void initChannel(SocketChannel sch) throws Exception { sch.pipeline().addFirst("ssl", new SslHandler(sse)); + if (chunkWriteHandler) { + sch.pipeline().addLast(new ChunkedWriteHandler()); + } sch.pipeline().addLast("handler", sh); } }); @@ -74,6 +97,9 @@ public class SocketSslEchoTest extends AbstractSocketTest { @Override public void initChannel(SocketChannel sch) throws Exception { sch.pipeline().addFirst("ssl", new SslHandler(cse)); + if (chunkWriteHandler) { + sch.pipeline().addLast(new ChunkedWriteHandler()); + } sch.pipeline().addLast("handler", ch); } }); @@ -81,15 +107,9 @@ public class SocketSslEchoTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake(); - final ChannelFuture firstByteWriteFuture = - cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); + cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean(); - hf.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - firstByteWriteFutureDone.set(firstByteWriteFuture.isDone()); - } - }); + hf.sync(); assertFalse(firstByteWriteFutureDone.get()); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 18b34b6691..85176854f4 100755 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -16,6 +16,7 @@ package io.netty.channel; +import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.util.Attribute; @@ -225,25 +226,35 @@ public interface ChannelHandlerContext MessageBuf outboundMessageBuffer(); /** - * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext}. + * Return the {@link ByteBuf} of the next {@link ChannelInboundByteHandler} in the pipeline. */ ByteBuf nextInboundByteBuffer(); /** - * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext}. + * Return the {@link MessageBuf} of the next {@link ChannelInboundMessageHandler} in the pipeline. */ MessageBuf nextInboundMessageBuffer(); /** - * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext}. + * Return the {@link ByteBuf} of the next {@link ChannelOutboundByteHandler} in the pipeline. */ ByteBuf nextOutboundByteBuffer(); /** - * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext}. + * Return the {@link MessageBuf} of the next {@link ChannelOutboundMessageHandler} in the pipeline. */ MessageBuf nextOutboundMessageBuffer(); + /** + * Return the {@link BufType} of the next {@link ChannelInboundHandler} in the pipeline. + */ + BufType nextInboundBufferType(); + + /** + * Return the {@link BufType} of the next {@link ChannelOutboundHandler} in the pipeline. + */ + BufType nextOutboundBufferType(); + @Override ChannelHandlerContext fireChannelRegistered(); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java index 6693705f2c..ca68e6cc8c 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java @@ -16,6 +16,7 @@ package io.netty.channel; +import io.netty.buffer.BufType; import io.netty.buffer.BufUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; @@ -84,8 +85,6 @@ public final class ChannelHandlerUtil { SingleOutboundMessageHandler handler) throws Exception { MessageBuf in = ctx.outboundMessageBuffer(); - MessageBuf out = null; - final int inSize = in.size(); if (inSize == 0) { ctx.flush(promise); @@ -102,10 +101,7 @@ public final class ChannelHandlerUtil { } if (!handler.acceptOutboundMessage(msg)) { - if (out == null) { - out = ctx.nextOutboundMessageBuffer(); - } - out.add(msg); + addToNextOutboundBuffer(ctx, msg); processed ++; continue; } @@ -205,6 +201,35 @@ public final class ChannelHandlerUtil { throw new IllegalStateException(); } } + + /** + * Add the msg to the next outbound buffer in the {@link ChannelPipeline}. This takes special care of + * msgs that are of type {@link ByteBuf}. + */ + public static boolean addToNextOutboundBuffer(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + if (ctx.nextOutboundBufferType() == BufType.BYTE) { + ctx.nextOutboundByteBuffer().writeBytes((ByteBuf) msg); + return true; + } + } + return ctx.nextOutboundMessageBuffer().add(msg); + } + + /** + * Add the msg to the next inbound buffer in the {@link ChannelPipeline}. This takes special care of + * msgs that are of type {@link ByteBuf}. + */ + public static boolean addToNextInboundBuffer(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + if (ctx.nextInboundBufferType() == BufType.BYTE) { + ctx.nextInboundByteBuffer().writeBytes((ByteBuf) msg); + return true; + } + } + return ctx.nextInboundMessageBuffer().add(msg); + } + private ChannelHandlerUtil() { } public interface SingleInboundMessageHandler { diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java index 5217fdb958..9b55ceb8b9 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java @@ -15,10 +15,15 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; /** * Special {@link ChannelInboundHandler} which store the inbound data in a {@link MessageBuf} for futher processing. + * + * If your {@link ChannelOutboundMessageHandler} handles messages of type {@link ByteBuf} or {@link Object} + * and you want to add a {@link ByteBuf} to the next buffer in the {@link ChannelPipeline} use + * {@link ChannelHandlerUtil#addToNextInboundBuffer(ChannelHandlerContext, Object)}. */ public interface ChannelInboundMessageHandler extends ChannelInboundHandler { diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index b59ccdfc5c..f601fb31d3 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerUtil.SingleInboundMessageHandler; @@ -41,6 +42,10 @@ import io.netty.util.internal.TypeParameterMatcher; * } * * + * If your {@link ChannelInboundMessageHandlerAdapter} handles messages of type {@link ByteBuf} or {@link Object} + * and you want to add a {@link ByteBuf} to the next buffer in the {@link ChannelPipeline} use + * {@link ChannelHandlerUtil#addToNextInboundBuffer(ChannelHandlerContext, Object)}. + * * @param The type of the messages to handle */ public abstract class ChannelInboundMessageHandlerAdapter diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandler.java index 8e81991dad..eb2a1ad4be 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandler.java @@ -15,12 +15,17 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; /** * ChannelOutboundHandler implementation which operates on messages of a specific type * by pass them in a {@link MessageBuf} and consume then from there. * + * If your {@link ChannelOutboundMessageHandler} handles messages of type {@link ByteBuf} or {@link Object} + * and you want to add a {@link ByteBuf} to the next buffer in the {@link ChannelPipeline} use + * {@link ChannelHandlerUtil#addToNextOutboundBuffer(ChannelHandlerContext, Object)}. + * * @param the message type */ public interface ChannelOutboundMessageHandler extends ChannelOutboundHandler { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java index 34f550286c..ed090219d8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerUtil.SingleOutboundMessageHandler; @@ -24,6 +25,10 @@ import io.netty.util.internal.TypeParameterMatcher; /** * Abstract base class which handles messages of a specific type. * + * If your {@link ChannelOutboundMessageHandlerAdapter} handles messages of type {@link ByteBuf} or {@link Object} + * and you want to add a {@link ByteBuf} to the next buffer in the {@link ChannelPipeline} use + * {@link ChannelHandlerUtil#addToNextOutboundBuffer(ChannelHandlerContext, Object)}. + * * @param The type of the messages to handle */ public abstract class ChannelOutboundMessageHandlerAdapter diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index e853ad1e01..fa10b1caa9 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.buffer.Buf; +import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.MessageBuf; @@ -1622,6 +1623,34 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return ctx; } + @Override + public BufType nextInboundBufferType() { + DefaultChannelHandlerContext ctx = this; + do { + ctx = ctx.next; + } while (!(ctx.handler() instanceof ChannelInboundHandler)); + + if (ctx.handler() instanceof ChannelInboundByteHandler) { + return BufType.BYTE; + } else { + return BufType.MESSAGE; + } + } + + @Override + public BufType nextOutboundBufferType() { + DefaultChannelHandlerContext ctx = this; + do { + ctx = ctx.prev; + } while (!(ctx.handler() instanceof ChannelOutboundHandler)); + + if (ctx.handler() instanceof ChannelOutboundByteHandler) { + return BufType.BYTE; + } else { + return BufType.MESSAGE; + } + } + private DefaultChannelHandlerContext findContextOutbound() { DefaultChannelHandlerContext ctx = this; do {