Merge pull request #242 from spullara/3

Adds the ability to send many commands in a single netty call
This commit is contained in:
Trustin Lee 2012-03-29 05:04:44 -07:00
commit 3db4053f77
2 changed files with 68 additions and 50 deletions

View File

@ -28,7 +28,10 @@ import org.jboss.netty.handler.codec.redis.RedisEncoder;
import org.jboss.netty.handler.codec.redis.Reply;
import org.jboss.netty.handler.queue.BlockingReadHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -58,6 +61,50 @@ public final class RedisClient {
System.out.print(blockingReadHandler.read());
int CALLS = 1000000;
int PIPELINE = 50;
requestResponse(blockingReadHandler, channel, CALLS);
pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
channel.close();
cb.releaseExternalResources();
}
private static void pipelinedListOfRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS / PIPELINE; i++) {
List<Command> list = new ArrayList<Command>();
for (int j = 0; j < PIPELINE; j++) {
int base = i * PIPELINE;
list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
}
channel.write(list);
for (int j = 0; j < PIPELINE; j++) {
blockingReadHandler.read();
}
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
}
private static void pipelinedIndividualRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS / PIPELINE; i++) {
int base = i * PIPELINE;
for (int j = 0; j < PIPELINE; j++) {
channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
}
for (int j = 0; j < PIPELINE; j++) {
blockingReadHandler.read();
}
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
}
private static void requestResponse(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, int CALLS) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS; i++) {
@ -66,12 +113,9 @@ public final class RedisClient {
}
long end = System.currentTimeMillis();
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
channel.close();
cb.releaseExternalResources();
}
private RedisClient() {
}
}

View File

@ -18,71 +18,45 @@ package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
*
*
*/
@Sharable
public class RedisEncoder extends SimpleChannelDownstreamHandler {
private final Queue<ChannelBuffer> pool;
/**
* Calls {@link #RedisEncoder(boolean)} with <code>false</code>
*/
public RedisEncoder() {
this(false);
}
/**
* Create a new {@link RedisEncoder} instance
*
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
* can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
*/
public RedisEncoder(boolean poolBuffers) {
if (poolBuffers) {
pool = new ConcurrentLinkedQueue<ChannelBuffer>();
} else {
pool = null;
}
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object o = e.getMessage();
if (o instanceof Command) {
Command command = (Command) o;
ChannelBuffer cb = null;
if (pool != null) {
cb = pool.poll();
}
if (cb == null) {
cb = ChannelBuffers.dynamicBuffer();
}
command.write(cb);
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();
if (pool != null) {
final ChannelBuffer finalCb = cb;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
finalCb.clear();
pool.add(finalCb);
}
});
Command command = (Command) o;
command.write(cb);
Channels.write(ctx, future, cb);
} else if (o instanceof Iterable) {
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();
// Useful for transactions and database select
for (Object i : (Iterable) o) {
if (i instanceof Command) {
Command command = (Command) i;
command.write(cb);
} else {
super.writeRequested(ctx, e);
return;
}
}
Channels.write(ctx, future, cb);
} else {