From b3cdeff919879d60d49ef5eb8c6c4a6767fc199b Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Sun, 26 Feb 2012 10:23:24 -0800 Subject: [PATCH] suggestions from both repositories applied --- .../netty/handler/codec/redis/BulkReply.java | 7 +- .../netty/handler/codec/redis/Command.java | 73 ++++++++++--------- .../netty/handler/codec/redis/ErrorReply.java | 7 -- .../handler/codec/redis/IntegerReply.java | 3 +- .../handler/codec/redis/MultiBulkReply.java | 15 ++-- .../handler/codec/redis/RedisDecoder.java | 26 +++---- .../handler/codec/redis/RedisEncoder.java | 2 + .../handler/codec/redis/RedisClient.java | 63 ++++++++-------- 8 files changed, 90 insertions(+), 106 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java index 356bec6abe..d9907cbcd0 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java @@ -15,12 +15,11 @@ public class BulkReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); if (bytes == null) { - os.writeBytes(Command.NEG_ONE); + os.writeBytes(Command.NEG_ONE_AND_CRLF); } else { - os.writeBytes(Command.numToBytes(bytes.length)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(bytes.length)); os.writeBytes(bytes); + os.writeBytes(Command.CRLF); } - os.writeBytes(Command.CRLF); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java index 61dff5c883..6d85cad59b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java @@ -16,7 +16,7 @@ public class Command { public static final byte[] CRLF = "\r\n".getBytes(); public static final byte[] BYTES_PREFIX = "$".getBytes(); public static final byte[] EMPTY_BYTES = new byte[0]; - public static final byte[] NEG_ONE = Command.numToBytes(-1); + public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1); private byte[][] arguments; private Object[] objects; @@ -36,6 +36,7 @@ public class Command { public Command(byte[]... arguments) { this.arguments = arguments; + objects = arguments; } public Command(Object... objects) { @@ -64,57 +65,57 @@ public class Command { private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException { os.writeBytes(ARGS_PREFIX); - os.writeBytes(Command.numToBytes(arguments.length)); - os.writeBytes(CRLF); + os.writeBytes(numAndCRLF(arguments.length)); for (byte[] argument : arguments) { os.writeBytes(BYTES_PREFIX); - os.writeBytes(Command.numToBytes(argument.length)); - os.writeBytes(CRLF); + os.writeBytes(numAndCRLF(argument.length)); os.writeBytes(argument); os.writeBytes(CRLF); } } private static final int NUM_MAP_LENGTH = 256; - private static byte[][] numMap = new byte[NUM_MAP_LENGTH][]; - + private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][]; static { - for (int i = 0; i < NUM_MAP_LENGTH; i++) { - numMap[i] = convert(i); - } + for (int i = 0; i < NUM_MAP_LENGTH; i++) { + numAndCRLFMap[i] = convertWithCRLF(i); + } } // Optimized for the direct to ASCII bytes case // Could be even more optimized but it is already // about twice as fast as using Long.toString().getBytes() - public static byte[] numToBytes(long value) { - if (value >= 0 && value < NUM_MAP_LENGTH) { - return numMap[((int) value)]; - } else if (value == -1) { - return NEG_ONE; - } - return convert(value); + public static byte[] numAndCRLF(long value) { + if (value >= 0 && value < NUM_MAP_LENGTH) { + return numAndCRLFMap[(int) value]; + } else if (value == -1) { + return NEG_ONE_AND_CRLF; + } + return convertWithCRLF(value); } - private static byte[] convert(long value) { - boolean negative = value < 0; - int index = negative ? 2 : 1; - long current = negative ? -value : value; - while ((current /= 10) > 0) { - index++; - } - byte[] bytes = new byte[index]; - if (negative) { - bytes[0] = '-'; - } - current = negative ? -value : value; - long tmp = current; - while ((tmp /= 10) > 0) { - bytes[--index] = (byte) ('0' + (current % 10)); - current = tmp; - } - bytes[--index] = (byte) ('0' + current); - return bytes; + private static byte[] convertWithCRLF(long value) { + boolean negative = value < 0; + int index = negative ? 2 : 1; + long current = negative ? -value : value; + while ((current /= 10) > 0) { + index++; + } + byte[] bytes = new byte[index + 2]; + if (negative) { + bytes[0] = '-'; + } + current = negative ? -value : value; + long tmp = current; + while ((tmp /= 10) > 0) { + bytes[--index] = (byte) ('0' + (current % 10)); + current = tmp; + } + bytes[--index] = (byte) ('0' + current); + // add CRLF + bytes[bytes.length - 2] = '\r'; + bytes[bytes.length - 1] = '\n'; + return bytes; } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java index 3e168183cb..a394339da1 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java @@ -4,13 +4,6 @@ import org.jboss.netty.buffer.ChannelBuffer; import java.io.IOException; -/** - * Created by IntelliJ IDEA. - * User: sam - * Date: 7/29/11 - * Time: 10:23 AM - * To change this template use File | Settings | File Templates. - */ public class ErrorReply extends Reply { public static final char MARKER = '-'; private static final byte[] ERR = "ERR ".getBytes(UTF_8); diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java index 615e774d2f..3e790a4d67 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java @@ -14,7 +14,6 @@ public class IntegerReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); - os.writeBytes(Command.numToBytes(integer)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(integer)); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java index a390be0630..3a06bf584e 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java @@ -42,27 +42,24 @@ public class MultiBulkReply extends Reply { public void write(ChannelBuffer os) throws IOException { os.writeByte(MARKER); if (byteArrays == null) { - os.writeBytes(Command.NEG_ONE); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.NEG_ONE_AND_CRLF); } else { - os.writeBytes(Command.numToBytes(byteArrays.length)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(byteArrays.length)); for (Object value : byteArrays) { if (value == null) { os.writeByte(BulkReply.MARKER); - os.writeBytes(Command.NEG_ONE); + os.writeBytes(Command.NEG_ONE_AND_CRLF); } else if (value instanceof byte[]) { byte[] bytes = (byte[]) value; os.writeByte(BulkReply.MARKER); int length = bytes.length; - os.writeBytes(Command.numToBytes(length)); - os.writeBytes(Command.CRLF); + os.writeBytes(Command.numAndCRLF(length)); os.writeBytes(bytes); + os.writeBytes(Command.CRLF); } else if (value instanceof Number) { os.writeByte(IntegerReply.MARKER); - os.writeBytes(Command.numToBytes(((Number) value).longValue())); + os.writeBytes(Command.numAndCRLF(((Number) value).longValue())); } - os.writeBytes(Command.CRLF); } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java index 371f9aeaa0..13e7d37d77 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisDecoder.java @@ -1,13 +1,13 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferIndexFinder; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.replay.ReplayingDecoder; -import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; +import java.nio.charset.Charset; enum State { @@ -18,6 +18,7 @@ public class RedisDecoder extends ReplayingDecoder { private static final char CR = '\r'; private static final char LF = '\n'; private static final char ZERO = '0'; + public static final Charset UTF_8 = Charset.forName("UTF-8"); // We track the current multibulk reply in the case // where we do not get a complete reply in a single @@ -74,10 +75,14 @@ public class RedisDecoder extends ReplayingDecoder { int code = is.readByte(); switch (code) { case StatusReply.MARKER: { - return new StatusReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine()); + String status = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8); + is.skipBytes(2); + return new StatusReply(status); } case ErrorReply.MARKER: { - return new ErrorReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine()); + String error = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8); + is.skipBytes(2); + return new ErrorReply(error); } case IntegerReply.MARKER: { return new IntegerReply(readInteger(is)); @@ -111,17 +116,4 @@ public class RedisDecoder extends ReplayingDecoder { reply.read(this, is); return reply; } - - private static class ChannelBufferInputStream extends InputStream { - private final ChannelBuffer is; - - public ChannelBufferInputStream(ChannelBuffer is) { - this.is = is; - } - - @Override - public int read() throws IOException { - return is.readByte(); - } - } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index db63ebd974..7a84f2d865 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -35,6 +35,8 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { } }); Channels.write(ctx, future, cb); + } else { + super.writeRequested(ctx, e); } } } diff --git a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java index 1e7b21f9f5..e42e60f896 100644 --- a/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java +++ b/src/test/java/org/jboss/netty/handler/codec/redis/RedisClient.java @@ -14,40 +14,41 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class RedisClient { - private static final byte[] VALUE = "value".getBytes(Reply.UTF_8); + private static final byte[] VALUE = "value".getBytes(Reply.UTF_8); public static void main(String[] args) throws Exception { - ExecutorService executor = Executors.newCachedThreadPool(); - final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor)); - final BlockingReadHandler blockingReadHandler = new BlockingReadHandler(); - cb.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("redisEncoder", new RedisEncoder()); - pipeline.addLast("redisDecoder", new RedisDecoder()); - pipeline.addLast("result", blockingReadHandler); - return pipeline; - } - }); - ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379)); - redis.await().rethrowIfFailed(); - Channel channel = redis.getChannel(); + ExecutorService executor = Executors.newCachedThreadPool(); + final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor)); + final BlockingReadHandler blockingReadHandler = new BlockingReadHandler(); + cb.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("redisEncoder", new RedisEncoder()); + pipeline.addLast("redisDecoder", new RedisDecoder()); + pipeline.addLast("result", blockingReadHandler); + return pipeline; + } + }); + ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379)); + redis.await().rethrowIfFailed(); + Channel channel = redis.getChannel(); - channel.write(new Command("set", "1", "value")); - System.out.print(blockingReadHandler.read()); - channel.write(new Command("get", "1")); - System.out.print(blockingReadHandler.read()); + channel.write(new Command("set", "1", "value")); + System.out.print(blockingReadHandler.read()); + channel.write(new Command("get", "1")); + System.out.print(blockingReadHandler.read()); - int CALLS = 1000000; - long start = System.currentTimeMillis(); - for (int i = 0; i < CALLS; i++) { - channel.write(new Command("SET".getBytes(), Command.numToBytes(i), VALUE)); - blockingReadHandler.read(); + int CALLS = 1000000; + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS; i++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + blockingReadHandler.read(); + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + + channel.close(); + cb.releaseExternalResources(); } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - - channel.close(); - cb.releaseExternalResources(); - } }