diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java index ddb8153ef6..9a572b5134 100644 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/redis/RedisEncoder.java @@ -23,32 +23,62 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelHandler.Sharable; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +@Sharable public class RedisEncoder extends SimpleChannelDownstreamHandler { - private Queue pool = new ConcurrentLinkedQueue(); + private final Queue pool; + /** + * Calls {@link #RedisEncoder(boolean)} with false + */ + public RedisEncoder() { + this(false); + } + + /** + * Create a new {@link RedisEncoder} instance + * + * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this + * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small. + */ + public RedisEncoder(boolean poolBuffers) { + if (poolBuffers) { + pool = new ConcurrentLinkedQueue(); + } else { + pool = null; + } + } + + @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object o = e.getMessage(); if (o instanceof Command) { Command command = (Command) o; - ChannelBuffer cb = pool.poll(); + ChannelBuffer cb = null; + if (pool != null) { + cb = pool.poll(); + } if (cb == null) { cb = ChannelBuffers.dynamicBuffer(); } command.write(cb); ChannelFuture future = e.getFuture(); - final ChannelBuffer finalCb = cb; - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - finalCb.clear(); - pool.add(finalCb); - } - }); + + if (pool != null) { + final ChannelBuffer finalCb = cb; + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + finalCb.clear(); + pool.add(finalCb); + } + }); + } Channels.write(ctx, future, cb); } else { super.writeRequested(ctx, e);