From a065b1cee2c882458fdf7a6a5709d8516c583833 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 30 Mar 2012 17:03:02 +0900 Subject: [PATCH] Issue #242: Add the ability to send many commands with a single call Also: * Code cleanup * Hide internal constants from a user --- .../io/netty/handler/codec/redis/Command.java | 10 +-- .../handler/codec/redis/RedisDecoder.java | 5 -- .../handler/codec/redis/RedisEncoder.java | 67 ++++++------------- 3 files changed, 24 insertions(+), 58 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/redis/Command.java b/codec/src/main/java/io/netty/handler/codec/redis/Command.java index 3d637e47aa..a0bf17d5ab 100644 --- a/codec/src/main/java/io/netty/handler/codec/redis/Command.java +++ b/codec/src/main/java/io/netty/handler/codec/redis/Command.java @@ -24,11 +24,11 @@ import java.io.IOException; * Command serialization. */ public class Command { - public static final byte[] ARGS_PREFIX = "*".getBytes(); - public static final byte[] CRLF = "\r\n".getBytes(); - public static final byte[] BYTES_PREFIX = "$".getBytes(); - public static final byte[] EMPTY_BYTES = new byte[0]; - public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1); + static final byte[] ARGS_PREFIX = "*".getBytes(); + static final byte[] CRLF = "\r\n".getBytes(); + static final byte[] BYTES_PREFIX = "$".getBytes(); + static final byte[] EMPTY_BYTES = new byte[0]; + static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1); private byte[][] arguments; private Object[] objects; diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java index 96fa44ad6d..9bca7d6afc 100644 --- a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java @@ -44,7 +44,6 @@ public class RedisDecoder extends ReplayingDecoder { * via the {@link #readInteger(ChannelBuffer)} method * * @param is the {@link ChannelBuffer} to read from - * @return content * @throws IOException is thrown if the line-ending is not CRLF */ public static byte[] readBytes(ChannelBuffer is) throws IOException { @@ -64,10 +63,6 @@ public class RedisDecoder extends ReplayingDecoder { /** * Read an {@link Integer} from the {@link ChannelBuffer} - * - * @param is - * @return integer - * @throws IOException */ public static int readInteger(ChannelBuffer is) throws IOException { int size = 0; diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java index f7f4710ab8..2adaffca3e 100644 --- a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java @@ -18,71 +18,42 @@ package io.netty.handler.codec.redis; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.Channels; import io.netty.channel.MessageEvent; import io.netty.channel.SimpleChannelDownstreamHandler; -import io.netty.channel.ChannelHandler.Sharable; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; /** * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s - * - * */ @Sharable public class RedisEncoder extends SimpleChannelDownstreamHandler { - private final Queue pool; - - /** - * Calls {@link #RedisEncoder(boolean)} with false - */ - public RedisEncoder() { - this(false); - } - - /** - * Create a new {@link RedisEncoder} instance - * - * @param poolBuffers true if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this - * can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small. - */ - public RedisEncoder(boolean poolBuffers) { - if (poolBuffers) { - pool = new ConcurrentLinkedQueue(); - } else { - pool = null; - } - } - - @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object o = e.getMessage(); if (o instanceof Command) { - Command command = (Command) o; - ChannelBuffer cb = null; - if (pool != null) { - cb = pool.poll(); - } - if (cb == null) { - cb = ChannelBuffers.dynamicBuffer(); - } - command.write(cb); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); ChannelFuture future = e.getFuture(); - if (pool != null) { - final ChannelBuffer finalCb = cb; - future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - finalCb.clear(); - pool.add(finalCb); - } - }); + Command command = (Command) o; + command.write(cb); + Channels.write(ctx, future, cb); + + } else if (o instanceof Iterable) { + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); + ChannelFuture future = e.getFuture(); + + // Useful for transactions and database select + for (Object i : (Iterable) o) { + if (i instanceof Command) { + Command command = (Command) i; + command.write(cb); + } else { + super.writeRequested(ctx, e); + return; + } } Channels.write(ctx, future, cb); } else {