diff --git a/src/main/java/org/jboss/netty/example/redis/RedisClient.java b/src/main/java/org/jboss/netty/example/redis/RedisClient.java index 53c16c147c..fed888cf95 100644 --- a/src/main/java/org/jboss/netty/example/redis/RedisClient.java +++ b/src/main/java/org/jboss/netty/example/redis/RedisClient.java @@ -63,20 +63,21 @@ public final class RedisClient { int CALLS = 1000000; int PIPELINE = 50; requestResponse(blockingReadHandler, channel, CALLS); - pipelinedIndividualRequests(blockingReadHandler, channel, CALLS, PIPELINE); - pipelinedListOfRequests(blockingReadHandler, channel, CALLS, PIPELINE); + pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE); + pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE); channel.close(); cb.releaseExternalResources(); } - private static void pipelinedListOfRequests(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException { + private static void pipelinedListOfRequests(BlockingReadHandler blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException { long start = System.currentTimeMillis(); byte[] SET_BYTES = "SET".getBytes(); for (int i = 0; i < CALLS / PIPELINE; i++) { List list = new ArrayList(); for (int j = 0; j < PIPELINE; j++) { - list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + int base = i * PIPELINE; + list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE)); } channel.write(list); for (int j = 0; j < PIPELINE; j++) { @@ -87,12 +88,13 @@ public final class RedisClient { System.out.println(CALLS * 1000 / (end - start) + " calls per second"); } - private static void pipelinedIndividualRequests(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException { + private static void pipelinedIndividualRequests(BlockingReadHandler blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException { long start = System.currentTimeMillis(); byte[] SET_BYTES = "SET".getBytes(); for (int i = 0; i < CALLS / PIPELINE; i++) { + int base = i * PIPELINE; for (int j = 0; j < PIPELINE; j++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE)); } for (int j = 0; j < PIPELINE; j++) { blockingReadHandler.read(); diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index 20ed530e0b..acc38dcad7 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -18,60 +18,34 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelDownstreamHandler; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - /** * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s */ @Sharable public class RedisEncoder extends SimpleChannelDownstreamHandler { - private final Queue pool; - - /** - * Calls {@link #RedisEncoder(boolean)} with false - */ public RedisEncoder() { - this(false); } - /** - * Create a new {@link RedisEncoder} instance - * - * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this - * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small. - */ - public RedisEncoder(boolean poolBuffers) { - if (poolBuffers) { - pool = new ConcurrentLinkedQueue(); - } else { - pool = null; - } - } - - @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object o = e.getMessage(); if (o instanceof Command) { - ChannelBuffer cb = getChannelBuffer(); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); ChannelFuture future = e.getFuture(); Command command = (Command) o; command.write(cb); - returnToPool(cb, future); Channels.write(ctx, future, cb); } else if (o instanceof Iterable) { - ChannelBuffer cb = getChannelBuffer(); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); ChannelFuture future = e.getFuture(); // Useful for transactions and database select @@ -80,41 +54,13 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { Command command = (Command) i; command.write(cb); } else { - if (pool != null) { - cb.clear(); - pool.add(cb); - super.writeRequested(ctx, e); - return; - } + super.writeRequested(ctx, e); + return; } } - returnToPool(cb, future); Channels.write(ctx, future, cb); } else { super.writeRequested(ctx, e); } } - - private void returnToPool(ChannelBuffer cb, ChannelFuture future) { - if (pool != null) { - final ChannelBuffer finalCb = cb; - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - finalCb.clear(); - pool.add(finalCb); - } - }); - } - } - - private ChannelBuffer getChannelBuffer() { - ChannelBuffer cb = null; - if (pool != null) { - cb = pool.poll(); - } - if (cb == null) { - cb = ChannelBuffers.dynamicBuffer(); - } - return cb; - } }