From 0776911ed1a095fcfa52e2d98dcb86cda3e5c20c Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Sat, 25 Feb 2012 20:45:35 -0800 Subject: [PATCH 1/4] Redis client codec --- .../netty/handler/codec/redis/BulkReply.java | 26 ++++ .../netty/handler/codec/redis/Command.java | 120 +++++++++++++++++ .../netty/handler/codec/redis/ErrorReply.java | 29 ++++ .../handler/codec/redis/IntegerReply.java | 20 +++ .../handler/codec/redis/MultiBulkReply.java | 69 ++++++++++ .../handler/codec/redis/PSubscribeReply.java | 9 ++ .../codec/redis/PUnsubscribeReply.java | 17 +++ .../handler/codec/redis/RedisDecoder.java | 127 ++++++++++++++++++ .../handler/codec/redis/RedisEncoder.java | 40 ++++++ .../netty/handler/codec/redis/Reply.java | 32 +++++ .../handler/codec/redis/StatusReply.java | 20 +++ .../handler/codec/redis/SubscribeReply.java | 22 +++ .../handler/codec/redis/UnsubscribeReply.java | 23 ++++ .../handler/codec/redis/RedisClient.java | 53 ++++++++ 14 files changed, 607 insertions(+) create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/Command.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/Reply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java create mode 100644 src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java new file mode 100644 index 0000000000..356bec6abe --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java @@ -0,0 +1,26 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class BulkReply extends Reply { + public static final char MARKER = '$'; + public final byte[] bytes; + + public BulkReply(byte[] bytes) { + this.bytes = bytes; + } + + public void write(ChannelBuffer os) throws IOException { + os.writeByte(MARKER); + if (bytes == null) { + os.writeBytes(Command.NEG_ONE); + } else { + os.writeBytes(Command.numToBytes(bytes.length)); + os.writeBytes(Command.CRLF); + os.writeBytes(bytes); + } + os.writeBytes(Command.CRLF); + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java new file mode 100644 index 0000000000..61dff5c883 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java @@ -0,0 +1,120 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +/** + * Command serialization. + * User: sam + * Date: 7/27/11 + * Time: 3:04 PM + * To change this template use File | Settings | File Templates. + */ +public class Command { + public static final byte[] ARGS_PREFIX = "*".getBytes(); + public static final byte[] CRLF = "\r\n".getBytes(); + public static final byte[] BYTES_PREFIX = "$".getBytes(); + public static final byte[] EMPTY_BYTES = new byte[0]; + public static final byte[] NEG_ONE = Command.numToBytes(-1); + + private byte[][] arguments; + private Object[] objects; + + public String getName() { + if (arguments == null) { + Object o = objects[0]; + if (o instanceof byte[]) { + return new String((byte[]) o); + } else { + return o.toString(); + } + } else { + return new String(arguments[0]); + } + } + + public Command(byte[]... arguments) { + this.arguments = arguments; + } + + public Command(Object... objects) { + this.objects = objects; + } + + public void write(ChannelBuffer os) throws IOException { + writeDirect(os, objects); + } + + public static void writeDirect(ChannelBuffer os, Object... objects) throws IOException { + int length = objects.length; + byte[][] arguments = new byte[length][]; + for (int i = 0; i < length; i++) { + Object object = objects[i]; + if (object == null) { + arguments[i] = EMPTY_BYTES; + } else if (object instanceof byte[]) { + arguments[i] = (byte[]) object; + } else { + arguments[i] = object.toString().getBytes(Reply.UTF_8); + } + } + writeDirect(os, arguments); + } + + private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException { + os.writeBytes(ARGS_PREFIX); + os.writeBytes(Command.numToBytes(arguments.length)); + os.writeBytes(CRLF); + for (byte[] argument : arguments) { + os.writeBytes(BYTES_PREFIX); + os.writeBytes(Command.numToBytes(argument.length)); + os.writeBytes(CRLF); + os.writeBytes(argument); + os.writeBytes(CRLF); + } + } + + private static final int NUM_MAP_LENGTH = 256; + private static byte[][] numMap = new byte[NUM_MAP_LENGTH][]; + + static { + for (int i = 0; i < NUM_MAP_LENGTH; i++) { + numMap[i] = convert(i); + } + } + + // Optimized for the direct to ASCII bytes case + // Could be even more optimized but it is already + // about twice as fast as using Long.toString().getBytes() + public static byte[] numToBytes(long value) { + if (value >= 0 && value < NUM_MAP_LENGTH) { + return numMap[((int) value)]; + } else if (value == -1) { + return NEG_ONE; + } + return convert(value); + } + + private static byte[] convert(long value) { + boolean negative = value < 0; + int index = negative ? 2 : 1; + long current = negative ? -value : value; + while ((current /= 10) > 0) { + index++; + } + byte[] bytes = new byte[index]; + if (negative) { + bytes[0] = '-'; + } + current = negative ? -value : value; + long tmp = current; + while ((tmp /= 10) > 0) { + bytes[--index] = (byte) ('0' + (current % 10)); + current = tmp; + } + bytes[--index] = (byte) ('0' + current); + return bytes; + } + +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java new file mode 100644 index 0000000000..3e168183cb --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java @@ -0,0 +1,29 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +/** + * Created by IntelliJ IDEA. + * User: sam + * Date: 7/29/11 + * Time: 10:23 AM + * To change this template use File | Settings | File Templates. + */ +public class ErrorReply extends Reply { + public static final char MARKER = '-'; + private static final byte[] ERR = "ERR ".getBytes(UTF_8); + public final String error; + + public ErrorReply(String error) { + this.error = error; + } + + public void write(ChannelBuffer os) throws IOException { + os.writeByte(MARKER); + os.writeBytes(ERR); + os.writeBytes(error.getBytes(UTF_8)); + os.writeBytes(Command.CRLF); + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java new file mode 100644 index 0000000000..615e774d2f --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java @@ -0,0 +1,20 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class IntegerReply extends Reply { + public static final char MARKER = ':'; + public final long integer; + + public IntegerReply(long integer) { + this.integer = integer; + } + + public void write(ChannelBuffer os) throws IOException { + os.writeByte(MARKER); + os.writeBytes(Command.numToBytes(integer)); + os.writeBytes(Command.CRLF); + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java new file mode 100644 index 0000000000..a390be0630 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java @@ -0,0 +1,69 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class MultiBulkReply extends Reply { + public static final char MARKER = '*'; + + // State + public Object[] byteArrays; + private int size; + private int num; + + public MultiBulkReply() { + } + + public void read(RedisDecoder rd, ChannelBuffer is) throws IOException { + if (num == 0) { + size = RedisDecoder.readInteger(is); + byteArrays = new Object[size]; + rd.checkpoint(); + } + for (int i = num; i < size; i++) { + int read = is.readByte(); + if (read == BulkReply.MARKER) { + byteArrays[i] = rd.readBytes(is); + } else if (read == IntegerReply.MARKER) { + byteArrays[i] = RedisDecoder.readInteger(is); + } else { + throw new IOException("Unexpected character in stream: " + read); + } + num = i; + rd.checkpoint(); + } + } + + public MultiBulkReply(Object... values) { + this.byteArrays = values; + } + + public void write(ChannelBuffer os) throws IOException { + os.writeByte(MARKER); + if (byteArrays == null) { + os.writeBytes(Command.NEG_ONE); + os.writeBytes(Command.CRLF); + } else { + os.writeBytes(Command.numToBytes(byteArrays.length)); + os.writeBytes(Command.CRLF); + for (Object value : byteArrays) { + if (value == null) { + os.writeByte(BulkReply.MARKER); + os.writeBytes(Command.NEG_ONE); + } else if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + os.writeByte(BulkReply.MARKER); + int length = bytes.length; + os.writeBytes(Command.numToBytes(length)); + os.writeBytes(Command.CRLF); + os.writeBytes(bytes); + } else if (value instanceof Number) { + os.writeByte(IntegerReply.MARKER); + os.writeBytes(Command.numToBytes(((Number) value).longValue())); + } + os.writeBytes(Command.CRLF); + } + } + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java new file mode 100644 index 0000000000..2e68aaac1c --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java @@ -0,0 +1,9 @@ +package org.jboss.netty.handler.codec.redis; + +public class PSubscribeReply extends SubscribeReply { + + public PSubscribeReply(byte[][] patterns) { + super(patterns); + } + +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java new file mode 100644 index 0000000000..445517395d --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java @@ -0,0 +1,17 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class PUnsubscribeReply extends UnsubscribeReply { + + public PUnsubscribeReply(byte[][] patterns) { + super(patterns); + } + + @Override + public void write(ChannelBuffer os) throws IOException { + + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java new file mode 100644 index 0000000000..371f9aeaa0 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java @@ -0,0 +1,127 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.replay.ReplayingDecoder; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +enum State { + +} + +public class RedisDecoder extends ReplayingDecoder { + + private static final char CR = '\r'; + private static final char LF = '\n'; + private static final char ZERO = '0'; + + // We track the current multibulk reply in the case + // where we do not get a complete reply in a single + // decode invocation. + private MultiBulkReply reply; + + public byte[] readBytes(ChannelBuffer is) throws IOException { + int size = readInteger(is); + if (size == -1) { + return null; + } + if (super.actualReadableBytes() < size + 2) { + // Trigger error + is.skipBytes(size + 2); + throw new AssertionError("Trustin says this isn't possible"); + } + byte[] bytes = new byte[size]; + is.readBytes(bytes, 0, size); + int cr = is.readByte(); + int lf = is.readByte(); + if (cr != CR || lf != LF) { + throw new IOException("Improper line ending: " + cr + ", " + lf); + } + return bytes; + } + + public static int readInteger(ChannelBuffer is) throws IOException { + int size = 0; + int sign = 1; + int read = is.readByte(); + if (read == '-') { + read = is.readByte(); + sign = -1; + } + do { + if (read == CR) { + if (is.readByte() == LF) { + break; + } + } + int value = read - ZERO; + if (value >= 0 && value < 10) { + size *= 10; + size += value; + } else { + throw new IOException("Invalid character in integer"); + } + read = is.readByte(); + } while (true); + return size * sign; + } + + public Reply receive(final ChannelBuffer is) throws IOException { + int code = is.readByte(); + switch (code) { + case StatusReply.MARKER: { + return new StatusReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine()); + } + case ErrorReply.MARKER: { + return new ErrorReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine()); + } + case IntegerReply.MARKER: { + return new IntegerReply(readInteger(is)); + } + case BulkReply.MARKER: { + return new BulkReply(readBytes(is)); + } + case MultiBulkReply.MARKER: { + return decodeMultiBulkReply(is); + } + default: { + throw new IOException("Unexpected character in stream: " + code); + } + } + } + + @Override + public void checkpoint() { + super.checkpoint(); + } + + @Override + protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception { + return receive(channelBuffer); + } + + public MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws IOException { + if (reply == null) { + reply = new MultiBulkReply(); + } + reply.read(this, is); + return reply; + } + + private static class ChannelBufferInputStream extends InputStream { + private final ChannelBuffer is; + + public ChannelBufferInputStream(ChannelBuffer is) { + this.is = is; + } + + @Override + public int read() throws IOException { + return is.readByte(); + } + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java new file mode 100644 index 0000000000..db63ebd974 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -0,0 +1,40 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelDownstreamHandler; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class RedisEncoder extends SimpleChannelDownstreamHandler { + + private Queue pool = new ConcurrentLinkedQueue(); + + @Override + public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + Object o = e.getMessage(); + if (o instanceof Command) { + Command command = (Command) o; + ChannelBuffer cb = pool.poll(); + if (cb == null) { + cb = ChannelBuffers.dynamicBuffer(); + } + command.write(cb); + ChannelFuture future = e.getFuture(); + final ChannelBuffer finalCb = cb; + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + finalCb.clear(); + pool.add(finalCb); + } + }); + Channels.write(ctx, future, cb); + } + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java b/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java new file mode 100644 index 0000000000..d1a43e9c9e --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java @@ -0,0 +1,32 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Replies. + * User: sam + * Date: 7/27/11 + * Time: 3:04 PM + * To change this template use File | Settings | File Templates. + */ +public abstract class Reply { + + public static final Charset UTF_8 = Charset.forName("UTF-8"); + + public abstract void write(ChannelBuffer os) throws IOException; + + public String toString() { + ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); + try { + write(channelBuffer); + } catch (IOException e) { + throw new AssertionError("Trustin says this won't happen either"); + } + return channelBuffer.toString(UTF_8); + } + +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java new file mode 100644 index 0000000000..678a08f3d0 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java @@ -0,0 +1,20 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class StatusReply extends Reply { + public static final char MARKER = '+'; + public final String status; + + public StatusReply(String status) { + this.status = status; + } + + public void write(ChannelBuffer os) throws IOException { + os.writeByte(MARKER); + os.writeBytes(status.getBytes(UTF_8)); + os.writeBytes(Command.CRLF); + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java new file mode 100644 index 0000000000..9d19decc34 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java @@ -0,0 +1,22 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class SubscribeReply extends Reply { + + private final byte[][] patterns; + + public SubscribeReply(byte[][] patterns) { + this.patterns = patterns; + } + + public byte[][] getPatterns() { + return patterns; + } + + @Override + public void write(ChannelBuffer os) throws IOException { + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java new file mode 100644 index 0000000000..9a8d82e984 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java @@ -0,0 +1,23 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +public class UnsubscribeReply extends Reply { + + private final byte[][] patterns; + + public UnsubscribeReply(byte[][] patterns) { + this.patterns = patterns; + } + + @Override + public void write(ChannelBuffer os) throws IOException { + + } + + public byte[][] getPatterns() { + return patterns; + } +} diff --git a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java new file mode 100644 index 0000000000..1e7b21f9f5 --- /dev/null +++ b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java @@ -0,0 +1,53 @@ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.handler.queue.BlockingReadHandler; + +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class RedisClient { + private static final byte[] VALUE = "value".getBytes(Reply.UTF_8); + + public static void main(String[] args) throws Exception { + ExecutorService executor = Executors.newCachedThreadPool(); + final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor)); + final BlockingReadHandler blockingReadHandler = new BlockingReadHandler(); + cb.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("redisEncoder", new RedisEncoder()); + pipeline.addLast("redisDecoder", new RedisDecoder()); + pipeline.addLast("result", blockingReadHandler); + return pipeline; + } + }); + ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379)); + redis.await().rethrowIfFailed(); + Channel channel = redis.getChannel(); + + channel.write(new Command("set", "1", "value")); + System.out.print(blockingReadHandler.read()); + channel.write(new Command("get", "1")); + System.out.print(blockingReadHandler.read()); + + int CALLS = 1000000; + long start = System.currentTimeMillis(); + for (int i = 0; i < CALLS; i++) { + channel.write(new Command("SET".getBytes(), Command.numToBytes(i), VALUE)); + blockingReadHandler.read(); + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + + channel.close(); + cb.releaseExternalResources(); + } +} From b3cdeff919879d60d49ef5eb8c6c4a6767fc199b Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Sun, 26 Feb 2012 10:23:24 -0800 Subject: [PATCH 2/4] suggestions from both repositories applied --- .../netty/handler/codec/redis/BulkReply.java | 7 +- .../netty/handler/codec/redis/Command.java | 73 ++++++++++--------- .../netty/handler/codec/redis/ErrorReply.java | 7 -- .../handler/codec/redis/IntegerReply.java | 3 +- .../handler/codec/redis/MultiBulkReply.java | 15 ++-- .../handler/codec/redis/RedisDecoder.java | 26 +++---- .../handler/codec/redis/RedisEncoder.java | 2 + .../handler/codec/redis/RedisClient.java | 63 ++++++++-------- 8 files changed, 90 insertions(+), 106 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java index 356bec6abe..d9907cbcd0 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java @@ -15,12 +15,11 @@ public class BulkReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); if (bytes == null) { - os.writeBytes(Command.NEG_ONE); + os.writeBytes(Command.NEG_ONE_AND_CRLF); } else { - os.writeBytes(Command.numToBytes(bytes.length)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(bytes.length)); os.writeBytes(bytes); + os.writeBytes(Command.CRLF); } - os.writeBytes(Command.CRLF); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java index 61dff5c883..6d85cad59b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java @@ -16,7 +16,7 @@ public class Command { public static final byte[] CRLF = "\r\n".getBytes(); public static final byte[] BYTES_PREFIX = "$".getBytes(); public static final byte[] EMPTY_BYTES = new byte[0]; - public static final byte[] NEG_ONE = Command.numToBytes(-1); + public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1); private byte[][] arguments; private Object[] objects; @@ -36,6 +36,7 @@ public class Command { public Command(byte[]... arguments) { this.arguments = arguments; + objects = arguments; } public Command(Object... objects) { @@ -64,57 +65,57 @@ public class Command { private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException { os.writeBytes(ARGS_PREFIX); - os.writeBytes(Command.numToBytes(arguments.length)); - os.writeBytes(CRLF); + os.writeBytes(numAndCRLF(arguments.length)); for (byte[] argument : arguments) { os.writeBytes(BYTES_PREFIX); - os.writeBytes(Command.numToBytes(argument.length)); - os.writeBytes(CRLF); + os.writeBytes(numAndCRLF(argument.length)); os.writeBytes(argument); os.writeBytes(CRLF); } } private static final int NUM_MAP_LENGTH = 256; - private static byte[][] numMap = new byte[NUM_MAP_LENGTH][]; - + private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][]; static { - for (int i = 0; i < NUM_MAP_LENGTH; i++) { - numMap[i] = convert(i); - } + for (int i = 0; i < NUM_MAP_LENGTH; i++) { + numAndCRLFMap[i] = convertWithCRLF(i); + } } // Optimized for the direct to ASCII bytes case // Could be even more optimized but it is already // about twice as fast as using Long.toString().getBytes() - public static byte[] numToBytes(long value) { - if (value >= 0 && value < NUM_MAP_LENGTH) { - return numMap[((int) value)]; - } else if (value == -1) { - return NEG_ONE; - } - return convert(value); + public static byte[] numAndCRLF(long value) { + if (value >= 0 && value < NUM_MAP_LENGTH) { + return numAndCRLFMap[(int) value]; + } else if (value == -1) { + return NEG_ONE_AND_CRLF; + } + return convertWithCRLF(value); } - private static byte[] convert(long value) { - boolean negative = value < 0; - int index = negative ? 2 : 1; - long current = negative ? -value : value; - while ((current /= 10) > 0) { - index++; - } - byte[] bytes = new byte[index]; - if (negative) { - bytes[0] = '-'; - } - current = negative ? -value : value; - long tmp = current; - while ((tmp /= 10) > 0) { - bytes[--index] = (byte) ('0' + (current % 10)); - current = tmp; - } - bytes[--index] = (byte) ('0' + current); - return bytes; + private static byte[] convertWithCRLF(long value) { + boolean negative = value < 0; + int index = negative ? 2 : 1; + long current = negative ? -value : value; + while ((current /= 10) > 0) { + index++; + } + byte[] bytes = new byte[index + 2]; + if (negative) { + bytes[0] = '-'; + } + current = negative ? -value : value; + long tmp = current; + while ((tmp /= 10) > 0) { + bytes[--index] = (byte) ('0' + (current % 10)); + current = tmp; + } + bytes[--index] = (byte) ('0' + current); + // add CRLF + bytes[bytes.length - 2] = '\r'; + bytes[bytes.length - 1] = '\n'; + return bytes; } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java index 3e168183cb..a394339da1 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java @@ -4,13 +4,6 @@ import org.jboss.netty.buffer.ChannelBuffer; import java.io.IOException; -/** - * Created by IntelliJ IDEA. - * User: sam - * Date: 7/29/11 - * Time: 10:23 AM - * To change this template use File | Settings | File Templates. - */ public class ErrorReply extends Reply { public static final char MARKER = '-'; private static final byte[] ERR = "ERR ".getBytes(UTF_8); diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java index 615e774d2f..3e790a4d67 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java @@ -14,7 +14,6 @@ public class IntegerReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); - os.writeBytes(Command.numToBytes(integer)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(integer)); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java index a390be0630..3a06bf584e 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java @@ -42,27 +42,24 @@ public class MultiBulkReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); if (byteArrays == null) { - os.writeBytes(Command.NEG_ONE); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.NEG_ONE_AND_CRLF); } else { - os.writeBytes(Command.numToBytes(byteArrays.length)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(byteArrays.length)); for (Object value : byteArrays) { if (value == null) { os.writeByte(BulkReply.MARKER); - os.writeBytes(Command.NEG_ONE); + os.writeBytes(Command.NEG_ONE_AND_CRLF); } else if (value instanceof byte[]) { byte[] bytes = (byte[]) value; os.writeByte(BulkReply.MARKER); int length = bytes.length; - os.writeBytes(Command.numToBytes(length)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(length)); os.writeBytes(bytes); + os.writeBytes(Command.CRLF); } else if (value instanceof Number) { os.writeByte(IntegerReply.MARKER); - os.writeBytes(Command.numToBytes(((Number) value).longValue())); + os.writeBytes(Command.numAndCRLF(((Number) value).longValue())); } - os.writeBytes(Command.CRLF); } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java index 371f9aeaa0..13e7d37d77 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java @@ -1,13 +1,13 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferIndexFinder; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.replay.ReplayingDecoder; -import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; +import java.nio.charset.Charset; enum State { @@ -18,6 +18,7 @@ public class RedisDecoder extends ReplayingDecoder { private static final char CR = '\r'; private static final char LF = '\n'; private static final char ZERO = '0'; + public static final Charset UTF_8 = Charset.forName("UTF-8"); // We track the current multibulk reply in the case // where we do not get a complete reply in a single @@ -74,10 +75,14 @@ public class RedisDecoder extends ReplayingDecoder { int code = is.readByte(); switch (code) { case StatusReply.MARKER: { - return new StatusReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine()); + String status = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8); + is.skipBytes(2); + return new StatusReply(status); } case ErrorReply.MARKER: { - return new ErrorReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine()); + String error = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8); + is.skipBytes(2); + return new ErrorReply(error); } case IntegerReply.MARKER: { return new IntegerReply(readInteger(is)); @@ -111,17 +116,4 @@ public class RedisDecoder extends ReplayingDecoder { reply.read(this, is); return reply; } - - private static class ChannelBufferInputStream extends InputStream { - private final ChannelBuffer is; - - public ChannelBufferInputStream(ChannelBuffer is) { - this.is = is; - } - - @Override - public int read() throws IOException { - return is.readByte(); - } - } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index db63ebd974..7a84f2d865 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -35,6 +35,8 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { } }); Channels.write(ctx, future, cb); + } else { + super.writeRequested(ctx, e); } } } diff --git a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java index 1e7b21f9f5..e42e60f896 100644 --- a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java +++ b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java @@ -14,40 +14,41 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class RedisClient { - private static final byte[] VALUE = "value".getBytes(Reply.UTF_8); + private static final byte[] VALUE = "value".getBytes(Reply.UTF_8); public static void main(String[] args) throws Exception { - ExecutorService executor = Executors.newCachedThreadPool(); - final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor)); - final BlockingReadHandler blockingReadHandler = new BlockingReadHandler(); - cb.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("redisEncoder", new RedisEncoder()); - pipeline.addLast("redisDecoder", new RedisDecoder()); - pipeline.addLast("result", blockingReadHandler); - return pipeline; - } - }); - ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379)); - redis.await().rethrowIfFailed(); - Channel channel = redis.getChannel(); + ExecutorService executor = Executors.newCachedThreadPool(); + final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor)); + final BlockingReadHandler blockingReadHandler = new BlockingReadHandler(); + cb.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("redisEncoder", new RedisEncoder()); + pipeline.addLast("redisDecoder", new RedisDecoder()); + pipeline.addLast("result", blockingReadHandler); + return pipeline; + } + }); + ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379)); + redis.await().rethrowIfFailed(); + Channel channel = redis.getChannel(); - channel.write(new Command("set", "1", "value")); - System.out.print(blockingReadHandler.read()); - channel.write(new Command("get", "1")); - System.out.print(blockingReadHandler.read()); + channel.write(new Command("set", "1", "value")); + System.out.print(blockingReadHandler.read()); + channel.write(new Command("get", "1")); + System.out.print(blockingReadHandler.read()); - int CALLS = 1000000; - long start = System.currentTimeMillis(); - for (int i = 0; i < CALLS; i++) { - channel.write(new Command("SET".getBytes(), Command.numToBytes(i), VALUE)); - blockingReadHandler.read(); + int CALLS = 1000000; + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS; i++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + blockingReadHandler.read(); + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + + channel.close(); + cb.releaseExternalResources(); } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - - channel.close(); - cb.releaseExternalResources(); - } } From a0a59d916dc447f9b6f1af02d6ebe2017aa32fbf Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Sat, 10 Mar 2012 12:04:05 -0800 Subject: [PATCH 3/4] passes the JDK5 and license stuff wow --- .../netty/handler/codec/redis/BulkReply.java | 15 ++++++++++++ .../netty/handler/codec/redis/Command.java | 17 ++++++++++++- .../netty/handler/codec/redis/ErrorReply.java | 19 +++++++++++++-- .../handler/codec/redis/IntegerReply.java | 15 ++++++++++++ .../handler/codec/redis/MultiBulkReply.java | 15 ++++++++++++ .../handler/codec/redis/PSubscribeReply.java | 15 ++++++++++++ .../codec/redis/PUnsubscribeReply.java | 15 ++++++++++++ .../handler/codec/redis/RedisDecoder.java | 23 ++++++++++++++---- .../handler/codec/redis/RedisEncoder.java | 15 ++++++++++++ .../netty/handler/codec/redis/Reply.java | 21 ++++++++++++---- .../handler/codec/redis/StatusReply.java | 17 ++++++++++++- .../handler/codec/redis/SubscribeReply.java | 15 ++++++++++++ .../handler/codec/redis/UnsubscribeReply.java | 15 ++++++++++++ .../handler/codec/redis/package-info.java | 24 +++++++++++++++++++ .../handler/codec/redis/RedisClient.java | 2 +- 15 files changed, 230 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/jboss/netty/handler/codec/redis/package-info.java diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java index d9907cbcd0..352bed4c63 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java index 6d85cad59b..1efc6cf822 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; @@ -57,7 +72,7 @@ public class Command { } else if (object instanceof byte[]) { arguments[i] = (byte[]) object; } else { - arguments[i] = object.toString().getBytes(Reply.UTF_8); + arguments[i] = object.toString().getBytes("UTF-8"); } } writeDirect(os, arguments); diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java index a394339da1..f884b7942b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; @@ -6,7 +21,7 @@ import java.io.IOException; public class ErrorReply extends Reply { public static final char MARKER = '-'; - private static final byte[] ERR = "ERR ".getBytes(UTF_8); + private static final byte[] ERR = "ERR ".getBytes(); public final String error; public ErrorReply(String error) { @@ -16,7 +31,7 @@ public class ErrorReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); os.writeBytes(ERR); - os.writeBytes(error.getBytes(UTF_8)); + os.writeBytes(error.getBytes("UTF-8")); os.writeBytes(Command.CRLF); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java index 3e790a4d67..1af70a6e65 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java index 3a06bf584e..de1f84739b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java index 2e68aaac1c..2ed1ae8f20 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; public class PSubscribeReply extends SubscribeReply { diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java index 445517395d..4b05d7af4c 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java index 13e7d37d77..71a32e31c9 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; @@ -9,10 +24,6 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder; import java.io.IOException; import java.nio.charset.Charset; -enum State { - -} - public class RedisDecoder extends ReplayingDecoder { private static final char CR = '\r'; @@ -117,3 +128,7 @@ public class RedisDecoder extends ReplayingDecoder { return reply; } } + +enum State { + +} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index 7a84f2d865..ddb8153ef6 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java b/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java index d1a43e9c9e..fb1599a2ee 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java @@ -1,10 +1,25 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.util.CharsetUtil; import java.io.IOException; -import java.nio.charset.Charset; /** * Replies. @@ -15,8 +30,6 @@ import java.nio.charset.Charset; */ public abstract class Reply { - public static final Charset UTF_8 = Charset.forName("UTF-8"); - public abstract void write(ChannelBuffer os) throws IOException; public String toString() { @@ -26,7 +39,7 @@ public abstract class Reply { } catch (IOException e) { throw new AssertionError("Trustin says this won't happen either"); } - return channelBuffer.toString(UTF_8); + return channelBuffer.toString(CharsetUtil.UTF_8); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java index 678a08f3d0..bd433293f5 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; @@ -14,7 +29,7 @@ public class StatusReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); - os.writeBytes(status.getBytes(UTF_8)); + os.writeBytes(status.getBytes("UTF-8")); os.writeBytes(Command.CRLF); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java index 9d19decc34..ab22cf1e43 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java index 9a8d82e984..30d37374be 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java @@ -1,3 +1,18 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/package-info.java b/src/main/java/org/jboss/netty/handler/codec/redis/package-info.java new file mode 100644 index 0000000000..477e00f632 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/redis/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Encoder and decoder which transform a + * Redis protocol commands and replies + * into a {@link org.jboss.netty.buffer.ChannelBuffer} + * and vice versa. + * + */ +package org.jboss.netty.handler.codec.redis; diff --git a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java index e42e60f896..9a0fbeb155 100644 --- a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java +++ b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java @@ -14,7 +14,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class RedisClient { - private static final byte[] VALUE = "value".getBytes(Reply.UTF_8); + private static final byte[] VALUE = "value".getBytes(); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); From f7a1ec61f2741cc7b89c848e0737be23dfae1f70 Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Sat, 10 Mar 2012 22:12:17 -0800 Subject: [PATCH 4/4] Fixed several bugs in the replay state implementation --- .../handler/codec/redis/MultiBulkReply.java | 16 ++- .../handler/codec/redis/RedisDecoder.java | 6 + .../handler/codec/redis/RedisCodecTest.java | 134 ++++++++++++++++++ 3 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java index de1f84739b..3c81a6142a 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java @@ -31,7 +31,19 @@ public class MultiBulkReply extends Reply { } public void read(RedisDecoder rd, ChannelBuffer is) throws IOException { - if (num == 0) { + // If we attempted to read the size before, skip the '*' and reread it + if (size == -1) { + byte star = is.readByte(); + if (star == MARKER) { + size = 0; + } else { + throw new AssertionError("Unexpected character in stream: " + star); + } + } + if (size == 0) { + // If the read fails, we need to skip the star + size = -1; + // Read the size, if this is successful we won't read the star again size = RedisDecoder.readInteger(is); byteArrays = new Object[size]; rd.checkpoint(); @@ -45,7 +57,7 @@ public class MultiBulkReply extends Reply { } else { throw new IOException("Unexpected character in stream: " + read); } - num = i; + num = i + 1; rd.checkpoint(); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java index 71a32e31c9..5feda2d301 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java @@ -83,6 +83,12 @@ public class RedisDecoder extends ReplayingDecoder { } public Reply receive(final ChannelBuffer is) throws IOException { + if (reply != null) { + reply.read(this, is); + Reply ret = reply; + reply = null; + return ret; + } int code = is.readByte(); switch (code) { case StatusReply.MARKER: { diff --git a/src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java b/src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java new file mode 100644 index 0000000000..53367f7e8a --- /dev/null +++ b/src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java @@ -0,0 +1,134 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.redis; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.handler.codec.embedder.DecoderEmbedder; +import org.jboss.netty.util.CharsetUtil; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class RedisCodecTest { + + private DecoderEmbedder embedder; + + @Before + public void setUp() { + embedder = new DecoderEmbedder(new RedisDecoder()); + } + + @Test + public void decodeReplies() throws IOException { + { + Object receive = decode("+OK\r\n".getBytes()); + assertTrue(receive instanceof StatusReply); + assertEquals("OK", ((StatusReply) receive).status); + } + { + Object receive = decode("-ERROR\r\n".getBytes()); + assertTrue(receive instanceof ErrorReply); + assertEquals("ERROR", ((ErrorReply) receive).error); + } + { + Object receive = decode(":123\r\n".getBytes()); + assertTrue(receive instanceof IntegerReply); + assertEquals(123, ((IntegerReply) receive).integer); + } + { + Object receive = decode("$5\r\nnetty\r\n".getBytes()); + assertTrue(receive instanceof BulkReply); + assertEquals("netty", new String(((BulkReply) receive).bytes)); + } + { + Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()); + assertTrue(receive instanceof MultiBulkReply); + assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0])); + assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1])); + } + } + + private Object decode(byte[] bytes) { + embedder.offer(wrappedBuffer(bytes)); + return embedder.poll(); + } + + @Test + public void encodeCommands() throws IOException { + String setCommand = "*3\r\n" + + "$3\r\n" + + "SET\r\n" + + "$5\r\n" + + "mykey\r\n" + + "$7\r\n" + + "myvalue\r\n"; + Command command = new Command("SET", "mykey", "myvalue"); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); + command.write(cb); + assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII)); + } + + @Test + public void testReplayDecoding() { + { + embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes())); + Object receive = embedder.poll(); + assertNull(receive); + embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes())); + receive = embedder.poll(); + assertTrue(receive instanceof MultiBulkReply); + assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0])); + assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1])); + } + { + embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes())); + Object receive = embedder.poll(); + assertNull(receive); + embedder.offer(wrappedBuffer("ules\r\n".getBytes())); + receive = embedder.poll(); + assertTrue(receive instanceof MultiBulkReply); + assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0])); + assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1])); + } + { + embedder.offer(wrappedBuffer("*2".getBytes())); + Object receive = embedder.poll(); + assertNull(receive); + embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes())); + receive = embedder.poll(); + assertTrue(receive instanceof MultiBulkReply); + assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0])); + assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1])); + } + { + embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes())); + Object receive = embedder.poll(); + assertNull(receive); + embedder.offer(wrappedBuffer("\n".getBytes())); + receive = embedder.poll(); + assertTrue(receive instanceof MultiBulkReply); + assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0])); + assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1])); + } + } +}