Make pooling configurable

This commit is contained in:
norman 2012-03-12 07:22:48 +01:00
parent 26513bb4f8
commit 05883f5523

View File

@ -23,32 +23,62 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Sharable
public class RedisEncoder extends SimpleChannelDownstreamHandler {
private Queue<ChannelBuffer> pool = new ConcurrentLinkedQueue<ChannelBuffer>();
private final Queue<ChannelBuffer> pool;
/**
* Calls {@link #RedisEncoder(boolean)} with <code>false</code>
*/
public RedisEncoder() {
this(false);
}
/**
* Create a new {@link RedisEncoder} instance
*
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
* can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
*/
public RedisEncoder(boolean poolBuffers) {
if (poolBuffers) {
pool = new ConcurrentLinkedQueue<ChannelBuffer>();
} else {
pool = null;
}
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object o = e.getMessage();
if (o instanceof Command) {
Command command = (Command) o;
ChannelBuffer cb = pool.poll();
ChannelBuffer cb = null;
if (pool != null) {
cb = pool.poll();
}
if (cb == null) {
cb = ChannelBuffers.dynamicBuffer();
}
command.write(cb);
ChannelFuture future = e.getFuture();
final ChannelBuffer finalCb = cb;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
finalCb.clear();
pool.add(finalCb);
}
});
if (pool != null) {
final ChannelBuffer finalCb = cb;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
finalCb.clear();
pool.add(finalCb);
}
});
}
Channels.write(ctx, future, cb);
} else {
super.writeRequested(ctx, e);