Add DatagramPacketEncoder and DatagramPacketDecoder

Motivation:

UDP-oriented codec reusing the existing encoders and decoders would be helpful. See #1350

Modifications:

Add DatagramPacketEncoder and DatagramPacketDecoder to reuse the existing encoders and decoders.

Result:

People can use DatagramPacketEncoder and DatagramPacketDecoder to wrap existing encoders and decoders to create UDP-oriented codec.
This commit is contained in:
Xiaoyan Lin 2016-03-06 17:58:48 -08:00 committed by Norman Maurer
parent 52bfaae1a0
commit 4fb585965c
4 changed files with 405 additions and 0 deletions

View File

@ -0,0 +1,111 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import java.util.List;
/**
* A decoder that decodes the content of the received {@link DatagramPacket} using
* the specified {@link ByteBuf} decoder. E.g.,
*
* <pre><code>
* {@link ChannelPipeline} pipeline = ...;
* pipeline.addLast("udpDecoder", new {@link DatagramPacketDecoder}(new {@link ProtobufDecoder}(...));
* </code></pre>
*/
public class DatagramPacketDecoder extends MessageToMessageDecoder<DatagramPacket> {
private final MessageToMessageDecoder<ByteBuf> decoder;
/**
* Create a {@link DatagramPacket} decoder using the specified {@link ByteBuf} decoder.
*
* @param decoder the specified {@link ByteBuf} decoder
*/
public DatagramPacketDecoder(MessageToMessageDecoder<ByteBuf> decoder) {
this.decoder = checkNotNull(decoder, "decoder");
}
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
if (msg instanceof DatagramPacket) {
return decoder.acceptInboundMessage(((DatagramPacket) msg).content());
}
return false;
}
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
decoder.decode(ctx, msg.content(), out);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
decoder.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
decoder.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
decoder.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
decoder.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
decoder.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
decoder.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
decoder.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
decoder.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
decoder.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
decoder.handlerRemoved(ctx);
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.StringUtil;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
/**
* An encoder that encodes the content in {@link AddressedEnvelope} to {@link DatagramPacket} using
* the specified message encoder. E.g.,
*
* <pre><code>
* {@link ChannelPipeline} pipeline = ...;
* pipeline.addLast("udpEncoder", new {@link DatagramPacketEncoder}(new {@link ProtobufEncoder}(...));
* </code></pre>
*
* Note: As UDP packets are out-of-order, you should make sure the encoded message size are not greater than
* the max safe packet size in your particular network path which guarantees no packet fragmentation.
*
* @param <M> the type of message to be encoded
*/
public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedEnvelope<M, InetSocketAddress>> {
private final MessageToMessageEncoder<? super M> encoder;
/**
* Create an encoder that encodes the content in {@link AddressedEnvelope} to {@link DatagramPacket} using
* the specified message encoder.
*
* @param encoder the specified message encoder
*/
public DatagramPacketEncoder(MessageToMessageEncoder<? super M> encoder) {
this.encoder = checkNotNull(encoder, "encoder");
}
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
if (super.acceptOutboundMessage(msg)) {
AddressedEnvelope envelope = (AddressedEnvelope) msg;
return encoder.acceptOutboundMessage(envelope.content())
&& envelope.sender() instanceof InetSocketAddress
&& envelope.recipient() instanceof InetSocketAddress;
}
return false;
}
@Override
protected void encode(
ChannelHandlerContext ctx, AddressedEnvelope<M, InetSocketAddress> msg, List<Object> out) throws Exception {
RecyclableArrayList buffers = null;
try {
buffers = RecyclableArrayList.newInstance();
encoder.encode(ctx, msg.content(), buffers);
if (buffers.size() != 1) {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only one message.");
}
Object content = buffers.get(0);
if (content instanceof ByteBuf) {
out.add(new DatagramPacket(((ByteBuf) content).retain(), msg.recipient(), msg.sender()));
} else {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only ByteBuf.");
}
} finally {
if (buffers != null) {
for (Object o : buffers) {
ReferenceCountUtil.release(o);
}
buffers.recycle();
}
}
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
encoder.bind(ctx, localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
encoder.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
encoder.disconnect(ctx, promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
encoder.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
encoder.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
encoder.read(ctx);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
encoder.flush(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
encoder.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
encoder.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
encoder.exceptionCaught(ctx, cause);
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.InetSocketAddress;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class DatagramPacketDecoderTest {
private EmbeddedChannel channel;
@Before
public void setUp() {
channel = new EmbeddedChannel(
new DatagramPacketDecoder(
new StringDecoder(CharsetUtil.UTF_8)));
}
@After
public void tearDown() {
assertFalse(channel.finish());
}
@Test
public void testDecode() {
InetSocketAddress recipient = new InetSocketAddress("127.0.0.1", 10000);
InetSocketAddress sender = new InetSocketAddress("127.0.0.1", 20000);
ByteBuf content = Unpooled.wrappedBuffer("netty".getBytes(CharsetUtil.UTF_8));
assertTrue(channel.writeInbound(new DatagramPacket(content, recipient, sender)));
assertEquals("netty", channel.readInbound());
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.InetSocketAddress;
import static org.junit.Assert.*;
public class DatagramPacketEncoderTest {
private EmbeddedChannel channel;
@Before
public void setUp() {
channel = new EmbeddedChannel(
new DatagramPacketEncoder<String>(
new StringEncoder(CharsetUtil.UTF_8)));
}
@After
public void tearDown() {
assertFalse(channel.finish());
}
@Test
public void testEncode() {
InetSocketAddress recipient = new InetSocketAddress("127.0.0.1", 10000);
InetSocketAddress sender = new InetSocketAddress("127.0.0.1", 20000);
assertTrue(channel.writeOutbound(
new DefaultAddressedEnvelope<String, InetSocketAddress>("netty", recipient, sender)));
DatagramPacket packet = channel.readOutbound();
try {
assertEquals("netty", packet.content().toString(CharsetUtil.UTF_8));
assertEquals(recipient, packet.recipient());
assertEquals(sender, packet.sender());
} finally {
packet.release();
}
}
@Test
public void testUnmatchedMessageType() {
InetSocketAddress recipient = new InetSocketAddress("127.0.0.1", 10000);
InetSocketAddress sender = new InetSocketAddress("127.0.0.1", 20000);
DefaultAddressedEnvelope<Long, InetSocketAddress> envelope =
new DefaultAddressedEnvelope<Long, InetSocketAddress>(1L, recipient, sender);
assertTrue(channel.writeOutbound(envelope));
DefaultAddressedEnvelope<Long, InetSocketAddress> output = channel.readOutbound();
try {
assertSame(envelope, output);
} finally {
output.release();
}
}
@Test
public void testUnmatchedType() {
String netty = "netty";
assertTrue(channel.writeOutbound(netty));
assertSame(netty, channel.readOutbound());
}
}