Add the 'nextBufferType' parameter to ByteArrayEncoder like did to StringEncoder / Consistent parameter order

This commit is contained in:
Trustin Lee 2013-02-08 01:36:41 +09:00
parent b8c0751023
commit d8c0bf3be2
12 changed files with 147 additions and 59 deletions

View File

@ -15,14 +15,16 @@
*/
package io.netty.handler.codec.bytes;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageEncoder;
/**
* Encodes the requested array of bytes into a {@link ByteBuf}.
@ -48,22 +50,53 @@ import io.netty.handler.codec.MessageToMessageEncoder;
* }
* </pre>
*/
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
public class ByteArrayEncoder extends ChannelOutboundMessageHandlerAdapter<byte[]> {
public ByteArrayEncoder() {
super(byte[].class);
}
private final BufType nextBufferType;
@Override
public MessageBuf<byte[]> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
protected ByteBuf encode(ChannelHandlerContext ctx, byte[] msg) throws Exception {
if (msg.length == 0) {
return null;
public ByteArrayEncoder(BufType nextBufferType) {
if (nextBufferType == null) {
throw new NullPointerException("nextBufferType");
}
this.nextBufferType = nextBufferType;
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer();
MessageBuf<Object> msgOut = ctx.nextOutboundMessageBuffer();
ByteBuf byteOut = ctx.nextOutboundByteBuffer();
try {
for (;;) {
Object m = in.poll();
if (m == null) {
break;
}
if (!(m instanceof byte[])) {
msgOut.add(m);
continue;
}
byte[] a = (byte[]) m;
if (a.length == 0) {
continue;
}
switch (nextBufferType) {
case BYTE:
byteOut.writeBytes(a);
break;
case MESSAGE:
msgOut.add(Unpooled.wrappedBuffer(a));
break;
default:
throw new Error();
}
}
} finally {
ctx.flush(promise);
}
return Unpooled.wrappedBuffer(msg);
}
}

View File

@ -15,13 +15,16 @@
*/
package io.netty.handler.codec.string;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.charset.Charset;
@ -48,32 +51,67 @@ import java.nio.charset.Charset;
* @apiviz.landmark
*/
@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
public class StringEncoder extends ChannelOutboundMessageHandlerAdapter<CharSequence> {
private final BufType nextBufferType;
// TODO Use CharsetEncoder instead.
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringEncoder() {
this(Charset.defaultCharset());
public StringEncoder(BufType nextBufferType) {
this(nextBufferType, Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringEncoder(Charset charset) {
super(CharSequence.class);
public StringEncoder(BufType nextBufferType, Charset charset) {
if (nextBufferType == null) {
throw new NullPointerException("nextBufferType");
}
if (charset == null) {
throw new NullPointerException("charset");
}
this.nextBufferType = nextBufferType;
this.charset = charset;
}
@Override
protected Object encode(ChannelHandlerContext ctx, CharSequence msg) throws Exception {
return Unpooled.copiedBuffer(msg, charset);
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer();
MessageBuf<Object> msgOut = ctx.nextOutboundMessageBuffer();
ByteBuf byteOut = ctx.nextOutboundByteBuffer();
try {
for (;;) {
Object m = in.poll();
if (m == null) {
break;
}
if (!(m instanceof CharSequence)) {
msgOut.add(m);
continue;
}
CharSequence s = (CharSequence) m;
ByteBuf encoded = Unpooled.copiedBuffer(s, charset);
switch (nextBufferType) {
case BYTE:
byteOut.writeBytes(encoded);
break;
case MESSAGE:
msgOut.add(encoded);
break;
default:
throw new Error();
}
}
} finally {
ctx.flush(promise);
}
}
}

View File

@ -15,27 +15,27 @@
*/
package io.netty.handler.codec.bytes;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedMessageChannel;
import org.junit.Before;
import org.junit.Test;
import java.util.Random;
import static io.netty.buffer.Unpooled.*;
import static org.hamcrest.core.Is.*;
import static org.hamcrest.core.IsNull.*;
import static org.junit.Assert.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedMessageChannel;
import java.util.Random;
import org.junit.Before;
import org.junit.Test;
/**
*/
@SuppressWarnings("ZeroLengthArrayAllocation")
public class ByteArrayEncoderTest {
private EmbeddedMessageChannel ch;
@Before
public void setUp() {
ch = new EmbeddedMessageChannel(new ByteArrayEncoder());
ch = new EmbeddedMessageChannel(new ByteArrayEncoder(BufType.MESSAGE));
}
@Test

View File

@ -16,14 +16,15 @@
package io.netty.example.filetransfer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
@ -58,7 +59,7 @@ public class FileServer {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new StringEncoder(CharsetUtil.UTF_8),
new StringEncoder(BufType.BYTE, CharsetUtil.UTF_8),
new LineBasedFrameDecoder(8192),
new StringDecoder(CharsetUtil.UTF_8),
new FileHandler());

View File

@ -16,11 +16,12 @@
package io.netty.example.rxtx;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.rxtx.RxtxChannel;
import io.netty.channel.rxtx.RxtxDeviceAddress;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
@ -40,7 +41,7 @@ public final class RxtxClient {
public void initChannel(RxtxChannel ch) throws Exception {
ch.pipeline().addLast(
new LineBasedFrameDecoder(32768),
new StringEncoder(),
new StringEncoder(BufType.BYTE),
new StringDecoder(),
new RxtxClientHandler()
);

View File

@ -15,6 +15,7 @@
*/
package io.netty.example.securechat;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
@ -51,7 +52,7 @@ public class SecureChatClientInitializer extends ChannelInitializer<SocketChanne
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("encoder", new StringEncoder(BufType.BYTE));
// and then business logic.
pipeline.addLast("handler", new SecureChatClientHandler());

View File

@ -15,6 +15,7 @@
*/
package io.netty.example.securechat;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
@ -54,7 +55,7 @@ public class SecureChatServerInitializer extends ChannelInitializer<SocketChanne
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("encoder", new StringEncoder(BufType.BYTE));
// and then business logic.
pipeline.addLast("handler", new SecureChatServerHandler());

View File

@ -15,6 +15,7 @@
*/
package io.netty.example.telnet;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
@ -28,7 +29,7 @@ import io.netty.handler.codec.string.StringEncoder;
*/
public class TelnetClientInitializer extends ChannelInitializer<SocketChannel> {
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
private static final StringEncoder ENCODER = new StringEncoder(BufType.BYTE);
private static final TelnetClientHandler CLIENTHANDLER = new TelnetClientHandler();
@Override
public void initChannel(SocketChannel ch) throws Exception {

View File

@ -15,6 +15,7 @@
*/
package io.netty.example.telnet;
import io.netty.buffer.BufType;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
@ -28,7 +29,7 @@ import io.netty.handler.codec.string.StringEncoder;
*/
public class TelnetServerPipelineFactory extends ChannelInitializer<SocketChannel> {
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
private static final StringEncoder ENCODER = new StringEncoder(BufType.BYTE);
private static final TelnetServerHandler SERVERHANDLER = new TelnetServerHandler();
@Override

View File

@ -17,6 +17,7 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.BufType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
@ -76,7 +77,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
public void initChannel(SocketChannel sch) throws Exception {
ChannelPipeline p = sch.pipeline();
p.addLast("logger", new ByteLoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder(BufType.BYTE));
p.addLast(executor, sh);
}
});
@ -86,7 +87,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
public void initChannel(SocketChannel sch) throws Exception {
ChannelPipeline p = sch.pipeline();
p.addLast("logger", new ByteLoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder(BufType.BYTE));
p.addLast(executor, ch);
}
});

View File

@ -17,6 +17,7 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.BufType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
@ -66,7 +67,7 @@ public class SocketStringEchoTest extends AbstractSocketTest {
public void initChannel(SocketChannel sch) throws Exception {
sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(BufType.BYTE, CharsetUtil.ISO_8859_1));
sch.pipeline().addAfter("decoder", "handler", sh);
}
});
@ -76,7 +77,7 @@ public class SocketStringEchoTest extends AbstractSocketTest {
public void initChannel(SocketChannel sch) throws Exception {
sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(BufType.BYTE, CharsetUtil.ISO_8859_1));
sch.pipeline().addAfter("decoder", "handler", ch);
}
});

View File

@ -1274,23 +1274,32 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
int msgSinkSize = msgSink.size();
if (msgSinkSize != 0) {
MessageBuf<Object> in = msgSink;
for (;;) {
Object m = in.poll();
if (m == null) {
break;
}
if (m instanceof Freeable) {
((Freeable) m).free();
}
int discardedMessages = 0;
MessageBuf<Object> in = msgSink;
for (;;) {
Object m = in.poll();
if (m == null) {
break;
}
if (m instanceof ByteBuf) {
ByteBuf src = (ByteBuf) m;
byteSink.writeBytes(src, src.readerIndex(), src.readableBytes());
} else {
discardedMessages ++;
}
if (m instanceof Freeable) {
((Freeable) m).free();
}
}
if (discardedMessages != 0) {
logger.warn(
"Discarded {} outbound message(s) that reached at the end of the pipeline. " +
"Please check your pipeline configuration.", msgSinkSize);
"Please check your pipeline configuration.", discardedMessages);
}
unsafe.flush(promise);
}
}