diff --git a/codec/src/main/java/io/netty/handler/codec/redis/BulkReply.java b/codec/src/main/java/io/netty/handler/codec/redis/BulkReply.java
new file mode 100644
index 0000000000..8cac8b5120
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/BulkReply.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+public class BulkReply extends Reply {
+ public static final char MARKER = '$';
+ public final byte[] bytes;
+
+ public BulkReply(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ os.writeByte(MARKER);
+ if (bytes == null) {
+ os.writeBytes(Command.NEG_ONE_AND_CRLF);
+ } else {
+ os.writeBytes(Command.numAndCRLF(bytes.length));
+ os.writeBytes(bytes);
+ os.writeBytes(Command.CRLF);
+ }
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/Command.java b/codec/src/main/java/io/netty/handler/codec/redis/Command.java
new file mode 100644
index 0000000000..1d17662251
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/Command.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+/**
+ * Command serialization.
+ */
+public class Command {
+ public static final byte[] ARGS_PREFIX = "*".getBytes();
+ public static final byte[] CRLF = "\r\n".getBytes();
+ public static final byte[] BYTES_PREFIX = "$".getBytes();
+ public static final byte[] EMPTY_BYTES = new byte[0];
+ public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
+
+ private byte[][] arguments;
+ private Object[] objects;
+
+ public String getName() {
+ if (arguments == null) {
+ Object o = objects[0];
+ if (o instanceof byte[]) {
+ return new String((byte[]) o);
+ } else {
+ return o.toString();
+ }
+ } else {
+ return new String(arguments[0]);
+ }
+ }
+
+ public Command(byte[]... arguments) {
+ this.arguments = arguments;
+ objects = arguments;
+ }
+
+ public Command(Object... objects) {
+ this.objects = objects;
+ }
+
+ public void write(ChannelBuffer os) throws IOException {
+ writeDirect(os, objects);
+ }
+
+ public static void writeDirect(ChannelBuffer os, Object... objects) throws IOException {
+ int length = objects.length;
+ byte[][] arguments = new byte[length][];
+ for (int i = 0; i < length; i++) {
+ Object object = objects[i];
+ if (object == null) {
+ arguments[i] = EMPTY_BYTES;
+ } else if (object instanceof byte[]) {
+ arguments[i] = (byte[]) object;
+ } else {
+ arguments[i] = object.toString().getBytes("UTF-8");
+ }
+ }
+ writeDirect(os, arguments);
+ }
+
+ private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException {
+ os.writeBytes(ARGS_PREFIX);
+ os.writeBytes(numAndCRLF(arguments.length));
+ for (byte[] argument : arguments) {
+ os.writeBytes(BYTES_PREFIX);
+ os.writeBytes(numAndCRLF(argument.length));
+ os.writeBytes(argument);
+ os.writeBytes(CRLF);
+ }
+ }
+
+ private static final int NUM_MAP_LENGTH = 256;
+ private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
+ static {
+ for (int i = 0; i < NUM_MAP_LENGTH; i++) {
+ numAndCRLFMap[i] = convertWithCRLF(i);
+ }
+ }
+
+ // Optimized for the direct to ASCII bytes case
+ // Could be even more optimized but it is already
+ // about twice as fast as using Long.toString().getBytes()
+ public static byte[] numAndCRLF(long value) {
+ if (value >= 0 && value < NUM_MAP_LENGTH) {
+ return numAndCRLFMap[(int) value];
+ } else if (value == -1) {
+ return NEG_ONE_AND_CRLF;
+ }
+ return convertWithCRLF(value);
+ }
+
+ private static byte[] convertWithCRLF(long value) {
+ boolean negative = value < 0;
+ int index = negative ? 2 : 1;
+ long current = negative ? -value : value;
+ while ((current /= 10) > 0) {
+ index++;
+ }
+ byte[] bytes = new byte[index + 2];
+ if (negative) {
+ bytes[0] = '-';
+ }
+ current = negative ? -value : value;
+ long tmp = current;
+ while ((tmp /= 10) > 0) {
+ bytes[--index] = (byte) ('0' + (current % 10));
+ current = tmp;
+ }
+ bytes[--index] = (byte) ('0' + current);
+ // add CRLF
+ bytes[bytes.length - 2] = '\r';
+ bytes[bytes.length - 1] = '\n';
+ return bytes;
+ }
+
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/ErrorReply.java b/codec/src/main/java/io/netty/handler/codec/redis/ErrorReply.java
new file mode 100644
index 0000000000..3bddd602cb
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/ErrorReply.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+/**
+ * {@link Reply} which will be returned if an error was detected
+ *
+ *
+ */
+public class ErrorReply extends Reply {
+ public static final char MARKER = '-';
+ private static final byte[] ERR = "ERR ".getBytes();
+ public final ChannelBuffer error;
+
+ public ErrorReply(ChannelBuffer error) {
+ this.error = error;
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ os.writeByte(MARKER);
+ os.writeBytes(ERR);
+ os.writeBytes(error, 0, error.readableBytes());
+ os.writeBytes(Command.CRLF);
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/IntegerReply.java b/codec/src/main/java/io/netty/handler/codec/redis/IntegerReply.java
new file mode 100644
index 0000000000..69a15f0f06
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/IntegerReply.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+/**
+ * {@link Reply} which will get returned if a {@link Integer} was requested via GET
+ *
+ */
+public class IntegerReply extends Reply {
+ public static final char MARKER = ':';
+ public final long integer;
+
+ public IntegerReply(long integer) {
+ this.integer = integer;
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ os.writeByte(MARKER);
+ os.writeBytes(Command.numAndCRLF(integer));
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/MultiBulkReply.java b/codec/src/main/java/io/netty/handler/codec/redis/MultiBulkReply.java
new file mode 100644
index 0000000000..abcd8db638
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/MultiBulkReply.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+/**
+ * {@link Reply} which contains a bulk of {@link Reply}'s
+ *
+ *
+ */
+public class MultiBulkReply extends Reply {
+ public static final char MARKER = '*';
+
+ // State
+ public Object[] byteArrays;
+ private int size;
+ private int num;
+
+ public MultiBulkReply() {
+ }
+
+ public MultiBulkReply(Object... values) {
+ this.byteArrays = values;
+ }
+
+ public void read(RedisDecoder rd, ChannelBuffer is) throws IOException {
+ // If we attempted to read the size before, skip the '*' and reread it
+ if (size == -1) {
+ byte star = is.readByte();
+ if (star == MARKER) {
+ size = 0;
+ } else {
+ throw new AssertionError("Unexpected character in stream: " + star);
+ }
+ }
+ if (size == 0) {
+ // If the read fails, we need to skip the star
+ size = -1;
+ // Read the size, if this is successful we won't read the star again
+ size = RedisDecoder.readInteger(is);
+ byteArrays = new Object[size];
+ rd.checkpoint();
+ }
+ for (int i = num; i < size; i++) {
+ int read = is.readByte();
+ if (read == BulkReply.MARKER) {
+ byteArrays[i] = RedisDecoder.readBytes(is);
+ } else if (read == IntegerReply.MARKER) {
+ byteArrays[i] = RedisDecoder.readInteger(is);
+ } else {
+ throw new IOException("Unexpected character in stream: " + read);
+ }
+ num = i + 1;
+ rd.checkpoint();
+ }
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ os.writeByte(MARKER);
+ if (byteArrays == null) {
+ os.writeBytes(Command.NEG_ONE_AND_CRLF);
+ } else {
+ os.writeBytes(Command.numAndCRLF(byteArrays.length));
+ for (Object value : byteArrays) {
+ if (value == null) {
+ os.writeByte(BulkReply.MARKER);
+ os.writeBytes(Command.NEG_ONE_AND_CRLF);
+ } else if (value instanceof byte[]) {
+ byte[] bytes = (byte[]) value;
+ os.writeByte(BulkReply.MARKER);
+ int length = bytes.length;
+ os.writeBytes(Command.numAndCRLF(length));
+ os.writeBytes(bytes);
+ os.writeBytes(Command.CRLF);
+ } else if (value instanceof Number) {
+ os.writeByte(IntegerReply.MARKER);
+ os.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
+ }
+ }
+ }
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/PSubscribeReply.java b/codec/src/main/java/io/netty/handler/codec/redis/PSubscribeReply.java
new file mode 100644
index 0000000000..c4b2ebb8ba
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/PSubscribeReply.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+public class PSubscribeReply extends SubscribeReply {
+
+ public PSubscribeReply(byte[][] patterns) {
+ super(patterns);
+ }
+
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/PUnsubscribeReply.java b/codec/src/main/java/io/netty/handler/codec/redis/PUnsubscribeReply.java
new file mode 100644
index 0000000000..f1bb34caf0
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/PUnsubscribeReply.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+public class PUnsubscribeReply extends UnsubscribeReply {
+
+ public PUnsubscribeReply(byte[][] patterns) {
+ super(patterns);
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ // Do nothing
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
new file mode 100644
index 0000000000..96fa44ad6d
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ChannelBufferIndexFinder;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.replay.ReplayingDecoder;
+
+import java.io.IOException;
+
+/**
+ * {@link ReplayingDecoder} which handles Redis protocol
+ *
+ *
+ */
+public class RedisDecoder extends ReplayingDecoder {
+
+ private static final char CR = '\r';
+ private static final char LF = '\n';
+ private static final char ZERO = '0';
+
+ // We track the current multibulk reply in the case
+ // where we do not get a complete reply in a single
+ // decode invocation.
+ private MultiBulkReply reply;
+
+ /**
+ * Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
+ * via the {@link #readInteger(ChannelBuffer)} method
+ *
+ * @param is the {@link ChannelBuffer} to read from
+ * @return content
+ * @throws IOException is thrown if the line-ending is not CRLF
+ */
+ public static byte[] readBytes(ChannelBuffer is) throws IOException {
+ int size = readInteger(is);
+ if (size == -1) {
+ return null;
+ }
+ byte[] bytes = new byte[size];
+ is.readBytes(bytes, 0, size);
+ int cr = is.readByte();
+ int lf = is.readByte();
+ if (cr != CR || lf != LF) {
+ throw new IOException("Improper line ending: " + cr + ", " + lf);
+ }
+ return bytes;
+ }
+
+ /**
+ * Read an {@link Integer} from the {@link ChannelBuffer}
+ *
+ * @param is
+ * @return integer
+ * @throws IOException
+ */
+ public static int readInteger(ChannelBuffer is) throws IOException {
+ int size = 0;
+ int sign = 1;
+ int read = is.readByte();
+ if (read == '-') {
+ read = is.readByte();
+ sign = -1;
+ }
+ do {
+ if (read == CR) {
+ if (is.readByte() == LF) {
+ break;
+ }
+ }
+ int value = read - ZERO;
+ if (value >= 0 && value < 10) {
+ size *= 10;
+ size += value;
+ } else {
+ throw new IOException("Invalid character in integer");
+ }
+ read = is.readByte();
+ } while (true);
+ return size * sign;
+ }
+
+ @Override
+ public void checkpoint() {
+ super.checkpoint();
+ }
+
+ @Override
+ protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception {
+ if (reply != null) {
+ reply.read(this, channelBuffer);
+ Reply ret = reply;
+ reply = null;
+ return ret;
+ }
+ int code = channelBuffer.readByte();
+ switch (code) {
+ case StatusReply.MARKER: {
+ ChannelBuffer status = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
+ channelBuffer.skipBytes(2);
+ return new StatusReply(status);
+ }
+ case ErrorReply.MARKER: {
+ ChannelBuffer error = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
+ channelBuffer.skipBytes(2);
+ return new ErrorReply(error);
+ }
+ case IntegerReply.MARKER: {
+ return new IntegerReply(readInteger(channelBuffer));
+ }
+ case BulkReply.MARKER: {
+ return new BulkReply(readBytes(channelBuffer));
+ }
+ case MultiBulkReply.MARKER: {
+ return decodeMultiBulkReply(channelBuffer);
+ }
+ default: {
+ throw new IOException("Unexpected character in stream: " + code);
+ }
+ }
+ }
+
+ private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws IOException {
+ if (reply == null) {
+ reply = new MultiBulkReply();
+ }
+ reply.read(this, is);
+ return reply;
+ }
+}
+
+enum State {
+
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java
new file mode 100644
index 0000000000..f7f4710ab8
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ChannelBuffers;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.Channels;
+import io.netty.channel.MessageEvent;
+import io.netty.channel.SimpleChannelDownstreamHandler;
+import io.netty.channel.ChannelHandler.Sharable;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
+ *
+ *
+ */
+@Sharable
+public class RedisEncoder extends SimpleChannelDownstreamHandler {
+
+ private final Queue pool;
+
+ /**
+ * Calls {@link #RedisEncoder(boolean)} with false
+ */
+ public RedisEncoder() {
+ this(false);
+ }
+
+ /**
+ * Create a new {@link RedisEncoder} instance
+ *
+ * @param poolBuffers true
if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
+ * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
+ */
+ public RedisEncoder(boolean poolBuffers) {
+ if (poolBuffers) {
+ pool = new ConcurrentLinkedQueue();
+ } else {
+ pool = null;
+ }
+ }
+
+
+ @Override
+ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object o = e.getMessage();
+ if (o instanceof Command) {
+ Command command = (Command) o;
+ ChannelBuffer cb = null;
+ if (pool != null) {
+ cb = pool.poll();
+ }
+ if (cb == null) {
+ cb = ChannelBuffers.dynamicBuffer();
+ }
+ command.write(cb);
+ ChannelFuture future = e.getFuture();
+
+ if (pool != null) {
+ final ChannelBuffer finalCb = cb;
+ future.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ finalCb.clear();
+ pool.add(finalCb);
+ }
+ });
+ }
+ Channels.write(ctx, future, cb);
+ } else {
+ super.writeRequested(ctx, e);
+ }
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/Reply.java b/codec/src/main/java/io/netty/handler/codec/redis/Reply.java
new file mode 100644
index 0000000000..74e87e1250
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/Reply.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ChannelBuffers;
+import io.netty.util.CharsetUtil;
+
+import java.io.IOException;
+
+/**
+ * {@link Reply} which was sent as a Response to the written {@link Command}
+ * *
+ */
+public abstract class Reply {
+
+ /**
+ * Write the content of the {@link Reply} to the given {@link ChannelBuffer}
+ */
+ public abstract void write(ChannelBuffer os) throws IOException;
+
+ @Override
+ public String toString() {
+ ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
+ try {
+ write(channelBuffer);
+ } catch (IOException e) {
+ throw new AssertionError("Trustin says this won't happen either");
+ }
+ return channelBuffer.toString(CharsetUtil.UTF_8);
+ }
+
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/StatusReply.java b/codec/src/main/java/io/netty/handler/codec/redis/StatusReply.java
new file mode 100644
index 0000000000..3bdfda6098
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/StatusReply.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+/**
+ * {@link Reply} which contains the status
+ *
+ */
+public class StatusReply extends Reply {
+ public static final char MARKER = '+';
+ public final ChannelBuffer status;
+
+ public StatusReply(ChannelBuffer status) {
+ this.status = status;
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ os.writeByte(MARKER);
+ os.writeBytes(status, 0, status.readableBytes());
+ os.writeBytes(Command.CRLF);
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/SubscribeReply.java b/codec/src/main/java/io/netty/handler/codec/redis/SubscribeReply.java
new file mode 100644
index 0000000000..7783dd817a
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/SubscribeReply.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+public class SubscribeReply extends Reply {
+
+ private final byte[][] patterns;
+
+ public SubscribeReply(byte[][] patterns) {
+ this.patterns = patterns;
+ }
+
+ public byte[][] getPatterns() {
+ return patterns;
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+ // Do nothing
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/UnsubscribeReply.java b/codec/src/main/java/io/netty/handler/codec/redis/UnsubscribeReply.java
new file mode 100644
index 0000000000..9b0f4de2be
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/UnsubscribeReply.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+
+public class UnsubscribeReply extends Reply {
+
+ private final byte[][] patterns;
+
+ public UnsubscribeReply(byte[][] patterns) {
+ this.patterns = patterns;
+ }
+
+ @Override
+ public void write(ChannelBuffer os) throws IOException {
+
+ }
+
+ public byte[][] getPatterns() {
+ return patterns;
+ }
+}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/package-info.java b/codec/src/main/java/io/netty/handler/codec/redis/package-info.java
new file mode 100644
index 0000000000..e0744cd5d0
--- /dev/null
+++ b/codec/src/main/java/io/netty/handler/codec/redis/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Encoder and decoder which transform a
+ * Redis protocol commands and replies
+ * into a {@link org.jboss.netty.buffer.ChannelBuffer}
+ * and vice versa.
+ *
+ */
+package io.netty.handler.codec.redis;
diff --git a/codec/src/test/java/io/netty/handler/codec/redis/RedisCodecTest.java b/codec/src/test/java/io/netty/handler/codec/redis/RedisCodecTest.java
new file mode 100644
index 0000000000..fdf5309940
--- /dev/null
+++ b/codec/src/test/java/io/netty/handler/codec/redis/RedisCodecTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.redis;
+
+import io.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ChannelBuffers;
+import io.netty.handler.codec.embedder.DecoderEmbedder;
+import io.netty.util.CharsetUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RedisCodecTest {
+
+ private DecoderEmbedder embedder;
+
+ @Before
+ public void setUp() {
+ embedder = new DecoderEmbedder(new RedisDecoder());
+ }
+
+ @Test
+ public void decodeReplies() throws IOException {
+ {
+ Object receive = decode("+OK\r\n".getBytes());
+ assertTrue(receive instanceof StatusReply);
+ assertEquals("OK", ((StatusReply) receive).status.toString(CharsetUtil.UTF_8));
+ }
+ {
+ Object receive = decode("-ERROR\r\n".getBytes());
+ assertTrue(receive instanceof ErrorReply);
+ assertEquals("ERROR", ((ErrorReply) receive).error.toString(CharsetUtil.UTF_8));
+ }
+ {
+ Object receive = decode(":123\r\n".getBytes());
+ assertTrue(receive instanceof IntegerReply);
+ assertEquals(123, ((IntegerReply) receive).integer);
+ }
+ {
+ Object receive = decode("$5\r\nnetty\r\n".getBytes());
+ assertTrue(receive instanceof BulkReply);
+ assertEquals("netty", new String(((BulkReply) receive).bytes));
+ }
+ {
+ Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
+ assertTrue(receive instanceof MultiBulkReply);
+ assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
+ assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
+ }
+ }
+
+ private Object decode(byte[] bytes) {
+ embedder.offer(wrappedBuffer(bytes));
+ return embedder.poll();
+ }
+
+ @Test
+ public void encodeCommands() throws IOException {
+ String setCommand = "*3\r\n" +
+ "$3\r\n" +
+ "SET\r\n" +
+ "$5\r\n" +
+ "mykey\r\n" +
+ "$7\r\n" +
+ "myvalue\r\n";
+ Command command = new Command("SET", "mykey", "myvalue");
+ ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+ command.write(cb);
+ assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
+ }
+
+ @Test
+ public void testReplayDecoding() {
+ {
+ embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
+ Object receive = embedder.poll();
+ assertNull(receive);
+ embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
+ receive = embedder.poll();
+ assertTrue(receive instanceof MultiBulkReply);
+ assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
+ assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
+ }
+ {
+ embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
+ Object receive = embedder.poll();
+ assertNull(receive);
+ embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
+ receive = embedder.poll();
+ assertTrue(receive instanceof MultiBulkReply);
+ assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
+ assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
+ }
+ {
+ embedder.offer(wrappedBuffer("*2".getBytes()));
+ Object receive = embedder.poll();
+ assertNull(receive);
+ embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
+ receive = embedder.poll();
+ assertTrue(receive instanceof MultiBulkReply);
+ assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
+ assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
+ }
+ {
+ embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
+ Object receive = embedder.poll();
+ assertNull(receive);
+ embedder.offer(wrappedBuffer("\n".getBytes()));
+ receive = embedder.poll();
+ assertTrue(receive instanceof MultiBulkReply);
+ assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
+ assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
+ }
+ }
+}