Fixed several bugs in the replay state implementation

This commit is contained in:
Sam Pullara 2012-03-10 22:12:17 -08:00
parent a0a59d916d
commit f7a1ec61f2
3 changed files with 154 additions and 2 deletions

View File

@ -31,7 +31,19 @@ public class MultiBulkReply extends Reply {
}
public void read(RedisDecoder rd, ChannelBuffer is) throws IOException {
if (num == 0) {
// If we attempted to read the size before, skip the '*' and reread it
if (size == -1) {
byte star = is.readByte();
if (star == MARKER) {
size = 0;
} else {
throw new AssertionError("Unexpected character in stream: " + star);
}
}
if (size == 0) {
// If the read fails, we need to skip the star
size = -1;
// Read the size, if this is successful we won't read the star again
size = RedisDecoder.readInteger(is);
byteArrays = new Object[size];
rd.checkpoint();
@ -45,7 +57,7 @@ public class MultiBulkReply extends Reply {
} else {
throw new IOException("Unexpected character in stream: " + read);
}
num = i;
num = i + 1;
rd.checkpoint();
}
}

View File

@ -83,6 +83,12 @@ public class RedisDecoder extends ReplayingDecoder<State> {
}
public Reply receive(final ChannelBuffer is) throws IOException {
if (reply != null) {
reply.read(this, is);
Reply ret = reply;
reply = null;
return ret;
}
int code = is.readByte();
switch (code) {
case StatusReply.MARKER: {

View File

@ -0,0 +1,134 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
import org.jboss.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class RedisCodecTest {
private DecoderEmbedder<ChannelBuffer> embedder;
@Before
public void setUp() {
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
}
@Test
public void decodeReplies() throws IOException {
{
Object receive = decode("+OK\r\n".getBytes());
assertTrue(receive instanceof StatusReply);
assertEquals("OK", ((StatusReply) receive).status);
}
{
Object receive = decode("-ERROR\r\n".getBytes());
assertTrue(receive instanceof ErrorReply);
assertEquals("ERROR", ((ErrorReply) receive).error);
}
{
Object receive = decode(":123\r\n".getBytes());
assertTrue(receive instanceof IntegerReply);
assertEquals(123, ((IntegerReply) receive).integer);
}
{
Object receive = decode("$5\r\nnetty\r\n".getBytes());
assertTrue(receive instanceof BulkReply);
assertEquals("netty", new String(((BulkReply) receive).bytes));
}
{
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
private Object decode(byte[] bytes) {
embedder.offer(wrappedBuffer(bytes));
return embedder.poll();
}
@Test
public void encodeCommands() throws IOException {
String setCommand = "*3\r\n" +
"$3\r\n" +
"SET\r\n" +
"$5\r\n" +
"mykey\r\n" +
"$7\r\n" +
"myvalue\r\n";
Command command = new Command("SET", "mykey", "myvalue");
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
command.write(cb);
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
}
@Test
public void testReplayDecoding() {
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
}