diff --git a/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java new file mode 100644 index 0000000000..57fbfe7782 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java @@ -0,0 +1,65 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.protobuf; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +import com.google.protobuf.CodedInputStream; + +/** + * A decoder that splits the received {@link ChannelBuffer}s dynamically by the + * value of the length field in the message. {@link ProtobufVarint32FrameDecoder} + * should be used to decode a binary message which has an integer header field + * encoded as Google Protocol Buffer Base + * 128 Varints (32-bit) integer that represents the length of the message + * body. + * + * @see com.google.protobuf.CodedInputStream + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Tomasz Blachowicz (tblachowicz@gmail.com) + * + * @version $Rev$, $Date$ + */ +public class ProtobufVarint32FrameDecoder extends FrameDecoder { + + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { + buffer.markReaderIndex(); + byte[] buf = new byte[5]; + for (int i = 0; i < 5; i ++) { + if (!buffer.readable()) { + break; + } + + buf[i] = buffer.readByte(); + if (buf[i] >= 0) { + int messageSize = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32(); + if (buffer.readableBytes() < messageSize) { + break; + } + + return buffer.readBytes(messageSize); + } + } + + buffer.resetReaderIndex(); + return null; + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java new file mode 100644 index 0000000000..11a3079c57 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java @@ -0,0 +1,59 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.protobuf; + +import static org.jboss.netty.buffer.ChannelBuffers.*; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferOutputStream; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; + +import com.google.protobuf.CodedOutputStream; + +/** + * An encoder that prepends the length of the message. The length value is + * prepended as a binary form. encoded as Google Protocol Buffer + * Base + * 128 Varints (32-bit). + * + * @see com.google.protobuf.CodedOutputStream + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Tomasz Blachowicz (tblachowicz@gmail.com) + * @version $Rev$, $Date$ + */ +@ChannelPipelineCoverage("all") +public class ProtobufVarint32LengthFieldPrepender extends OneToOneEncoder { + + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, + Object msg) throws Exception { + ChannelBuffer body = (ChannelBuffer) msg; + int length = body.readableBytes(); + ChannelBuffer header = + channel.getConfig().getBufferFactory().getBuffer( + CodedOutputStream.computeRawVarint32Size(length)); + CodedOutputStream codedOutputStream = CodedOutputStream + .newInstance(new ChannelBufferOutputStream(header)); + codedOutputStream.writeRawVarint32(length); + codedOutputStream.flush(); + return wrappedBuffer(header, body); + } + +} diff --git a/src/test/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java b/src/test/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java new file mode 100644 index 0000000000..18ebb2b8af --- /dev/null +++ b/src/test/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.protobuf; + +import static org.hamcrest.core.Is.*; +import static org.hamcrest.core.IsNull.*; +import static org.jboss.netty.buffer.ChannelBuffers.*; +import static org.junit.Assert.*; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.codec.embedder.DecoderEmbedder; +import org.junit.Before; +import org.junit.Test; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Tomasz Blachowicz (tblachowicz@gmail.com) + * @version $Rev$, $Date$ + */ +public class ProtobufVarint32FrameDecoderTest { + + private DecoderEmbedder embedder; + + @Before + public void setUp() { + embedder = new DecoderEmbedder( + new ProtobufVarint32FrameDecoder()); + } + + @Test + public void testTinyDecode() { + byte[] b = new byte[] { 4, 1, 1, 1, 1 }; + embedder.offer(wrappedBuffer(b, 0, 1)); + assertThat(embedder.poll(), is(nullValue())); + embedder.offer(wrappedBuffer(b, 1, 2)); + assertThat(embedder.poll(), is(nullValue())); + embedder.offer(wrappedBuffer(b, 3, b.length - 3)); + assertThat(embedder.poll(), + is(wrappedBuffer(new byte[] { 1, 1, 1, 1 }))); + } + + @Test + public void testRegularDecode() { + byte[] b = new byte[2048]; + for (int i = 2; i < 2048; i ++) { + b[i] = 1; + } + b[0] = -2; + b[1] = 15; + embedder.offer(wrappedBuffer(b, 0, 127)); + assertThat(embedder.poll(), is(nullValue())); + embedder.offer(wrappedBuffer(b, 127, 600)); + assertThat(embedder.poll(), is(nullValue())); + embedder.offer(wrappedBuffer(b, 727, b.length - 727)); + assertThat(embedder.poll(), is(wrappedBuffer(b, 2, b.length - 2))); + } + +} diff --git a/src/test/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java b/src/test/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java new file mode 100644 index 0000000000..1d313054f8 --- /dev/null +++ b/src/test/java/org/jboss/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.protobuf; + +import static org.hamcrest.core.Is.*; +import static org.jboss.netty.buffer.ChannelBuffers.*; +import static org.junit.Assert.*; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.codec.embedder.EncoderEmbedder; +import org.junit.Before; +import org.junit.Test; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Tomasz Blachowicz (tblachowicz@gmail.com) + * @version $Rev$, $Date$ + */ +public class ProtobufVarint32LengthFieldPrependerTest { + + private EncoderEmbedder embedder; + + @Before + public void setUp() { + embedder = new EncoderEmbedder( + new ProtobufVarint32LengthFieldPrepender()); + } + + @Test + public void testTinyEncode() { + byte[] b = new byte[] { 4, 1, 1, 1, 1 }; + embedder.offer(wrappedBuffer(b, 1, b.length - 1)); + assertThat(embedder.poll(), is(wrappedBuffer(b))); + } + + @Test + public void testRegularDecode() { + byte[] b = new byte[2048]; + for (int i = 2; i < 2048; i ++) { + b[i] = 1; + } + b[0] = -2; + b[1] = 15; + embedder.offer(wrappedBuffer(b, 2, b.length - 2)); + assertThat(embedder.poll(), is(wrappedBuffer(b))); + } +}