Overall Redis codec cleanup (more to come)
- Prefer ChannelBuffer over byte[] - Hide the methods that users are not usually interested in - Rename RedisEncoder/Decoder to RedisCommandEncoder/ReplyDecoder - Add getter methods to Replies - Rename getX() to X() because I like it for read-only properties
This commit is contained in:
parent
c98ab997c5
commit
3e1a055f36
|
@ -23,8 +23,8 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
|
|||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
||||
import org.jboss.netty.handler.codec.redis.Command;
|
||||
import org.jboss.netty.handler.codec.redis.RedisDecoder;
|
||||
import org.jboss.netty.handler.codec.redis.RedisEncoder;
|
||||
import org.jboss.netty.handler.codec.redis.RedisCommandEncoder;
|
||||
import org.jboss.netty.handler.codec.redis.RedisReplyDecoder;
|
||||
import org.jboss.netty.handler.codec.redis.Reply;
|
||||
import org.jboss.netty.handler.queue.BlockingReadHandler;
|
||||
|
||||
|
@ -45,8 +45,8 @@ public final class RedisClient {
|
|||
cb.setPipelineFactory(new ChannelPipelineFactory() {
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = Channels.pipeline();
|
||||
pipeline.addLast("redisEncoder", new RedisEncoder());
|
||||
pipeline.addLast("redisDecoder", new RedisDecoder());
|
||||
pipeline.addLast("redisEncoder", new RedisCommandEncoder());
|
||||
pipeline.addLast("redisDecoder", new RedisReplyDecoder());
|
||||
pipeline.addLast("result", blockingReadHandler);
|
||||
return pipeline;
|
||||
}
|
||||
|
|
|
@ -16,26 +16,34 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
public class BulkReply extends Reply {
|
||||
public static final char MARKER = '$';
|
||||
public final byte[] bytes;
|
||||
static final char MARKER = '$';
|
||||
|
||||
public BulkReply(byte[] bytes) {
|
||||
this.bytes = bytes;
|
||||
private final ChannelBuffer data;
|
||||
|
||||
public BulkReply(byte[] data) {
|
||||
this(data == null? null : ChannelBuffers.wrappedBuffer(data));
|
||||
}
|
||||
|
||||
public BulkReply(ChannelBuffer data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ChannelBuffer data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
if (bytes == null) {
|
||||
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
void write(ChannelBuffer out) {
|
||||
out.writeByte(MARKER);
|
||||
if (data == null) {
|
||||
out.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else {
|
||||
os.writeBytes(Command.numAndCRLF(bytes.length));
|
||||
os.writeBytes(bytes);
|
||||
os.writeBytes(Command.CRLF);
|
||||
out.writeBytes(Command.numAndCRLF(data.readableBytes()));
|
||||
out.writeBytes(data, data.readerIndex(), data.readableBytes());
|
||||
out.writeBytes(Command.CRLF);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,10 +16,9 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.util.CharsetUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Command serialization.
|
||||
*/
|
||||
|
@ -30,23 +29,27 @@ public class Command {
|
|||
static final byte[] EMPTY_BYTES = new byte[0];
|
||||
static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
|
||||
|
||||
private byte[][] arguments;
|
||||
private Object[] objects;
|
||||
private ChannelBuffer[] arguments;
|
||||
private final Object[] objects;
|
||||
|
||||
public String getName() {
|
||||
public Command(byte[]... arguments) {
|
||||
if (arguments == null) {
|
||||
Object o = objects[0];
|
||||
if (o instanceof byte[]) {
|
||||
return new String((byte[]) o);
|
||||
} else {
|
||||
return o.toString();
|
||||
}
|
||||
this.arguments = null;
|
||||
objects = null;
|
||||
} else {
|
||||
return new String(arguments[0]);
|
||||
this.arguments = new ChannelBuffer[arguments.length];
|
||||
for (int i = 0; i < arguments.length; i ++) {
|
||||
byte[] a = arguments[i];
|
||||
if (a == null) {
|
||||
continue;
|
||||
}
|
||||
this.arguments[i] = ChannelBuffers.wrappedBuffer(a);
|
||||
}
|
||||
objects = this.arguments;
|
||||
}
|
||||
}
|
||||
|
||||
public Command(byte[]... arguments) {
|
||||
public Command(ChannelBuffer[] arguments) {
|
||||
this.arguments = arguments;
|
||||
objects = arguments;
|
||||
}
|
||||
|
@ -55,79 +58,97 @@ public class Command {
|
|||
this.objects = objects;
|
||||
}
|
||||
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
writeDirect(os, objects);
|
||||
public String name() {
|
||||
if (arguments == null) {
|
||||
Object o = objects[0];
|
||||
if (o instanceof ChannelBuffer) {
|
||||
return ((ChannelBuffer) o).toString(CharsetUtil.UTF_8);
|
||||
}
|
||||
if (o == null) {
|
||||
return null;
|
||||
}
|
||||
return o.toString();
|
||||
}
|
||||
|
||||
ChannelBuffer name = arguments[0];
|
||||
if (name == null) {
|
||||
return null;
|
||||
}
|
||||
return name.toString(CharsetUtil.UTF_8);
|
||||
}
|
||||
|
||||
public static void writeDirect(ChannelBuffer os, Object... objects) throws IOException {
|
||||
void write(ChannelBuffer out) {
|
||||
writeDirect(out, objects);
|
||||
}
|
||||
|
||||
private static void writeDirect(ChannelBuffer out, Object... objects) {
|
||||
int length = objects.length;
|
||||
byte[][] arguments = new byte[length][];
|
||||
ChannelBuffer[] arguments = new ChannelBuffer[length];
|
||||
for (int i = 0; i < length; i++) {
|
||||
Object object = objects[i];
|
||||
if (object == null) {
|
||||
arguments[i] = EMPTY_BYTES;
|
||||
} else if (object instanceof byte[]) {
|
||||
arguments[i] = (byte[]) object;
|
||||
arguments[i] = ChannelBuffers.EMPTY_BUFFER;
|
||||
} else if (object instanceof ChannelBuffer) {
|
||||
arguments[i] = (ChannelBuffer) object;
|
||||
} else {
|
||||
arguments[i] = object.toString().getBytes(CharsetUtil.UTF_8);
|
||||
arguments[i] = ChannelBuffers.copiedBuffer(object.toString(), CharsetUtil.UTF_8);
|
||||
}
|
||||
}
|
||||
writeDirect(os, arguments);
|
||||
writeDirect(out, arguments);
|
||||
}
|
||||
|
||||
private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException {
|
||||
os.writeBytes(ARGS_PREFIX);
|
||||
os.writeBytes(numAndCRLF(arguments.length));
|
||||
for (byte[] argument : arguments) {
|
||||
os.writeBytes(BYTES_PREFIX);
|
||||
os.writeBytes(numAndCRLF(argument.length));
|
||||
os.writeBytes(argument);
|
||||
os.writeBytes(CRLF);
|
||||
private static void writeDirect(ChannelBuffer out, ChannelBuffer[] arguments) {
|
||||
out.writeBytes(ARGS_PREFIX);
|
||||
out.writeBytes(numAndCRLF(arguments.length));
|
||||
for (ChannelBuffer argument : arguments) {
|
||||
out.writeBytes(BYTES_PREFIX);
|
||||
out.writeBytes(numAndCRLF(argument.readableBytes()));
|
||||
out.writeBytes(argument, argument.readerIndex(), argument.readableBytes());
|
||||
out.writeBytes(CRLF);
|
||||
}
|
||||
}
|
||||
|
||||
private static final int NUM_MAP_LENGTH = 256;
|
||||
private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
|
||||
private static final byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
|
||||
static {
|
||||
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
|
||||
numAndCRLFMap[i] = convertWithCRLF(i);
|
||||
}
|
||||
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
|
||||
numAndCRLFMap[i] = convertWithCRLF(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Optimized for the direct to ASCII bytes case
|
||||
// Could be even more optimized but it is already
|
||||
// about twice as fast as using Long.toString().getBytes()
|
||||
public static byte[] numAndCRLF(long value) {
|
||||
if (value >= 0 && value < NUM_MAP_LENGTH) {
|
||||
return numAndCRLFMap[(int) value];
|
||||
} else if (value == -1) {
|
||||
return NEG_ONE_AND_CRLF;
|
||||
}
|
||||
return convertWithCRLF(value);
|
||||
static byte[] numAndCRLF(long value) {
|
||||
if (value >= 0 && value < NUM_MAP_LENGTH) {
|
||||
return numAndCRLFMap[(int) value];
|
||||
} else if (value == -1) {
|
||||
return NEG_ONE_AND_CRLF;
|
||||
}
|
||||
return convertWithCRLF(value);
|
||||
}
|
||||
|
||||
private static byte[] convertWithCRLF(long value) {
|
||||
boolean negative = value < 0;
|
||||
int index = negative ? 2 : 1;
|
||||
long current = negative ? -value : value;
|
||||
while ((current /= 10) > 0) {
|
||||
index++;
|
||||
}
|
||||
byte[] bytes = new byte[index + 2];
|
||||
if (negative) {
|
||||
bytes[0] = '-';
|
||||
}
|
||||
current = negative ? -value : value;
|
||||
long tmp = current;
|
||||
while ((tmp /= 10) > 0) {
|
||||
bytes[--index] = (byte) ('0' + (current % 10));
|
||||
current = tmp;
|
||||
}
|
||||
bytes[--index] = (byte) ('0' + current);
|
||||
// add CRLF
|
||||
bytes[bytes.length - 2] = '\r';
|
||||
bytes[bytes.length - 1] = '\n';
|
||||
return bytes;
|
||||
boolean negative = value < 0;
|
||||
int index = negative ? 2 : 1;
|
||||
long current = negative ? -value : value;
|
||||
while ((current /= 10) > 0) {
|
||||
index++;
|
||||
}
|
||||
byte[] bytes = new byte[index + 2];
|
||||
if (negative) {
|
||||
bytes[0] = '-';
|
||||
}
|
||||
current = negative ? -value : value;
|
||||
long tmp = current;
|
||||
while ((tmp /= 10) > 0) {
|
||||
bytes[--index] = (byte) ('0' + current % 10);
|
||||
current = tmp;
|
||||
}
|
||||
bytes[--index] = (byte) ('0' + current);
|
||||
// add CRLF
|
||||
bytes[bytes.length - 2] = '\r';
|
||||
bytes[bytes.length - 1] = '\n';
|
||||
return bytes;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,8 +16,7 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
/**
|
||||
* {@link Reply} which will be returned if an error was detected
|
||||
|
@ -25,19 +24,28 @@ import java.io.IOException;
|
|||
*
|
||||
*/
|
||||
public class ErrorReply extends Reply {
|
||||
public static final char MARKER = '-';
|
||||
static final char MARKER = '-';
|
||||
private static final byte[] ERR = "ERR ".getBytes();
|
||||
public final ChannelBuffer error;
|
||||
|
||||
public ErrorReply(ChannelBuffer error) {
|
||||
this.error = error;
|
||||
private final ChannelBuffer data;
|
||||
|
||||
public ErrorReply(byte[] data) {
|
||||
this(data == null? null : ChannelBuffers.wrappedBuffer(data));
|
||||
}
|
||||
|
||||
public ErrorReply(ChannelBuffer data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ChannelBuffer data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
os.writeBytes(ERR);
|
||||
os.writeBytes(error, 0, error.readableBytes());
|
||||
os.writeBytes(Command.CRLF);
|
||||
public void write(ChannelBuffer out) {
|
||||
out.writeByte(MARKER);
|
||||
out.writeBytes(ERR);
|
||||
out.writeBytes(data, 0, data.readableBytes());
|
||||
out.writeBytes(Command.CRLF);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,23 +17,26 @@ package org.jboss.netty.handler.codec.redis;
|
|||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* {@link Reply} which will get returned if a {@link Integer} was requested via <code>GET</code>
|
||||
*
|
||||
*/
|
||||
public class IntegerReply extends Reply {
|
||||
public static final char MARKER = ':';
|
||||
public final long integer;
|
||||
static final char MARKER = ':';
|
||||
|
||||
public IntegerReply(long integer) {
|
||||
this.integer = integer;
|
||||
private final long value;
|
||||
|
||||
public IntegerReply(long value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public long value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
os.writeBytes(Command.numAndCRLF(integer));
|
||||
void write(ChannelBuffer out) {
|
||||
out.writeByte(MARKER);
|
||||
out.writeBytes(Command.numAndCRLF(value));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,82 +16,107 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
|
||||
|
||||
/**
|
||||
* {@link Reply} which contains a bulk of {@link Reply}'s
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class MultiBulkReply extends Reply {
|
||||
public static final char MARKER = '*';
|
||||
static final char MARKER = '*';
|
||||
|
||||
// State
|
||||
public Object[] byteArrays;
|
||||
private Object[] values;
|
||||
private int size;
|
||||
private int num;
|
||||
|
||||
/**
|
||||
* Creates a new instance with empty values.
|
||||
*/
|
||||
public MultiBulkReply() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified values.
|
||||
*
|
||||
* @param values an array whose elements are either {@link ChannelBuffer} or {@link Number}.
|
||||
*/
|
||||
public MultiBulkReply(Object... values) {
|
||||
this.byteArrays = values;
|
||||
if (values != null) {
|
||||
for (Object v: values) {
|
||||
if (v == null) {
|
||||
continue;
|
||||
}
|
||||
if (!(v instanceof ChannelBuffer || v instanceof Number)) {
|
||||
throw new IllegalArgumentException(
|
||||
"values contains an element whose type is neither " +
|
||||
ChannelBuffer.class.getSimpleName() + " nor " + Number.class.getSimpleName() + ": " +
|
||||
v.getClass().getName());
|
||||
}
|
||||
}
|
||||
this.values = values;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array whose elements are either {@link ChannelBuffer} or {@link Number}.
|
||||
*/
|
||||
public Object[] values() {
|
||||
return values;
|
||||
}
|
||||
|
||||
public void read(RedisDecoder rd, ChannelBuffer is) throws IOException {
|
||||
void read(RedisReplyDecoder decoder, ChannelBuffer in) throws Exception {
|
||||
// If we attempted to read the size before, skip the '*' and reread it
|
||||
if (size == -1) {
|
||||
byte star = is.readByte();
|
||||
byte star = in.readByte();
|
||||
if (star == MARKER) {
|
||||
size = 0;
|
||||
} else {
|
||||
throw new AssertionError("Unexpected character in stream: " + star);
|
||||
throw new CorruptedFrameException("Unexpected character in stream: " + star);
|
||||
}
|
||||
}
|
||||
if (size == 0) {
|
||||
// If the read fails, we need to skip the star
|
||||
size = -1;
|
||||
// Read the size, if this is successful we won't read the star again
|
||||
size = RedisDecoder.readInteger(is);
|
||||
byteArrays = new Object[size];
|
||||
rd.checkpoint();
|
||||
size = RedisReplyDecoder.readInteger(in);
|
||||
values = new Object[size];
|
||||
decoder.checkpoint();
|
||||
}
|
||||
for (int i = num; i < size; i++) {
|
||||
int read = is.readByte();
|
||||
int read = in.readByte();
|
||||
if (read == BulkReply.MARKER) {
|
||||
byteArrays[i] = RedisDecoder.readBytes(is);
|
||||
values[i] = RedisReplyDecoder.readBytes(in);
|
||||
} else if (read == IntegerReply.MARKER) {
|
||||
byteArrays[i] = RedisDecoder.readInteger(is);
|
||||
values[i] = RedisReplyDecoder.readInteger(in);
|
||||
} else {
|
||||
throw new IOException("Unexpected character in stream: " + read);
|
||||
throw new CorruptedFrameException("Unexpected character in stream: " + read);
|
||||
}
|
||||
num = i + 1;
|
||||
rd.checkpoint();
|
||||
decoder.checkpoint();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
if (byteArrays == null) {
|
||||
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
void write(ChannelBuffer out) {
|
||||
out.writeByte(MARKER);
|
||||
if (values == null) {
|
||||
out.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else {
|
||||
os.writeBytes(Command.numAndCRLF(byteArrays.length));
|
||||
for (Object value : byteArrays) {
|
||||
out.writeBytes(Command.numAndCRLF(values.length));
|
||||
for (Object value : values) {
|
||||
if (value == null) {
|
||||
os.writeByte(BulkReply.MARKER);
|
||||
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else if (value instanceof byte[]) {
|
||||
byte[] bytes = (byte[]) value;
|
||||
os.writeByte(BulkReply.MARKER);
|
||||
int length = bytes.length;
|
||||
os.writeBytes(Command.numAndCRLF(length));
|
||||
os.writeBytes(bytes);
|
||||
os.writeBytes(Command.CRLF);
|
||||
out.writeByte(BulkReply.MARKER);
|
||||
out.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||
} else if (value instanceof ChannelBuffer) {
|
||||
ChannelBuffer bytes = (ChannelBuffer) value;
|
||||
out.writeByte(BulkReply.MARKER);
|
||||
int length = bytes.readableBytes();
|
||||
out.writeBytes(Command.numAndCRLF(length));
|
||||
out.writeBytes(bytes, bytes.readerIndex(), length);
|
||||
out.writeBytes(Command.CRLF);
|
||||
} else if (value instanceof Number) {
|
||||
os.writeByte(IntegerReply.MARKER);
|
||||
os.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
|
||||
out.writeByte(IntegerReply.MARKER);
|
||||
out.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,10 +15,15 @@
|
|||
*/
|
||||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
public class PSubscribeReply extends SubscribeReply {
|
||||
|
||||
public PSubscribeReply(byte[][] patterns) {
|
||||
super(patterns);
|
||||
}
|
||||
|
||||
public PSubscribeReply(ChannelBuffer[] patterns) {
|
||||
super(patterns);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,16 +17,13 @@ package org.jboss.netty.handler.codec.redis;
|
|||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class PUnsubscribeReply extends UnsubscribeReply {
|
||||
|
||||
public PUnsubscribeReply(byte[][] patterns) {
|
||||
super(patterns);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
// Do nothing
|
||||
public PUnsubscribeReply(ChannelBuffer[] patterns) {
|
||||
super(patterns);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
|
|||
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
|
||||
*/
|
||||
@Sharable
|
||||
public class RedisEncoder extends SimpleChannelDownstreamHandler {
|
||||
public class RedisCommandEncoder extends SimpleChannelDownstreamHandler {
|
||||
|
||||
@Override
|
||||
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
|
@ -17,18 +17,19 @@ package org.jboss.netty.handler.codec.redis;
|
|||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
|
||||
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
||||
import org.jboss.netty.handler.codec.replay.VoidEnum;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* {@link ReplayingDecoder} which handles Redis protocol
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class RedisDecoder extends ReplayingDecoder<State> {
|
||||
public class RedisReplyDecoder extends ReplayingDecoder<VoidEnum> {
|
||||
|
||||
private static final char CR = '\r';
|
||||
private static final char LF = '\n';
|
||||
|
@ -45,41 +46,38 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
|||
*
|
||||
* @param is the {@link ChannelBuffer} to read from
|
||||
* @return content
|
||||
* @throws IOException is thrown if the line-ending is not CRLF
|
||||
* @throws CorruptedFrameException if the line-ending is not CRLF
|
||||
*/
|
||||
public static byte[] readBytes(ChannelBuffer is) throws IOException {
|
||||
static ChannelBuffer readBytes(ChannelBuffer is) throws Exception {
|
||||
int size = readInteger(is);
|
||||
if (size == -1) {
|
||||
return null;
|
||||
}
|
||||
byte[] bytes = new byte[size];
|
||||
is.readBytes(bytes, 0, size);
|
||||
|
||||
ChannelBuffer bytes = ChannelBuffers.buffer(size);
|
||||
is.readBytes(bytes);
|
||||
int cr = is.readByte();
|
||||
int lf = is.readByte();
|
||||
if (cr != CR || lf != LF) {
|
||||
throw new IOException("Improper line ending: " + cr + ", " + lf);
|
||||
throw new CorruptedFrameException("Improper line ending: " + cr + ", " + lf);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an {@link Integer} from the {@link ChannelBuffer}
|
||||
*
|
||||
* @param is
|
||||
* @return integer
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int readInteger(ChannelBuffer is) throws IOException {
|
||||
static int readInteger(ChannelBuffer in) throws Exception {
|
||||
int size = 0;
|
||||
int sign = 1;
|
||||
int read = is.readByte();
|
||||
int read = in.readByte();
|
||||
if (read == '-') {
|
||||
read = is.readByte();
|
||||
read = in.readByte();
|
||||
sign = -1;
|
||||
}
|
||||
do {
|
||||
if (read == CR) {
|
||||
if (is.readByte() == LF) {
|
||||
if (in.readByte() == LF) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -88,20 +86,20 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
|||
size *= 10;
|
||||
size += value;
|
||||
} else {
|
||||
throw new IOException("Invalid character in integer");
|
||||
throw new CorruptedFrameException("Invalid character in integer");
|
||||
}
|
||||
read = is.readByte();
|
||||
read = in.readByte();
|
||||
} while (true);
|
||||
return size * sign;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkpoint() {
|
||||
protected void checkpoint() {
|
||||
super.checkpoint();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception {
|
||||
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, VoidEnum anEnum) throws Exception {
|
||||
if (reply != null) {
|
||||
reply.read(this, channelBuffer);
|
||||
Reply ret = reply;
|
||||
|
@ -135,7 +133,7 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
|||
}
|
||||
}
|
||||
|
||||
private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws IOException {
|
||||
private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws Exception {
|
||||
if (reply == null) {
|
||||
reply = new MultiBulkReply();
|
||||
}
|
||||
|
@ -143,7 +141,3 @@ public class RedisDecoder extends ReplayingDecoder<State> {
|
|||
return reply;
|
||||
}
|
||||
}
|
||||
|
||||
enum State {
|
||||
|
||||
}
|
|
@ -19,8 +19,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
|
|||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.util.CharsetUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* {@link Reply} which was sent as a Response to the written {@link Command}
|
||||
* *
|
||||
|
@ -30,17 +28,12 @@ public abstract class Reply {
|
|||
/**
|
||||
* Write the content of the {@link Reply} to the given {@link ChannelBuffer}
|
||||
*/
|
||||
public abstract void write(ChannelBuffer os) throws IOException;
|
||||
abstract void write(ChannelBuffer out);
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
|
||||
try {
|
||||
write(channelBuffer);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("Trustin says this won't happen either");
|
||||
}
|
||||
write(channelBuffer);
|
||||
return channelBuffer.toString(CharsetUtil.UTF_8);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,25 +16,32 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
/**
|
||||
* {@link Reply} which contains the status
|
||||
*
|
||||
*/
|
||||
public class StatusReply extends Reply {
|
||||
public static final char MARKER = '+';
|
||||
public final ChannelBuffer status;
|
||||
static final char MARKER = '+';
|
||||
private final ChannelBuffer data;
|
||||
|
||||
public StatusReply(ChannelBuffer status) {
|
||||
this.status = status;
|
||||
public StatusReply(byte[] data) {
|
||||
this(data == null? null : ChannelBuffers.wrappedBuffer(data));
|
||||
}
|
||||
|
||||
public StatusReply(ChannelBuffer data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public ChannelBuffer data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
os.writeByte(MARKER);
|
||||
os.writeBytes(status, 0, status.readableBytes());
|
||||
os.writeBytes(Command.CRLF);
|
||||
void write(ChannelBuffer out) {
|
||||
out.writeByte(MARKER);
|
||||
out.writeBytes(data, data.readerIndex(), data.readableBytes());
|
||||
out.writeBytes(Command.CRLF);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,23 +16,33 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
public class SubscribeReply extends Reply {
|
||||
|
||||
private final byte[][] patterns;
|
||||
private final ChannelBuffer[] patterns;
|
||||
|
||||
public SubscribeReply(byte[][] patterns) {
|
||||
this.patterns = new ChannelBuffer[patterns.length];
|
||||
for (int i = 0; i < patterns.length; i ++) {
|
||||
byte[] p = patterns[i];
|
||||
if (p == null) {
|
||||
continue;
|
||||
}
|
||||
this.patterns[i] = ChannelBuffers.wrappedBuffer(p);
|
||||
}
|
||||
}
|
||||
|
||||
public SubscribeReply(ChannelBuffer[] patterns) {
|
||||
this.patterns = patterns;
|
||||
}
|
||||
|
||||
public byte[][] getPatterns() {
|
||||
public ChannelBuffer[] patterns() {
|
||||
return patterns;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
void write(ChannelBuffer out) {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,23 +16,33 @@
|
|||
package org.jboss.netty.handler.codec.redis;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
public class UnsubscribeReply extends Reply {
|
||||
|
||||
private final byte[][] patterns;
|
||||
private final ChannelBuffer[] patterns;
|
||||
|
||||
public UnsubscribeReply(byte[][] patterns) {
|
||||
this.patterns = new ChannelBuffer[patterns.length];
|
||||
for (int i = 0; i < patterns.length; i ++) {
|
||||
byte[] p = patterns[i];
|
||||
if (p == null) {
|
||||
continue;
|
||||
}
|
||||
this.patterns[i] = ChannelBuffers.wrappedBuffer(p);
|
||||
}
|
||||
}
|
||||
|
||||
public UnsubscribeReply(ChannelBuffer[] patterns) {
|
||||
this.patterns = patterns;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelBuffer os) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
public byte[][] getPatterns() {
|
||||
public ChannelBuffer[] patterns() {
|
||||
return patterns;
|
||||
}
|
||||
|
||||
@Override
|
||||
void write(ChannelBuffer out) {
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ public class RedisCodecTest {
|
|||
|
||||
@Before
|
||||
public void setUp() {
|
||||
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
|
||||
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisReplyDecoder());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -43,28 +43,28 @@ public class RedisCodecTest {
|
|||
{
|
||||
Object receive = decode("+OK\r\n".getBytes());
|
||||
assertTrue(receive instanceof StatusReply);
|
||||
assertEquals("OK", ((StatusReply) receive).status.toString(CharsetUtil.UTF_8));
|
||||
assertEquals("OK", ((StatusReply) receive).data().toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
{
|
||||
Object receive = decode("-ERROR\r\n".getBytes());
|
||||
assertTrue(receive instanceof ErrorReply);
|
||||
assertEquals("ERROR", ((ErrorReply) receive).error.toString(CharsetUtil.UTF_8));
|
||||
assertEquals("ERROR", ((ErrorReply) receive).data().toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
{
|
||||
Object receive = decode(":123\r\n".getBytes());
|
||||
assertTrue(receive instanceof IntegerReply);
|
||||
assertEquals(123, ((IntegerReply) receive).integer);
|
||||
assertEquals(123, ((IntegerReply) receive).value());
|
||||
}
|
||||
{
|
||||
Object receive = decode("$5\r\nnetty\r\n".getBytes());
|
||||
assertTrue(receive instanceof BulkReply);
|
||||
assertEquals("netty", new String(((BulkReply) receive).bytes));
|
||||
assertEquals("netty", ((BulkReply) receive).data().toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
{
|
||||
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
|
||||
assertTrue(receive instanceof MultiBulkReply);
|
||||
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,8 +97,8 @@ public class RedisCodecTest {
|
|||
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
|
||||
receive = embedder.poll();
|
||||
assertTrue(receive instanceof MultiBulkReply);
|
||||
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
{
|
||||
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
|
||||
|
@ -107,8 +107,8 @@ public class RedisCodecTest {
|
|||
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
|
||||
receive = embedder.poll();
|
||||
assertTrue(receive instanceof MultiBulkReply);
|
||||
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
{
|
||||
embedder.offer(wrappedBuffer("*2".getBytes()));
|
||||
|
@ -117,8 +117,8 @@ public class RedisCodecTest {
|
|||
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
|
||||
receive = embedder.poll();
|
||||
assertTrue(receive instanceof MultiBulkReply);
|
||||
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
{
|
||||
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
|
||||
|
@ -127,8 +127,8 @@ public class RedisCodecTest {
|
|||
embedder.offer(wrappedBuffer("\n".getBytes()));
|
||||
receive = embedder.poll();
|
||||
assertTrue(receive instanceof MultiBulkReply);
|
||||
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user