diff --git a/src/main/java/org/jboss/netty/example/redis/RedisClient.java b/src/main/java/org/jboss/netty/example/redis/RedisClient.java index 4a098894d6..9518243475 100644 --- a/src/main/java/org/jboss/netty/example/redis/RedisClient.java +++ b/src/main/java/org/jboss/netty/example/redis/RedisClient.java @@ -16,11 +16,7 @@ package org.jboss.netty.example.redis; import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.redis.Command; import org.jboss.netty.handler.codec.redis.RedisDecoder; @@ -29,6 +25,8 @@ import org.jboss.netty.handler.codec.redis.Reply; import org.jboss.netty.handler.queue.BlockingReadHandler; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,20 +56,53 @@ public final class RedisClient { System.out.print(blockingReadHandler.read()); int CALLS = 1000000; - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS; i++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); - blockingReadHandler.read(); + int PIPELINE = 50; + { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS; i++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + blockingReadHandler.read(); + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS / PIPELINE; i++) { + for (int j = 0; j < PIPELINE; j++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + } + for (int j = 0; j < PIPELINE; j++) { + blockingReadHandler.read(); + } + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS / PIPELINE; i++) { + List list = new ArrayList(); + for (int j = 0; j < PIPELINE; j++) { + list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + } + channel.write(list); + for (int j = 0; j < PIPELINE; j++) { + blockingReadHandler.read(); + } + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); channel.close(); cb.releaseExternalResources(); } private RedisClient() { - + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index f401cbe18a..825d4cdbfc 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -17,12 +17,7 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelDownstreamHandler; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.ChannelHandler.Sharable; import java.util.Queue; @@ -30,8 +25,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; /** * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s - * - * */ @Sharable public class RedisEncoder extends SimpleChannelDownstreamHandler { @@ -44,11 +37,11 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { public RedisEncoder() { this(false); } - + /** - * Create a new {@link RedisEncoder} instance - * - * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this + * Create a new {@link RedisEncoder} instance + * + * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small. */ public RedisEncoder(boolean poolBuffers) { @@ -58,35 +51,65 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { pool = null; } } - - + + @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object o = e.getMessage(); if (o instanceof Command) { - Command command = (Command) o; - ChannelBuffer cb = null; - if (pool != null) { - cb = pool.poll(); - } - if (cb == null) { - cb = ChannelBuffers.dynamicBuffer(); - } - command.write(cb); + ChannelBuffer cb = getChannelBuffer(); ChannelFuture future = e.getFuture(); - if (pool != null) { - final ChannelBuffer finalCb = cb; - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - finalCb.clear(); - pool.add(finalCb); + Command command = (Command) o; + command.write(cb); + returnToPool(cb, future); + Channels.write(ctx, future, cb); + + } else if (o instanceof Iterable) { + ChannelBuffer cb = getChannelBuffer(); + ChannelFuture future = e.getFuture(); + + // Useful for transactions and database select + for (Object i : (Iterable) o) { + if (i instanceof Command) { + Command command = (Command) i; + command.write(cb); + } else { + if (pool != null) { + cb.clear(); + pool.add(cb); + super.writeRequested(ctx, e); + return; } - }); + } } + returnToPool(cb, future); Channels.write(ctx, future, cb); } else { super.writeRequested(ctx, e); } } + + private void returnToPool(ChannelBuffer cb, ChannelFuture future) { + if (pool != null) { + final ChannelBuffer finalCb = cb; + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + finalCb.clear(); + pool.add(finalCb); + } + }); + } + } + + private ChannelBuffer getChannelBuffer() { + ChannelBuffer cb = null; + if (pool != null) { + cb = pool.poll(); + } + if (cb == null) { + cb = ChannelBuffers.dynamicBuffer(); + } + return cb; + } }