suggestions from both repositories applied
This commit is contained in:
parent
0776911ed1
commit
b3cdeff919
@ -15,12 +15,11 @@ public class BulkReply extends Reply {
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
if (bytes == null) {
|
||||
os.writeBytes(Command.NEG_ONE);
|
||||
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else {
|
||||
os.writeBytes(Command.numToBytes(bytes.length));
|
||||
os.writeBytes(Command.CRLF);
|
||||
os.writeBytes(Command.numAndCRLF(bytes.length));
|
||||
os.writeBytes(bytes);
|
||||
os.writeBytes(Command.CRLF);
|
||||
}
|
||||
os.writeBytes(Command.CRLF);
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ public class Command {
|
||||
public static final byte[] CRLF = "\r\n".getBytes();
|
||||
public static final byte[] BYTES_PREFIX = "$".getBytes();
|
||||
public static final byte[] EMPTY_BYTES = new byte[0];
|
||||
public static final byte[] NEG_ONE = Command.numToBytes(-1);
|
||||
public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
|
||||
|
||||
private byte[][] arguments;
|
||||
private Object[] objects;
|
||||
@ -36,6 +36,7 @@ public class Command {
|
||||
|
||||
public Command(byte[]... arguments) {
|
||||
this.arguments = arguments;
|
||||
objects = arguments;
|
||||
}
|
||||
|
||||
public Command(Object... objects) {
|
||||
@ -64,57 +65,57 @@ public class Command {
|
||||
|
||||
private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException {
|
||||
os.writeBytes(ARGS_PREFIX);
|
||||
os.writeBytes(Command.numToBytes(arguments.length));
|
||||
os.writeBytes(CRLF);
|
||||
os.writeBytes(numAndCRLF(arguments.length));
|
||||
for (byte[] argument : arguments) {
|
||||
os.writeBytes(BYTES_PREFIX);
|
||||
os.writeBytes(Command.numToBytes(argument.length));
|
||||
os.writeBytes(CRLF);
|
||||
os.writeBytes(numAndCRLF(argument.length));
|
||||
os.writeBytes(argument);
|
||||
os.writeBytes(CRLF);
|
||||
}
|
||||
}
|
||||
|
||||
private static final int NUM_MAP_LENGTH = 256;
|
||||
private static byte[][] numMap = new byte[NUM_MAP_LENGTH][];
|
||||
|
||||
private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
|
||||
static {
|
||||
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
|
||||
numMap[i] = convert(i);
|
||||
}
|
||||
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
|
||||
numAndCRLFMap[i] = convertWithCRLF(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Optimized for the direct to ASCII bytes case
|
||||
// Could be even more optimized but it is already
|
||||
// about twice as fast as using Long.toString().getBytes()
|
||||
public static byte[] numToBytes(long value) {
|
||||
if (value >= 0 && value < NUM_MAP_LENGTH) {
|
||||
return numMap[((int) value)];
|
||||
} else if (value == -1) {
|
||||
return NEG_ONE;
|
||||
}
|
||||
return convert(value);
|
||||
public static byte[] numAndCRLF(long value) {
|
||||
if (value >= 0 && value < NUM_MAP_LENGTH) {
|
||||
return numAndCRLFMap[(int) value];
|
||||
} else if (value == -1) {
|
||||
return NEG_ONE_AND_CRLF;
|
||||
}
|
||||
return convertWithCRLF(value);
|
||||
}
|
||||
|
||||
private static byte[] convert(long value) {
|
||||
boolean negative = value < 0;
|
||||
int index = negative ? 2 : 1;
|
||||
long current = negative ? -value : value;
|
||||
while ((current /= 10) > 0) {
|
||||
index++;
|
||||
}
|
||||
byte[] bytes = new byte[index];
|
||||
if (negative) {
|
||||
bytes[0] = '-';
|
||||
}
|
||||
current = negative ? -value : value;
|
||||
long tmp = current;
|
||||
while ((tmp /= 10) > 0) {
|
||||
bytes[--index] = (byte) ('0' + (current % 10));
|
||||
current = tmp;
|
||||
}
|
||||
bytes[--index] = (byte) ('0' + current);
|
||||
return bytes;
|
||||
private static byte[] convertWithCRLF(long value) {
|
||||
boolean negative = value < 0;
|
||||
int index = negative ? 2 : 1;
|
||||
long current = negative ? -value : value;
|
||||
while ((current /= 10) > 0) {
|
||||
index++;
|
||||
}
|
||||
byte[] bytes = new byte[index + 2];
|
||||
if (negative) {
|
||||
bytes[0] = '-';
|
||||
}
|
||||
current = negative ? -value : value;
|
||||
long tmp = current;
|
||||
while ((tmp /= 10) > 0) {
|
||||
bytes[--index] = (byte) ('0' + (current % 10));
|
||||
current = tmp;
|
||||
}
|
||||
bytes[--index] = (byte) ('0' + current);
|
||||
// add CRLF
|
||||
bytes[bytes.length - 2] = '\r';
|
||||
bytes[bytes.length - 1] = '\n';
|
||||
return bytes;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,13 +4,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Created by IntelliJ IDEA.
|
||||
* User: sam
|
||||
* Date: 7/29/11
|
||||
* Time: 10:23 AM
|
||||
* To change this template use File | Settings | File Templates.
|
||||
*/
|
||||
public class ErrorReply extends Reply {
|
||||
public static final char MARKER = '-';
|
||||
private static final byte[] ERR = "ERR ".getBytes(UTF_8);
|
||||
|
@ -14,7 +14,6 @@ public class IntegerReply extends Reply {
|
||||
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
os.writeBytes(Command.numToBytes(integer));
|
||||
os.writeBytes(Command.CRLF);
|
||||
os.writeBytes(Command.numAndCRLF(integer));
|
||||
}
|
||||
}
|
||||
|
@ -42,27 +42,24 @@ public class MultiBulkReply extends Reply {
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
if (byteArrays == null) {
|
||||
os.writeBytes(Command.NEG_ONE);
|
||||
os.writeBytes(Command.CRLF);
|
||||
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else {
|
||||
os.writeBytes(Command.numToBytes(byteArrays.length));
|
||||
os.writeBytes(Command.CRLF);
|
||||
os.writeBytes(Command.numAndCRLF(byteArrays.length));
|
||||
for (Object value : byteArrays) {
|
||||
if (value == null) {
|
||||
os.writeByte(BulkReply.MARKER);
|
||||
os.writeBytes(Command.NEG_ONE);
|
||||
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else if (value instanceof byte[]) {
|
||||
byte[] bytes = (byte[]) value;
|
||||
os.writeByte(BulkReply.MARKER);
|
||||
int length = bytes.length;
|
||||
os.writeBytes(Command.numToBytes(length));
|
||||
os.writeBytes(Command.CRLF);
|
||||
os.writeBytes(Command.numAndCRLF(length));
|
||||
os.writeBytes(bytes);
|
||||
os.writeBytes(Command.CRLF);
|
||||
} else if (value instanceof Number) {
|
||||
os.writeByte(IntegerReply.MARKER);
|
||||
os.writeBytes(Command.numToBytes(((Number) value).longValue()));
|
||||
os.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
|
||||
}
|
||||
os.writeBytes(Command.CRLF);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
enum State {
|
||||
|
||||
@ -18,6 +18,7 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
||||
private static final char CR = '\r';
|
||||
private static final char LF = '\n';
|
||||
private static final char ZERO = '0';
|
||||
public static final Charset UTF_8 = Charset.forName("UTF-8");
|
||||
|
||||
// We track the current multibulk reply in the case
|
||||
// where we do not get a complete reply in a single
|
||||
@ -74,10 +75,14 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
||||
int code = is.readByte();
|
||||
switch (code) {
|
||||
case StatusReply.MARKER: {
|
||||
return new StatusReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine());
|
||||
String status = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8);
|
||||
is.skipBytes(2);
|
||||
return new StatusReply(status);
|
||||
}
|
||||
case ErrorReply.MARKER: {
|
||||
return new ErrorReply(new DataInputStream(new ChannelBufferInputStream(is)).readLine());
|
||||
String error = is.readBytes(is.bytesBefore(ChannelBufferIndexFinder.CRLF)).toString(UTF_8);
|
||||
is.skipBytes(2);
|
||||
return new ErrorReply(error);
|
||||
}
|
||||
case IntegerReply.MARKER: {
|
||||
return new IntegerReply(readInteger(is));
|
||||
@ -111,17 +116,4 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
||||
reply.read(this, is);
|
||||
return reply;
|
||||
}
|
||||
|
||||
private static class ChannelBufferInputStream extends InputStream {
|
||||
private final ChannelBuffer is;
|
||||
|
||||
public ChannelBufferInputStream(ChannelBuffer is) {
|
||||
this.is = is;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
return is.readByte();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,8 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler {
|
||||
}
|
||||
});
|
||||
Channels.write(ctx, future, cb);
|
||||
} else {
|
||||
super.writeRequested(ctx, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,40 +14,41 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class RedisClient {
|
||||
private static final byte[] VALUE = "value".getBytes(Reply.UTF_8);
|
||||
private static final byte[] VALUE = "value".getBytes(Reply.UTF_8);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor));
|
||||
final BlockingReadHandler<Reply> blockingReadHandler = new BlockingReadHandler<Reply>();
|
||||
cb.setPipelineFactory(new ChannelPipelineFactory() {
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = Channels.pipeline();
|
||||
pipeline.addLast("redisEncoder", new RedisEncoder());
|
||||
pipeline.addLast("redisDecoder", new RedisDecoder());
|
||||
pipeline.addLast("result", blockingReadHandler);
|
||||
return pipeline;
|
||||
}
|
||||
});
|
||||
ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379));
|
||||
redis.await().rethrowIfFailed();
|
||||
Channel channel = redis.getChannel();
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor));
|
||||
final BlockingReadHandler<Reply> blockingReadHandler = new BlockingReadHandler<Reply>();
|
||||
cb.setPipelineFactory(new ChannelPipelineFactory() {
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = Channels.pipeline();
|
||||
pipeline.addLast("redisEncoder", new RedisEncoder());
|
||||
pipeline.addLast("redisDecoder", new RedisDecoder());
|
||||
pipeline.addLast("result", blockingReadHandler);
|
||||
return pipeline;
|
||||
}
|
||||
});
|
||||
ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379));
|
||||
redis.await().rethrowIfFailed();
|
||||
Channel channel = redis.getChannel();
|
||||
|
||||
channel.write(new Command("set", "1", "value"));
|
||||
System.out.print(blockingReadHandler.read());
|
||||
channel.write(new Command("get", "1"));
|
||||
System.out.print(blockingReadHandler.read());
|
||||
channel.write(new Command("set", "1", "value"));
|
||||
System.out.print(blockingReadHandler.read());
|
||||
channel.write(new Command("get", "1"));
|
||||
System.out.print(blockingReadHandler.read());
|
||||
|
||||
int CALLS = 1000000;
|
||||
long start = System.currentTimeMillis();
|
||||
for (int i = 0; i < CALLS; i++) {
|
||||
channel.write(new Command("SET".getBytes(), Command.numToBytes(i), VALUE));
|
||||
blockingReadHandler.read();
|
||||
int CALLS = 1000000;
|
||||
long start = System.currentTimeMillis();
|
||||
byte[] SET_BYTES = "SET".getBytes();
|
||||
for (int i = 0; i < CALLS; i++) {
|
||||
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
|
||||
blockingReadHandler.read();
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
||||
|
||||
channel.close();
|
||||
cb.releaseExternalResources();
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
||||
|
||||
channel.close();
|
||||
cb.releaseExternalResources();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user