Merge pull request #201 from spullara/redis_codec

Add Redis client codec
This commit is contained in:
Norman Maurer 2012-03-11 12:38:31 -07:00
commit 143ba76176
16 changed files with 960 additions and 0 deletions

View File

@ -0,0 +1,40 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class BulkReply extends Reply {
public static final char MARKER = '$';
public final byte[] bytes;
public BulkReply(byte[] bytes) {
this.bytes = bytes;
}
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
if (bytes == null) {
os.writeBytes(Command.NEG_ONE_AND_CRLF);
} else {
os.writeBytes(Command.numAndCRLF(bytes.length));
os.writeBytes(bytes);
os.writeBytes(Command.CRLF);
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
* Command serialization.
* User: sam
* Date: 7/27/11
* Time: 3:04 PM
* To change this template use File | Settings | File Templates.
*/
public class Command {
public static final byte[] ARGS_PREFIX = "*".getBytes();
public static final byte[] CRLF = "\r\n".getBytes();
public static final byte[] BYTES_PREFIX = "$".getBytes();
public static final byte[] EMPTY_BYTES = new byte[0];
public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
private byte[][] arguments;
private Object[] objects;
public String getName() {
if (arguments == null) {
Object o = objects[0];
if (o instanceof byte[]) {
return new String((byte[]) o);
} else {
return o.toString();
}
} else {
return new String(arguments[0]);
}
}
public Command(byte[]... arguments) {
this.arguments = arguments;
objects = arguments;
}
public Command(Object... objects) {
this.objects = objects;
}
public void write(ChannelBuffer os) throws IOException {
writeDirect(os, objects);
}
public static void writeDirect(ChannelBuffer os, Object... objects) throws IOException {
int length = objects.length;
byte[][] arguments = new byte[length][];
for (int i = 0; i < length; i++) {
Object object = objects[i];
if (object == null) {
arguments[i] = EMPTY_BYTES;
} else if (object instanceof byte[]) {
arguments[i] = (byte[]) object;
} else {
arguments[i] = object.toString().getBytes("UTF-8");
}
}
writeDirect(os, arguments);
}
private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException {
os.writeBytes(ARGS_PREFIX);
os.writeBytes(numAndCRLF(arguments.length));
for (byte[] argument : arguments) {
os.writeBytes(BYTES_PREFIX);
os.writeBytes(numAndCRLF(argument.length));
os.writeBytes(argument);
os.writeBytes(CRLF);
}
}
private static final int NUM_MAP_LENGTH = 256;
private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
static {
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
numAndCRLFMap[i] = convertWithCRLF(i);
}
}
// Optimized for the direct to ASCII bytes case
// Could be even more optimized but it is already
// about twice as fast as using Long.toString().getBytes()
public static byte[] numAndCRLF(long value) {
if (value >= 0 && value < NUM_MAP_LENGTH) {
return numAndCRLFMap[(int) value];
} else if (value == -1) {
return NEG_ONE_AND_CRLF;
}
return convertWithCRLF(value);
}
private static byte[] convertWithCRLF(long value) {
boolean negative = value < 0;
int index = negative ? 2 : 1;
long current = negative ? -value : value;
while ((current /= 10) > 0) {
index++;
}
byte[] bytes = new byte[index + 2];
if (negative) {
bytes[0] = '-';
}
current = negative ? -value : value;
long tmp = current;
while ((tmp /= 10) > 0) {
bytes[--index] = (byte) ('0' + (current % 10));
current = tmp;
}
bytes[--index] = (byte) ('0' + current);
// add CRLF
bytes[bytes.length - 2] = '\r';
bytes[bytes.length - 1] = '\n';
return bytes;
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class ErrorReply extends Reply {
public static final char MARKER = '-';
private static final byte[] ERR = "ERR ".getBytes();
public final String error;
public ErrorReply(String error) {
this.error = error;
}
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(ERR);
os.writeBytes(error.getBytes("UTF-8"));
os.writeBytes(Command.CRLF);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class IntegerReply extends Reply {
public static final char MARKER = ':';
public final long integer;
public IntegerReply(long integer) {
this.integer = integer;
}
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(Command.numAndCRLF(integer));
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class MultiBulkReply extends Reply {
public static final char MARKER = '*';
// State
public Object[] byteArrays;
private int size;
private int num;
public MultiBulkReply() {
}
public void read(RedisDecoder rd, ChannelBuffer is) throws IOException {
// If we attempted to read the size before, skip the '*' and reread it
if (size == -1) {
byte star = is.readByte();
if (star == MARKER) {
size = 0;
} else {
throw new AssertionError("Unexpected character in stream: " + star);
}
}
if (size == 0) {
// If the read fails, we need to skip the star
size = -1;
// Read the size, if this is successful we won't read the star again
size = RedisDecoder.readInteger(is);
byteArrays = new Object[size];
rd.checkpoint();
}
for (int i = num; i < size; i++) {
int read = is.readByte();
if (read == BulkReply.MARKER) {
byteArrays[i] = rd.readBytes(is);
} else if (read == IntegerReply.MARKER) {
byteArrays[i] = RedisDecoder.readInteger(is);
} else {
throw new IOException("Unexpected character in stream: " + read);
}
num = i + 1;
rd.checkpoint();
}
}
public MultiBulkReply(Object... values) {
this.byteArrays = values;
}
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
if (byteArrays == null) {
os.writeBytes(Command.NEG_ONE_AND_CRLF);
} else {
os.writeBytes(Command.numAndCRLF(byteArrays.length));
for (Object value : byteArrays) {
if (value == null) {
os.writeByte(BulkReply.MARKER);
os.writeBytes(Command.NEG_ONE_AND_CRLF);
} else if (value instanceof byte[]) {
byte[] bytes = (byte[]) value;
os.writeByte(BulkReply.MARKER);
int length = bytes.length;
os.writeBytes(Command.numAndCRLF(length));
os.writeBytes(bytes);
os.writeBytes(Command.CRLF);
} else if (value instanceof Number) {
os.writeByte(IntegerReply.MARKER);
os.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
}
}
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
public class PSubscribeReply extends SubscribeReply {
public PSubscribeReply(byte[][] patterns) {
super(patterns);
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class PUnsubscribeReply extends UnsubscribeReply {
public PUnsubscribeReply(byte[][] patterns) {
super(patterns);
}
@Override
public void write(ChannelBuffer os) throws IOException {
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import java.io.IOException;
import java.nio.charset.Charset;
public class RedisDecoder extends ReplayingDecoder<State> {
private static final char CR = '\r';
private static final char LF = '\n';
private static final char ZERO = '0';
public static final Charset UTF_8 = Charset.forName("UTF-8");
// We track the current multibulk reply in the case
// where we do not get a complete reply in a single
// decode invocation.
private MultiBulkReply reply;
public byte[] readBytes(ChannelBuffer is) throws IOException {
int size = readInteger(is);
if (size == -1) {
return null;
}
if (super.actualReadableBytes() < size + 2) {
// Trigger error
is.skipBytes(size + 2);
throw new AssertionError("Trustin says this isn't possible");
}
byte[] bytes = new byte[size];
is.readBytes(bytes, 0, size);
int cr = is.readByte();
int lf = is.readByte();
if (cr != CR || lf != LF) {
throw new IOException("Improper line ending: " + cr + ", " + lf);
}
return bytes;
}
public static int readInteger(ChannelBuffer is) throws IOException {
int size = 0;
int sign = 1;
int read = is.readByte();
if (read == '-') {
read = is.readByte();
sign = -1;
}
do {
if (read == CR) {
if (is.readByte() == LF) {
break;
}
}
int value = read - ZERO;
if (value >= 0 && value < 10) {
size *= 10;
size += value;
} else {
throw new IOException("Invalid character in integer");
}
read = is.readByte();
} while (true);
return size * sign;
}
public Reply receive(final ChannelBuffer is) throws IOException {
if (reply != null) {
reply.read(this, is);
Reply ret = reply;
reply = null;
return ret;
}
int code = is.readByte();
switch (code) {
case StatusReply.MARKER: {
String status = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8);
is.skipBytes(2);
return new StatusReply(status);
}
case ErrorReply.MARKER: {
String error = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8);
is.skipBytes(2);
return new ErrorReply(error);
}
case IntegerReply.MARKER: {
return new IntegerReply(readInteger(is));
}
case BulkReply.MARKER: {
return new BulkReply(readBytes(is));
}
case MultiBulkReply.MARKER: {
return decodeMultiBulkReply(is);
}
default: {
throw new IOException("Unexpected character in stream: " + code);
}
}
}
@Override
public void checkpoint() {
super.checkpoint();
}
@Override
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception {
return receive(channelBuffer);
}
public MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws IOException {
if (reply == null) {
reply = new MultiBulkReply();
}
reply.read(this, is);
return reply;
}
}
enum State {
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class RedisEncoder extends SimpleChannelDownstreamHandler {
private Queue<ChannelBuffer> pool = new ConcurrentLinkedQueue<ChannelBuffer>();
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object o = e.getMessage();
if (o instanceof Command) {
Command command = (Command) o;
ChannelBuffer cb = pool.poll();
if (cb == null) {
cb = ChannelBuffers.dynamicBuffer();
}
command.write(cb);
ChannelFuture future = e.getFuture();
final ChannelBuffer finalCb = cb;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
finalCb.clear();
pool.add(finalCb);
}
});
Channels.write(ctx, future, cb);
} else {
super.writeRequested(ctx, e);
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
import java.io.IOException;
/**
* Replies.
* User: sam
* Date: 7/27/11
* Time: 3:04 PM
* To change this template use File | Settings | File Templates.
*/
public abstract class Reply {
public abstract void write(ChannelBuffer os) throws IOException;
public String toString() {
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
try {
write(channelBuffer);
} catch (IOException e) {
throw new AssertionError("Trustin says this won't happen either");
}
return channelBuffer.toString(CharsetUtil.UTF_8);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class StatusReply extends Reply {
public static final char MARKER = '+';
public final String status;
public StatusReply(String status) {
this.status = status;
}
public void write(ChannelBuffer os) throws IOException {
os.writeByte(MARKER);
os.writeBytes(status.getBytes("UTF-8"));
os.writeBytes(Command.CRLF);
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class SubscribeReply extends Reply {
private final byte[][] patterns;
public SubscribeReply(byte[][] patterns) {
this.patterns = patterns;
}
public byte[][] getPatterns() {
return patterns;
}
@Override
public void write(ChannelBuffer os) throws IOException {
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
public class UnsubscribeReply extends Reply {
private final byte[][] patterns;
public UnsubscribeReply(byte[][] patterns) {
this.patterns = patterns;
}
@Override
public void write(ChannelBuffer os) throws IOException {
}
public byte[][] getPatterns() {
return patterns;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Encoder and decoder which transform a
* <a href="http://http://redis.io/topics/protocol">Redis protocol commands and replies</a>
* into a {@link org.jboss.netty.buffer.ChannelBuffer}
* and vice versa.
*
*/
package org.jboss.netty.handler.codec.redis;

View File

@ -0,0 +1,54 @@
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.queue.BlockingReadHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RedisClient {
private static final byte[] VALUE = "value".getBytes();
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor));
final BlockingReadHandler<Reply> blockingReadHandler = new BlockingReadHandler<Reply>();
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("redisEncoder", new RedisEncoder());
pipeline.addLast("redisDecoder", new RedisDecoder());
pipeline.addLast("result", blockingReadHandler);
return pipeline;
}
});
ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379));
redis.await().rethrowIfFailed();
Channel channel = redis.getChannel();
channel.write(new Command("set", "1", "value"));
System.out.print(blockingReadHandler.read());
channel.write(new Command("get", "1"));
System.out.print(blockingReadHandler.read());
int CALLS = 1000000;
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS; i++) {
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
blockingReadHandler.read();
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
channel.close();
cb.releaseExternalResources();
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
import org.jboss.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class RedisCodecTest {
private DecoderEmbedder<ChannelBuffer> embedder;
@Before
public void setUp() {
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
}
@Test
public void decodeReplies() throws IOException {
{
Object receive = decode("+OK\r\n".getBytes());
assertTrue(receive instanceof StatusReply);
assertEquals("OK", ((StatusReply) receive).status);
}
{
Object receive = decode("-ERROR\r\n".getBytes());
assertTrue(receive instanceof ErrorReply);
assertEquals("ERROR", ((ErrorReply) receive).error);
}
{
Object receive = decode(":123\r\n".getBytes());
assertTrue(receive instanceof IntegerReply);
assertEquals(123, ((IntegerReply) receive).integer);
}
{
Object receive = decode("$5\r\nnetty\r\n".getBytes());
assertTrue(receive instanceof BulkReply);
assertEquals("netty", new String(((BulkReply) receive).bytes));
}
{
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
private Object decode(byte[] bytes) {
embedder.offer(wrappedBuffer(bytes));
return embedder.poll();
}
@Test
public void encodeCommands() throws IOException {
String setCommand = "*3\r\n" +
"$3\r\n" +
"SET\r\n" +
"$5\r\n" +
"mykey\r\n" +
"$7\r\n" +
"myvalue\r\n";
Command command = new Command("SET", "mykey", "myvalue");
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
command.write(cb);
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
}
@Test
public void testReplayDecoding() {
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
{
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
Object receive = embedder.poll();
assertNull(receive);
embedder.offer(wrappedBuffer("\n".getBytes()));
receive = embedder.poll();
assertTrue(receive instanceof MultiBulkReply);
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
}
}
}