Some more cleanup of the redis code

This commit is contained in:
norman 2012-03-12 08:15:15 +01:00
parent 4cb3bf2c4a
commit a5a41a51a2
8 changed files with 50 additions and 11 deletions

View File

@ -19,6 +19,11 @@ import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException; import java.io.IOException;
/**
* {@link Reply} which will be returned if an error was detected
*
*
*/
public class ErrorReply extends Reply { public class ErrorReply extends Reply {
public static final char MARKER = '-'; public static final char MARKER = '-';
private static final byte[] ERR = "ERR ".getBytes(); private static final byte[] ERR = "ERR ".getBytes();

View File

@ -19,6 +19,10 @@ import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException; import java.io.IOException;
/**
* {@link Reply} which will get returned if a {@link Integer} was requested via <code>GET</code>
*
*/
public class IntegerReply extends Reply { public class IntegerReply extends Reply {
public static final char MARKER = ':'; public static final char MARKER = ':';
public final long integer; public final long integer;

View File

@ -19,6 +19,11 @@ import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException; import java.io.IOException;
/**
* {@link Reply} which contains a bulk of {@link Reply}'s
*
*
*/
public class MultiBulkReply extends Reply { public class MultiBulkReply extends Reply {
public static final char MARKER = '*'; public static final char MARKER = '*';
@ -55,7 +60,7 @@ public class MultiBulkReply extends Reply {
for (int i = num; i < size; i++) { for (int i = num; i < size; i++) {
int read = is.readByte(); int read = is.readByte();
if (read == BulkReply.MARKER) { if (read == BulkReply.MARKER) {
byteArrays[i] = rd.readBytes(is); byteArrays[i] = RedisDecoder.readBytes(is);
} else if (read == IntegerReply.MARKER) { } else if (read == IntegerReply.MARKER) {
byteArrays[i] = RedisDecoder.readInteger(is); byteArrays[i] = RedisDecoder.readInteger(is);
} else { } else {

View File

@ -27,6 +27,6 @@ public class PUnsubscribeReply extends UnsubscribeReply {
@Override @Override
public void write(ChannelBuffer os) throws IOException { public void write(ChannelBuffer os) throws IOException {
// Do nothing
} }
} }

View File

@ -22,30 +22,36 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder; import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset;
/**
* {@link ReplayingDecoder} which handles Redis protocol
*
*
*/
public class RedisDecoder extends ReplayingDecoder<State> { public class RedisDecoder extends ReplayingDecoder<State> {
private static final char CR = '\r'; private static final char CR = '\r';
private static final char LF = '\n'; private static final char LF = '\n';
private static final char ZERO = '0'; private static final char ZERO = '0';
public static final Charset UTF_8 = Charset.forName("UTF-8");
// We track the current multibulk reply in the case // We track the current multibulk reply in the case
// where we do not get a complete reply in a single // where we do not get a complete reply in a single
// decode invocation. // decode invocation.
private MultiBulkReply reply; private MultiBulkReply reply;
public byte[] readBytes(ChannelBuffer is) throws IOException { /**
* Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
* via the {@link #readInteger(ChannelBuffer)} method
*
* @param is the {@link ChannelBuffer} to read from
* @return content
* @throws IOException is thrown if the line-ending is not CRLF
*/
public static byte[] readBytes(ChannelBuffer is) throws IOException {
int size = readInteger(is); int size = readInteger(is);
if (size == -1) { if (size == -1) {
return null; return null;
} }
if (super.actualReadableBytes() < size + 2) {
// Trigger error
is.skipBytes(size + 2);
throw new AssertionError("Trustin says this isn't possible");
}
byte[] bytes = new byte[size]; byte[] bytes = new byte[size];
is.readBytes(bytes, 0, size); is.readBytes(bytes, 0, size);
int cr = is.readByte(); int cr = is.readByte();
@ -56,6 +62,13 @@ public class RedisDecoder extends ReplayingDecoder<State> {
return bytes; return bytes;
} }
/**
* Read an {@link Integer} from the {@link ChannelBuffer}
*
* @param is
* @return integer
* @throws IOException
*/
public static int readInteger(ChannelBuffer is) throws IOException { public static int readInteger(ChannelBuffer is) throws IOException {
int size = 0; int size = 0;
int sign = 1; int sign = 1;

View File

@ -21,8 +21,15 @@ import org.jboss.netty.util.CharsetUtil;
import java.io.IOException; import java.io.IOException;
/**
* {@link Reply} which was sent as a Response to the written {@link Command}
* *
*/
public abstract class Reply { public abstract class Reply {
/**
* Write the content of the {@link Reply} to the given {@link ChannelBuffer}
*/
public abstract void write(ChannelBuffer os) throws IOException; public abstract void write(ChannelBuffer os) throws IOException;
@Override @Override

View File

@ -19,6 +19,10 @@ import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException; import java.io.IOException;
/**
* {@link Reply} which contains the status
*
*/
public class StatusReply extends Reply { public class StatusReply extends Reply {
public static final char MARKER = '+'; public static final char MARKER = '+';
public final ChannelBuffer status; public final ChannelBuffer status;

View File

@ -33,5 +33,6 @@ public class SubscribeReply extends Reply {
@Override @Override
public void write(ChannelBuffer os) throws IOException { public void write(ChannelBuffer os) throws IOException {
// Do nothing
} }
} }