remove pooling

This commit is contained in:
Sam Pullara 2012-03-28 16:54:54 -07:00
parent d0dd93974a
commit b75933b835
2 changed files with 12 additions and 64 deletions

View File

@ -63,20 +63,21 @@ public final class RedisClient {
int CALLS = 1000000;
int PIPELINE = 50;
requestResponse(blockingReadHandler, channel, CALLS);
pipelinedIndividualRequests(blockingReadHandler, channel, CALLS, PIPELINE);
pipelinedListOfRequests(blockingReadHandler, channel, CALLS, PIPELINE);
pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
channel.close();
cb.releaseExternalResources();
}
private static void pipelinedListOfRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException {
private static void pipelinedListOfRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS / PIPELINE; i++) {
List<Command> list = new ArrayList<Command>();
for (int j = 0; j < PIPELINE; j++) {
list.add(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
int base = i * PIPELINE;
list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
}
channel.write(list);
for (int j = 0; j < PIPELINE; j++) {
@ -87,12 +88,13 @@ public final class RedisClient {
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
}
private static void pipelinedIndividualRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, int CALLS, int PIPELINE) throws IOException, InterruptedException {
private static void pipelinedIndividualRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
byte[] SET_BYTES = "SET".getBytes();
for (int i = 0; i < CALLS / PIPELINE; i++) {
int base = i * PIPELINE;
for (int j = 0; j < PIPELINE; j++) {
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
}
for (int j = 0; j < PIPELINE; j++) {
blockingReadHandler.read();

View File

@ -18,60 +18,34 @@ package org.jboss.netty.handler.codec.redis;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
*/
@Sharable
public class RedisEncoder extends SimpleChannelDownstreamHandler {
private final Queue<ChannelBuffer> pool;
/**
* Calls {@link #RedisEncoder(boolean)} with <code>false</code>
*/
public RedisEncoder() {
this(false);
}
/**
* Create a new {@link RedisEncoder} instance
*
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
* can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
*/
public RedisEncoder(boolean poolBuffers) {
if (poolBuffers) {
pool = new ConcurrentLinkedQueue<ChannelBuffer>();
} else {
pool = null;
}
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object o = e.getMessage();
if (o instanceof Command) {
ChannelBuffer cb = getChannelBuffer();
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();
Command command = (Command) o;
command.write(cb);
returnToPool(cb, future);
Channels.write(ctx, future, cb);
} else if (o instanceof Iterable) {
ChannelBuffer cb = getChannelBuffer();
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();
// Useful for transactions and database select
@ -80,41 +54,13 @@ public class RedisEncoder extends SimpleChannelDownstreamHandler {
Command command = (Command) i;
command.write(cb);
} else {
if (pool != null) {
cb.clear();
pool.add(cb);
super.writeRequested(ctx, e);
return;
}
super.writeRequested(ctx, e);
return;
}
}
returnToPool(cb, future);
Channels.write(ctx, future, cb);
} else {
super.writeRequested(ctx, e);
}
}
private void returnToPool(ChannelBuffer cb, ChannelFuture future) {
if (pool != null) {
final ChannelBuffer finalCb = cb;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
finalCb.clear();
pool.add(finalCb);
}
});
}
}
private ChannelBuffer getChannelBuffer() {
ChannelBuffer cb = null;
if (pool != null) {
cb = pool.poll();
}
if (cb == null) {
cb = ChannelBuffers.dynamicBuffer();
}
return cb;
}
}