From 5daf3b10a6c8190f83031bb612607d538b9d068c Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Wed, 28 Mar 2012 13:03:02 -0700 Subject: [PATCH 1/3] add the ability to send an iterable of commands at a time that will be on the wire together --- .../netty/example/redis/RedisClient.java | 57 ++++++++++--- .../handler/codec/redis/RedisEncoder.java | 83 ++++++++++++------- 2 files changed, 97 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/jboss/netty/example/redis/RedisClient.java b/src/main/java/org/jboss/netty/example/redis/RedisClient.java index 4a098894d6..9518243475 100644 --- a/src/main/java/org/jboss/netty/example/redis/RedisClient.java +++ b/src/main/java/org/jboss/netty/example/redis/RedisClient.java @@ -16,11 +16,7 @@ package org.jboss.netty.example.redis; import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.redis.Command; import org.jboss.netty.handler.codec.redis.RedisDecoder; @@ -29,6 +25,8 @@ import org.jboss.netty.handler.codec.redis.Reply; import org.jboss.netty.handler.queue.BlockingReadHandler; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,20 +56,53 @@ public final class RedisClient { System.out.print(blockingReadHandler.read()); int CALLS = 1000000; - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS; i++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); - blockingReadHandler.read(); + int PIPELINE = 50; + { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS; i++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + blockingReadHandler.read(); + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS / PIPELINE; i++) { + for (int j = 0; j < PIPELINE; j++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + } + for (int j = 0; j < PIPELINE; j++) { + blockingReadHandler.read(); + } + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS / PIPELINE; i++) { + List list = new ArrayList(); + for (int j = 0; j < PIPELINE; j++) { + list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + } + channel.write(list); + for (int j = 0; j < PIPELINE; j++) { + blockingReadHandler.read(); + } + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); channel.close(); cb.releaseExternalResources(); } private RedisClient() { - + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index f401cbe18a..825d4cdbfc 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -17,12 +17,7 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelDownstreamHandler; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.ChannelHandler.Sharable; import java.util.Queue; @@ -30,8 +25,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; /** * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s - * - * */ @Sharable public class RedisEncoder extends SimpleChannelDownstreamHandler { @@ -44,11 +37,11 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { public RedisEncoder() { this(false); } - + /** - * Create a new {@link RedisEncoder} instance - * - * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this + * Create a new {@link RedisEncoder} instance + * + * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small. */ public RedisEncoder(boolean poolBuffers) { @@ -58,35 +51,65 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { pool = null; } } - - + + @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object o = e.getMessage(); if (o instanceof Command) { - Command command = (Command) o; - ChannelBuffer cb = null; - if (pool != null) { - cb = pool.poll(); - } - if (cb == null) { - cb = ChannelBuffers.dynamicBuffer(); - } - command.write(cb); + ChannelBuffer cb = getChannelBuffer(); ChannelFuture future = e.getFuture(); - if (pool != null) { - final ChannelBuffer finalCb = cb; - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - finalCb.clear(); - pool.add(finalCb); + Command command = (Command) o; + command.write(cb); + returnToPool(cb, future); + Channels.write(ctx, future, cb); + + } else if (o instanceof Iterable) { + ChannelBuffer cb = getChannelBuffer(); + ChannelFuture future = e.getFuture(); + + // Useful for transactions and database select + for (Object i : (Iterable) o) { + if (i instanceof Command) { + Command command = (Command) i; + command.write(cb); + } else { + if (pool != null) { + cb.clear(); + pool.add(cb); + super.writeRequested(ctx, e); + return; } - }); + } } + returnToPool(cb, future); Channels.write(ctx, future, cb); } else { super.writeRequested(ctx, e); } } + + private void returnToPool(ChannelBuffer cb, ChannelFuture future) { + if (pool != null) { + final ChannelBuffer finalCb = cb; + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + finalCb.clear(); + pool.add(finalCb); + } + }); + } + } + + private ChannelBuffer getChannelBuffer() { + ChannelBuffer cb = null; + if (pool != null) { + cb = pool.poll(); + } + if (cb == null) { + cb = ChannelBuffers.dynamicBuffer(); + } + return cb; + } } From d0dd93974a7bcc6c906f6f900d3743b77c7d7ad4 Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Wed, 28 Mar 2012 13:24:28 -0700 Subject: [PATCH 2/3] clean up the style --- .../netty/example/redis/RedisClient.java | 93 +++++++++++-------- .../handler/codec/redis/RedisEncoder.java | 7 +- 2 files changed, 58 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/jboss/netty/example/redis/RedisClient.java b/src/main/java/org/jboss/netty/example/redis/RedisClient.java index 9518243475..53c16c147c 100644 --- a/src/main/java/org/jboss/netty/example/redis/RedisClient.java +++ b/src/main/java/org/jboss/netty/example/redis/RedisClient.java @@ -16,7 +16,11 @@ package org.jboss.netty.example.redis; import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.*; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.redis.Command; import org.jboss.netty.handler.codec.redis.RedisDecoder; @@ -24,6 +28,7 @@ import org.jboss.netty.handler.codec.redis.RedisEncoder; import org.jboss.netty.handler.codec.redis.Reply; import org.jboss.netty.handler.queue.BlockingReadHandler; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -57,51 +62,57 @@ public final class RedisClient { int CALLS = 1000000; int PIPELINE = 50; - { - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS; i++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); - blockingReadHandler.read(); - } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - } - { - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS / PIPELINE; i++) { - for (int j = 0; j < PIPELINE; j++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); - } - for (int j = 0; j < PIPELINE; j++) { - blockingReadHandler.read(); - } - } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - } - { - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS / PIPELINE; i++) { - List list = new ArrayList(); - for (int j = 0; j < PIPELINE; j++) { - list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); - } - channel.write(list); - for (int j = 0; j < PIPELINE; j++) { - blockingReadHandler.read(); - } - } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - } + requestResponse(blockingReadHandler, channel, CALLS); + pipelinedIndividualRequests(blockingReadHandler, channel, CALLS, PIPELINE); + pipelinedListOfRequests(blockingReadHandler, channel, CALLS, PIPELINE); channel.close(); cb.releaseExternalResources(); } + private static void pipelinedListOfRequests(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS / PIPELINE; i++) { + List list = new ArrayList(); + for (int j = 0; j < PIPELINE; j++) { + list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + } + channel.write(list); + for (int j = 0; j < PIPELINE; j++) { + blockingReadHandler.read(); + } + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + + private static void pipelinedIndividualRequests(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS / PIPELINE; i++) { + for (int j = 0; j < PIPELINE; j++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + } + for (int j = 0; j < PIPELINE; j++) { + blockingReadHandler.read(); + } + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + + private static void requestResponse(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS) throws IOException, InterruptedException { + long start = System.currentTimeMillis(); + byte[] SET_BYTES = "SET".getBytes(); + for (int i = 0; i < CALLS; i++) { + channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + blockingReadHandler.read(); + } + long end = System.currentTimeMillis(); + System.out.println(CALLS * 1000 / (end - start) + " calls per second"); + } + private RedisClient() { } diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index 825d4cdbfc..20ed530e0b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -17,8 +17,13 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandler.Sharable; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelDownstreamHandler; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; From b75933b835862af399bbfca9c883a4b478ce9502 Mon Sep 17 00:00:00 2001 From: Sam Pullara Date: Wed, 28 Mar 2012 16:54:54 -0700 Subject: [PATCH 3/3] remove pooling --- .../netty/example/redis/RedisClient.java | 14 +++-- .../handler/codec/redis/RedisEncoder.java | 62 ++----------------- 2 files changed, 12 insertions(+), 64 deletions(-) diff --git a/src/main/java/org/jboss/netty/example/redis/RedisClient.java b/src/main/java/org/jboss/netty/example/redis/RedisClient.java index 53c16c147c..fed888cf95 100644 --- a/src/main/java/org/jboss/netty/example/redis/RedisClient.java +++ b/src/main/java/org/jboss/netty/example/redis/RedisClient.java @@ -63,20 +63,21 @@ public final class RedisClient { int CALLS = 1000000; int PIPELINE = 50; requestResponse(blockingReadHandler, channel, CALLS); - pipelinedIndividualRequests(blockingReadHandler, channel, CALLS, PIPELINE); - pipelinedListOfRequests(blockingReadHandler, channel, CALLS, PIPELINE); + pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE); + pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE); channel.close(); cb.releaseExternalResources(); } - private static void pipelinedListOfRequests(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException { + private static void pipelinedListOfRequests(BlockingReadHandler blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException { long start = System.currentTimeMillis(); byte[] SET_BYTES = "SET".getBytes(); for (int i = 0; i < CALLS / PIPELINE; i++) { List list = new ArrayList(); for (int j = 0; j < PIPELINE; j++) { - list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + int base = i * PIPELINE; + list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE)); } channel.write(list); for (int j = 0; j < PIPELINE; j++) { @@ -87,12 +88,13 @@ public final class RedisClient { System.out.println(CALLS * 1000 / (end - start) + " calls per second"); } - private static void pipelinedIndividualRequests(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException { + private static void pipelinedIndividualRequests(BlockingReadHandler blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException { long start = System.currentTimeMillis(); byte[] SET_BYTES = "SET".getBytes(); for (int i = 0; i < CALLS / PIPELINE; i++) { + int base = i * PIPELINE; for (int j = 0; j < PIPELINE; j++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); + channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE)); } for (int j = 0; j < PIPELINE; j++) { blockingReadHandler.read(); diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index 20ed530e0b..acc38dcad7 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -18,60 +18,34 @@ package org.jboss.netty.handler.codec.redis; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelDownstreamHandler; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - /** * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s */ @Sharable public class RedisEncoder extends SimpleChannelDownstreamHandler { - private final Queue pool; - - /** - * Calls {@link #RedisEncoder(boolean)} with false - */ public RedisEncoder() { - this(false); } - /** - * Create a new {@link RedisEncoder} instance - * - * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this - * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small. - */ - public RedisEncoder(boolean poolBuffers) { - if (poolBuffers) { - pool = new ConcurrentLinkedQueue(); - } else { - pool = null; - } - } - - @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object o = e.getMessage(); if (o instanceof Command) { - ChannelBuffer cb = getChannelBuffer(); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); ChannelFuture future = e.getFuture(); Command command = (Command) o; command.write(cb); - returnToPool(cb, future); Channels.write(ctx, future, cb); } else if (o instanceof Iterable) { - ChannelBuffer cb = getChannelBuffer(); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); ChannelFuture future = e.getFuture(); // Useful for transactions and database select @@ -80,41 +54,13 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler { Command command = (Command) i; command.write(cb); } else { - if (pool != null) { - cb.clear(); - pool.add(cb); - super.writeRequested(ctx, e); - return; - } + super.writeRequested(ctx, e); + return; } } - returnToPool(cb, future); Channels.write(ctx, future, cb); } else { super.writeRequested(ctx, e); } } - - private void returnToPool(ChannelBuffer cb, ChannelFuture future) { - if (pool != null) { - final ChannelBuffer finalCb = cb; - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - finalCb.clear(); - pool.add(finalCb); - } - }); - } - } - - private ChannelBuffer getChannelBuffer() { - ChannelBuffer cb = null; - if (pool != null) { - cb = pool.poll(); - } - if (cb == null) { - cb = ChannelBuffers.dynamicBuffer(); - } - return cb; - } }