add the ability to send an iterable of commands at a time that will be on the wire together
This commit is contained in:
parent
80490f5170
commit
5daf3b10a6
@ -16,11 +16,7 @@
|
||||
package org.jboss.netty.example.redis;
|
||||
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.*;
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||
import org.jboss.netty.handler.codec.redis.Command;
|
||||
import org.jboss.netty.handler.codec.redis.RedisDecoder;
|
||||
@ -29,6 +25,8 @@ import org.jboss.netty.handler.codec.redis.Reply;
|
||||
import org.jboss.netty.handler.queue.BlockingReadHandler;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@ -58,20 +56,53 @@ public final class RedisClient {
|
||||
System.out.print(blockingReadHandler.read());
|
||||
|
||||
int CALLS = 1000000;
|
||||
long start = System.currentTimeMillis();
|
||||
byte[] SET_BYTES = "SET".getBytes();
|
||||
for (int i = 0; i < CALLS; i++) {
|
||||
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
|
||||
blockingReadHandler.read();
|
||||
int PIPELINE = 50;
|
||||
{
|
||||
long start = System.currentTimeMillis();
|
||||
byte[] SET_BYTES = "SET".getBytes();
|
||||
for (int i = 0; i < CALLS; i++) {
|
||||
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
|
||||
blockingReadHandler.read();
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
||||
}
|
||||
{
|
||||
long start = System.currentTimeMillis();
|
||||
byte[] SET_BYTES = "SET".getBytes();
|
||||
for (int i = 0; i < CALLS / PIPELINE; i++) {
|
||||
for (int j = 0; j < PIPELINE; j++) {
|
||||
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
|
||||
}
|
||||
for (int j = 0; j < PIPELINE; j++) {
|
||||
blockingReadHandler.read();
|
||||
}
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
||||
}
|
||||
{
|
||||
long start = System.currentTimeMillis();
|
||||
byte[] SET_BYTES = "SET".getBytes();
|
||||
for (int i = 0; i < CALLS / PIPELINE; i++) {
|
||||
List<Command> list = new ArrayList<Command>();
|
||||
for (int j = 0; j < PIPELINE; j++) {
|
||||
list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
|
||||
}
|
||||
channel.write(list);
|
||||
for (int j = 0; j < PIPELINE; j++) {
|
||||
blockingReadHandler.read();
|
||||
}
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
||||
|
||||
channel.close();
|
||||
cb.releaseExternalResources();
|
||||
}
|
||||
|
||||
private RedisClient() {
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -17,12 +17,7 @@ package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
|
||||
import org.jboss.netty.channel.*;
|
||||
import org.jboss.netty.channel.ChannelHandler.Sharable;
|
||||
|
||||
import java.util.Queue;
|
||||
@ -30,8 +25,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
|
||||
*
|
||||
*
|
||||
*/
|
||||
@Sharable
|
||||
public class RedisEncoder extends SimpleChannelDownstreamHandler {
|
||||
@ -44,11 +37,11 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler {
|
||||
public RedisEncoder() {
|
||||
this(false);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new {@link RedisEncoder} instance
|
||||
*
|
||||
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
|
||||
* Create a new {@link RedisEncoder} instance
|
||||
*
|
||||
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
|
||||
* can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
|
||||
*/
|
||||
public RedisEncoder(boolean poolBuffers) {
|
||||
@ -58,35 +51,65 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler {
|
||||
pool = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
Object o = e.getMessage();
|
||||
if (o instanceof Command) {
|
||||
Command command = (Command) o;
|
||||
ChannelBuffer cb = null;
|
||||
if (pool != null) {
|
||||
cb = pool.poll();
|
||||
}
|
||||
if (cb == null) {
|
||||
cb = ChannelBuffers.dynamicBuffer();
|
||||
}
|
||||
command.write(cb);
|
||||
ChannelBuffer cb = getChannelBuffer();
|
||||
ChannelFuture future = e.getFuture();
|
||||
|
||||
if (pool != null) {
|
||||
final ChannelBuffer finalCb = cb;
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
finalCb.clear();
|
||||
pool.add(finalCb);
|
||||
Command command = (Command) o;
|
||||
command.write(cb);
|
||||
returnToPool(cb, future);
|
||||
Channels.write(ctx, future, cb);
|
||||
|
||||
} else if (o instanceof Iterable) {
|
||||
ChannelBuffer cb = getChannelBuffer();
|
||||
ChannelFuture future = e.getFuture();
|
||||
|
||||
// Useful for transactions and database select
|
||||
for (Object i : (Iterable) o) {
|
||||
if (i instanceof Command) {
|
||||
Command command = (Command) i;
|
||||
command.write(cb);
|
||||
} else {
|
||||
if (pool != null) {
|
||||
cb.clear();
|
||||
pool.add(cb);
|
||||
super.writeRequested(ctx, e);
|
||||
return;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
returnToPool(cb, future);
|
||||
Channels.write(ctx, future, cb);
|
||||
} else {
|
||||
super.writeRequested(ctx, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void returnToPool(ChannelBuffer cb, ChannelFuture future) {
|
||||
if (pool != null) {
|
||||
final ChannelBuffer finalCb = cb;
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
finalCb.clear();
|
||||
pool.add(finalCb);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private ChannelBuffer getChannelBuffer() {
|
||||
ChannelBuffer cb = null;
|
||||
if (pool != null) {
|
||||
cb = pool.poll();
|
||||
}
|
||||
if (cb == null) {
|
||||
cb = ChannelBuffers.dynamicBuffer();
|
||||
}
|
||||
return cb;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user