diff --git a/codec/src/main/java/io/netty/handler/codec/DatagramPacketDecoder.java b/codec/src/main/java/io/netty/handler/codec/DatagramPacketDecoder.java new file mode 100644 index 0000000000..bc0426cac1 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/DatagramPacketDecoder.java @@ -0,0 +1,111 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +import java.util.List; + +/** + * A decoder that decodes the content of the received {@link DatagramPacket} using + * the specified {@link ByteBuf} decoder. E.g., + * + *

+ * {@link ChannelPipeline} pipeline = ...;
+ * pipeline.addLast("udpDecoder", new {@link DatagramPacketDecoder}(new {@link ProtobufDecoder}(...));
+ * 
+ */ +public class DatagramPacketDecoder extends MessageToMessageDecoder { + + private final MessageToMessageDecoder decoder; + + /** + * Create a {@link DatagramPacket} decoder using the specified {@link ByteBuf} decoder. + * + * @param decoder the specified {@link ByteBuf} decoder + */ + public DatagramPacketDecoder(MessageToMessageDecoder decoder) { + this.decoder = checkNotNull(decoder, "decoder"); + } + + @Override + public boolean acceptInboundMessage(Object msg) throws Exception { + if (msg instanceof DatagramPacket) { + return decoder.acceptInboundMessage(((DatagramPacket) msg).content()); + } + return false; + } + + @Override + protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List out) throws Exception { + decoder.decode(ctx, msg.content(), out); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + decoder.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + decoder.channelUnregistered(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + decoder.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + decoder.channelInactive(ctx); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + decoder.channelReadComplete(ctx); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + decoder.userEventTriggered(ctx, evt); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + decoder.channelWritabilityChanged(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + decoder.exceptionCaught(ctx, cause); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + decoder.handlerAdded(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + decoder.handlerRemoved(ctx); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java b/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java new file mode 100644 index 0000000000..bbdc89e1c0 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java @@ -0,0 +1,152 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.RecyclableArrayList; +import io.netty.util.internal.StringUtil; +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; + +/** + * An encoder that encodes the content in {@link AddressedEnvelope} to {@link DatagramPacket} using + * the specified message encoder. E.g., + * + *

+ * {@link ChannelPipeline} pipeline = ...;
+ * pipeline.addLast("udpEncoder", new {@link DatagramPacketEncoder}(new {@link ProtobufEncoder}(...));
+ * 
+ * + * Note: As UDP packets are out-of-order, you should make sure the encoded message size are not greater than + * the max safe packet size in your particular network path which guarantees no packet fragmentation. + * + * @param the type of message to be encoded + */ +public class DatagramPacketEncoder extends MessageToMessageEncoder> { + + private final MessageToMessageEncoder encoder; + + /** + * Create an encoder that encodes the content in {@link AddressedEnvelope} to {@link DatagramPacket} using + * the specified message encoder. + * + * @param encoder the specified message encoder + */ + public DatagramPacketEncoder(MessageToMessageEncoder encoder) { + this.encoder = checkNotNull(encoder, "encoder"); + } + + @Override + public boolean acceptOutboundMessage(Object msg) throws Exception { + if (super.acceptOutboundMessage(msg)) { + AddressedEnvelope envelope = (AddressedEnvelope) msg; + return encoder.acceptOutboundMessage(envelope.content()) + && envelope.sender() instanceof InetSocketAddress + && envelope.recipient() instanceof InetSocketAddress; + } + return false; + } + + @Override + protected void encode( + ChannelHandlerContext ctx, AddressedEnvelope msg, List out) throws Exception { + RecyclableArrayList buffers = null; + try { + buffers = RecyclableArrayList.newInstance(); + encoder.encode(ctx, msg.content(), buffers); + if (buffers.size() != 1) { + throw new EncoderException( + StringUtil.simpleClassName(encoder) + " must produce only one message."); + } + Object content = buffers.get(0); + if (content instanceof ByteBuf) { + out.add(new DatagramPacket(((ByteBuf) content).retain(), msg.recipient(), msg.sender())); + } else { + throw new EncoderException( + StringUtil.simpleClassName(encoder) + " must produce only ByteBuf."); + } + } finally { + if (buffers != null) { + for (Object o : buffers) { + ReferenceCountUtil.release(o); + } + buffers.recycle(); + } + } + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { + encoder.bind(ctx, localAddress, promise); + } + + @Override + public void connect( + ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) throws Exception { + encoder.connect(ctx, remoteAddress, localAddress, promise); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + encoder.disconnect(ctx, promise); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + encoder.close(ctx, promise); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + encoder.deregister(ctx, promise); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + encoder.read(ctx); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + encoder.flush(ctx); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + encoder.handlerAdded(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + encoder.handlerRemoved(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + encoder.exceptionCaught(ctx, cause); + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/DatagramPacketDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/DatagramPacketDecoderTest.java new file mode 100644 index 0000000000..f7ae0e2c03 --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/DatagramPacketDecoderTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.util.CharsetUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DatagramPacketDecoderTest { + + private EmbeddedChannel channel; + + @Before + public void setUp() { + channel = new EmbeddedChannel( + new DatagramPacketDecoder( + new StringDecoder(CharsetUtil.UTF_8))); + } + + @After + public void tearDown() { + assertFalse(channel.finish()); + } + + @Test + public void testDecode() { + InetSocketAddress recipient = new InetSocketAddress("127.0.0.1", 10000); + InetSocketAddress sender = new InetSocketAddress("127.0.0.1", 20000); + ByteBuf content = Unpooled.wrappedBuffer("netty".getBytes(CharsetUtil.UTF_8)); + assertTrue(channel.writeInbound(new DatagramPacket(content, recipient, sender))); + assertEquals("netty", channel.readInbound()); + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/DatagramPacketEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/DatagramPacketEncoderTest.java new file mode 100644 index 0000000000..ee70cb7bb7 --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/DatagramPacketEncoderTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; + +import static org.junit.Assert.*; + +public class DatagramPacketEncoderTest { + + private EmbeddedChannel channel; + + @Before + public void setUp() { + channel = new EmbeddedChannel( + new DatagramPacketEncoder( + new StringEncoder(CharsetUtil.UTF_8))); + } + + @After + public void tearDown() { + assertFalse(channel.finish()); + } + + @Test + public void testEncode() { + InetSocketAddress recipient = new InetSocketAddress("127.0.0.1", 10000); + InetSocketAddress sender = new InetSocketAddress("127.0.0.1", 20000); + assertTrue(channel.writeOutbound( + new DefaultAddressedEnvelope("netty", recipient, sender))); + DatagramPacket packet = channel.readOutbound(); + try { + assertEquals("netty", packet.content().toString(CharsetUtil.UTF_8)); + assertEquals(recipient, packet.recipient()); + assertEquals(sender, packet.sender()); + } finally { + packet.release(); + } + } + + @Test + public void testUnmatchedMessageType() { + InetSocketAddress recipient = new InetSocketAddress("127.0.0.1", 10000); + InetSocketAddress sender = new InetSocketAddress("127.0.0.1", 20000); + DefaultAddressedEnvelope envelope = + new DefaultAddressedEnvelope(1L, recipient, sender); + assertTrue(channel.writeOutbound(envelope)); + DefaultAddressedEnvelope output = channel.readOutbound(); + try { + assertSame(envelope, output); + } finally { + output.release(); + } + } + + @Test + public void testUnmatchedType() { + String netty = "netty"; + assertTrue(channel.writeOutbound(netty)); + assertSame(netty, channel.readOutbound()); + } +}