Merge redis codec to master branch. See #201
This commit is contained in:
parent
92a907c4d7
commit
0a6f7395f3
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class BulkReply extends Reply {
|
||||||
|
public static final char MARKER = '$';
|
||||||
|
public final byte[] bytes;
|
||||||
|
|
||||||
|
public BulkReply(byte[] bytes) {
|
||||||
|
this.bytes = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
os.writeByte(MARKER);
|
||||||
|
if (bytes == null) {
|
||||||
|
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||||
|
} else {
|
||||||
|
os.writeBytes(Command.numAndCRLF(bytes.length));
|
||||||
|
os.writeBytes(bytes);
|
||||||
|
os.writeBytes(Command.CRLF);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
132
codec/src/main/java/io/netty/handler/codec/redis/Command.java
Normal file
132
codec/src/main/java/io/netty/handler/codec/redis/Command.java
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Command serialization.
|
||||||
|
*/
|
||||||
|
public class Command {
|
||||||
|
public static final byte[] ARGS_PREFIX = "*".getBytes();
|
||||||
|
public static final byte[] CRLF = "\r\n".getBytes();
|
||||||
|
public static final byte[] BYTES_PREFIX = "$".getBytes();
|
||||||
|
public static final byte[] EMPTY_BYTES = new byte[0];
|
||||||
|
public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
|
||||||
|
|
||||||
|
private byte[][] arguments;
|
||||||
|
private Object[] objects;
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
if (arguments == null) {
|
||||||
|
Object o = objects[0];
|
||||||
|
if (o instanceof byte[]) {
|
||||||
|
return new String((byte[]) o);
|
||||||
|
} else {
|
||||||
|
return o.toString();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return new String(arguments[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Command(byte[]... arguments) {
|
||||||
|
this.arguments = arguments;
|
||||||
|
objects = arguments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Command(Object... objects) {
|
||||||
|
this.objects = objects;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
writeDirect(os, objects);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void writeDirect(ChannelBuffer os, Object... objects) throws IOException {
|
||||||
|
int length = objects.length;
|
||||||
|
byte[][] arguments = new byte[length][];
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
Object object = objects[i];
|
||||||
|
if (object == null) {
|
||||||
|
arguments[i] = EMPTY_BYTES;
|
||||||
|
} else if (object instanceof byte[]) {
|
||||||
|
arguments[i] = (byte[]) object;
|
||||||
|
} else {
|
||||||
|
arguments[i] = object.toString().getBytes("UTF-8");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writeDirect(os, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeDirect(ChannelBuffer os, byte[][] arguments) throws IOException {
|
||||||
|
os.writeBytes(ARGS_PREFIX);
|
||||||
|
os.writeBytes(numAndCRLF(arguments.length));
|
||||||
|
for (byte[] argument : arguments) {
|
||||||
|
os.writeBytes(BYTES_PREFIX);
|
||||||
|
os.writeBytes(numAndCRLF(argument.length));
|
||||||
|
os.writeBytes(argument);
|
||||||
|
os.writeBytes(CRLF);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final int NUM_MAP_LENGTH = 256;
|
||||||
|
private static byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
|
||||||
|
static {
|
||||||
|
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
|
||||||
|
numAndCRLFMap[i] = convertWithCRLF(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Optimized for the direct to ASCII bytes case
|
||||||
|
// Could be even more optimized but it is already
|
||||||
|
// about twice as fast as using Long.toString().getBytes()
|
||||||
|
public static byte[] numAndCRLF(long value) {
|
||||||
|
if (value >= 0 && value < NUM_MAP_LENGTH) {
|
||||||
|
return numAndCRLFMap[(int) value];
|
||||||
|
} else if (value == -1) {
|
||||||
|
return NEG_ONE_AND_CRLF;
|
||||||
|
}
|
||||||
|
return convertWithCRLF(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] convertWithCRLF(long value) {
|
||||||
|
boolean negative = value < 0;
|
||||||
|
int index = negative ? 2 : 1;
|
||||||
|
long current = negative ? -value : value;
|
||||||
|
while ((current /= 10) > 0) {
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
byte[] bytes = new byte[index + 2];
|
||||||
|
if (negative) {
|
||||||
|
bytes[0] = '-';
|
||||||
|
}
|
||||||
|
current = negative ? -value : value;
|
||||||
|
long tmp = current;
|
||||||
|
while ((tmp /= 10) > 0) {
|
||||||
|
bytes[--index] = (byte) ('0' + (current % 10));
|
||||||
|
current = tmp;
|
||||||
|
}
|
||||||
|
bytes[--index] = (byte) ('0' + current);
|
||||||
|
// add CRLF
|
||||||
|
bytes[bytes.length - 2] = '\r';
|
||||||
|
bytes[bytes.length - 1] = '\n';
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Reply} which will be returned if an error was detected
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ErrorReply extends Reply {
|
||||||
|
public static final char MARKER = '-';
|
||||||
|
private static final byte[] ERR = "ERR ".getBytes();
|
||||||
|
public final ChannelBuffer error;
|
||||||
|
|
||||||
|
public ErrorReply(ChannelBuffer error) {
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
os.writeByte(MARKER);
|
||||||
|
os.writeBytes(ERR);
|
||||||
|
os.writeBytes(error, 0, error.readableBytes());
|
||||||
|
os.writeBytes(Command.CRLF);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Reply} which will get returned if a {@link Integer} was requested via <code>GET</code>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class IntegerReply extends Reply {
|
||||||
|
public static final char MARKER = ':';
|
||||||
|
public final long integer;
|
||||||
|
|
||||||
|
public IntegerReply(long integer) {
|
||||||
|
this.integer = integer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
os.writeByte(MARKER);
|
||||||
|
os.writeBytes(Command.numAndCRLF(integer));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Reply} which contains a bulk of {@link Reply}'s
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MultiBulkReply extends Reply {
|
||||||
|
public static final char MARKER = '*';
|
||||||
|
|
||||||
|
// State
|
||||||
|
public Object[] byteArrays;
|
||||||
|
private int size;
|
||||||
|
private int num;
|
||||||
|
|
||||||
|
public MultiBulkReply() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public MultiBulkReply(Object... values) {
|
||||||
|
this.byteArrays = values;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void read(RedisDecoder rd, ChannelBuffer is) throws IOException {
|
||||||
|
// If we attempted to read the size before, skip the '*' and reread it
|
||||||
|
if (size == -1) {
|
||||||
|
byte star = is.readByte();
|
||||||
|
if (star == MARKER) {
|
||||||
|
size = 0;
|
||||||
|
} else {
|
||||||
|
throw new AssertionError("Unexpected character in stream: " + star);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (size == 0) {
|
||||||
|
// If the read fails, we need to skip the star
|
||||||
|
size = -1;
|
||||||
|
// Read the size, if this is successful we won't read the star again
|
||||||
|
size = RedisDecoder.readInteger(is);
|
||||||
|
byteArrays = new Object[size];
|
||||||
|
rd.checkpoint();
|
||||||
|
}
|
||||||
|
for (int i = num; i < size; i++) {
|
||||||
|
int read = is.readByte();
|
||||||
|
if (read == BulkReply.MARKER) {
|
||||||
|
byteArrays[i] = RedisDecoder.readBytes(is);
|
||||||
|
} else if (read == IntegerReply.MARKER) {
|
||||||
|
byteArrays[i] = RedisDecoder.readInteger(is);
|
||||||
|
} else {
|
||||||
|
throw new IOException("Unexpected character in stream: " + read);
|
||||||
|
}
|
||||||
|
num = i + 1;
|
||||||
|
rd.checkpoint();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
os.writeByte(MARKER);
|
||||||
|
if (byteArrays == null) {
|
||||||
|
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||||
|
} else {
|
||||||
|
os.writeBytes(Command.numAndCRLF(byteArrays.length));
|
||||||
|
for (Object value : byteArrays) {
|
||||||
|
if (value == null) {
|
||||||
|
os.writeByte(BulkReply.MARKER);
|
||||||
|
os.writeBytes(Command.NEG_ONE_AND_CRLF);
|
||||||
|
} else if (value instanceof byte[]) {
|
||||||
|
byte[] bytes = (byte[]) value;
|
||||||
|
os.writeByte(BulkReply.MARKER);
|
||||||
|
int length = bytes.length;
|
||||||
|
os.writeBytes(Command.numAndCRLF(length));
|
||||||
|
os.writeBytes(bytes);
|
||||||
|
os.writeBytes(Command.CRLF);
|
||||||
|
} else if (value instanceof Number) {
|
||||||
|
os.writeByte(IntegerReply.MARKER);
|
||||||
|
os.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
public class PSubscribeReply extends SubscribeReply {
|
||||||
|
|
||||||
|
public PSubscribeReply(byte[][] patterns) {
|
||||||
|
super(patterns);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class PUnsubscribeReply extends UnsubscribeReply {
|
||||||
|
|
||||||
|
public PUnsubscribeReply(byte[][] patterns) {
|
||||||
|
super(patterns);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,149 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
import io.netty.buffer.ChannelBufferIndexFinder;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.handler.codec.replay.ReplayingDecoder;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link ReplayingDecoder} which handles Redis protocol
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class RedisDecoder extends ReplayingDecoder<State> {
|
||||||
|
|
||||||
|
private static final char CR = '\r';
|
||||||
|
private static final char LF = '\n';
|
||||||
|
private static final char ZERO = '0';
|
||||||
|
|
||||||
|
// We track the current multibulk reply in the case
|
||||||
|
// where we do not get a complete reply in a single
|
||||||
|
// decode invocation.
|
||||||
|
private MultiBulkReply reply;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
|
||||||
|
* via the {@link #readInteger(ChannelBuffer)} method
|
||||||
|
*
|
||||||
|
* @param is the {@link ChannelBuffer} to read from
|
||||||
|
* @return content
|
||||||
|
* @throws IOException is thrown if the line-ending is not CRLF
|
||||||
|
*/
|
||||||
|
public static byte[] readBytes(ChannelBuffer is) throws IOException {
|
||||||
|
int size = readInteger(is);
|
||||||
|
if (size == -1) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
byte[] bytes = new byte[size];
|
||||||
|
is.readBytes(bytes, 0, size);
|
||||||
|
int cr = is.readByte();
|
||||||
|
int lf = is.readByte();
|
||||||
|
if (cr != CR || lf != LF) {
|
||||||
|
throw new IOException("Improper line ending: " + cr + ", " + lf);
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read an {@link Integer} from the {@link ChannelBuffer}
|
||||||
|
*
|
||||||
|
* @param is
|
||||||
|
* @return integer
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static int readInteger(ChannelBuffer is) throws IOException {
|
||||||
|
int size = 0;
|
||||||
|
int sign = 1;
|
||||||
|
int read = is.readByte();
|
||||||
|
if (read == '-') {
|
||||||
|
read = is.readByte();
|
||||||
|
sign = -1;
|
||||||
|
}
|
||||||
|
do {
|
||||||
|
if (read == CR) {
|
||||||
|
if (is.readByte() == LF) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int value = read - ZERO;
|
||||||
|
if (value >= 0 && value < 10) {
|
||||||
|
size *= 10;
|
||||||
|
size += value;
|
||||||
|
} else {
|
||||||
|
throw new IOException("Invalid character in integer");
|
||||||
|
}
|
||||||
|
read = is.readByte();
|
||||||
|
} while (true);
|
||||||
|
return size * sign;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void checkpoint() {
|
||||||
|
super.checkpoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception {
|
||||||
|
if (reply != null) {
|
||||||
|
reply.read(this, channelBuffer);
|
||||||
|
Reply ret = reply;
|
||||||
|
reply = null;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
int code = channelBuffer.readByte();
|
||||||
|
switch (code) {
|
||||||
|
case StatusReply.MARKER: {
|
||||||
|
ChannelBuffer status = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
|
||||||
|
channelBuffer.skipBytes(2);
|
||||||
|
return new StatusReply(status);
|
||||||
|
}
|
||||||
|
case ErrorReply.MARKER: {
|
||||||
|
ChannelBuffer error = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
|
||||||
|
channelBuffer.skipBytes(2);
|
||||||
|
return new ErrorReply(error);
|
||||||
|
}
|
||||||
|
case IntegerReply.MARKER: {
|
||||||
|
return new IntegerReply(readInteger(channelBuffer));
|
||||||
|
}
|
||||||
|
case BulkReply.MARKER: {
|
||||||
|
return new BulkReply(readBytes(channelBuffer));
|
||||||
|
}
|
||||||
|
case MultiBulkReply.MARKER: {
|
||||||
|
return decodeMultiBulkReply(channelBuffer);
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
throw new IOException("Unexpected character in stream: " + code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws IOException {
|
||||||
|
if (reply == null) {
|
||||||
|
reply = new MultiBulkReply();
|
||||||
|
}
|
||||||
|
reply.read(this, is);
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum State {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
import io.netty.buffer.ChannelBuffers;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.Channels;
|
||||||
|
import io.netty.channel.MessageEvent;
|
||||||
|
import io.netty.channel.SimpleChannelDownstreamHandler;
|
||||||
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Sharable
|
||||||
|
public class RedisEncoder extends SimpleChannelDownstreamHandler {
|
||||||
|
|
||||||
|
private final Queue<ChannelBuffer> pool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls {@link #RedisEncoder(boolean)} with <code>false</code>
|
||||||
|
*/
|
||||||
|
public RedisEncoder() {
|
||||||
|
this(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link RedisEncoder} instance
|
||||||
|
*
|
||||||
|
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
|
||||||
|
* can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
|
||||||
|
*/
|
||||||
|
public RedisEncoder(boolean poolBuffers) {
|
||||||
|
if (poolBuffers) {
|
||||||
|
pool = new ConcurrentLinkedQueue<ChannelBuffer>();
|
||||||
|
} else {
|
||||||
|
pool = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||||
|
Object o = e.getMessage();
|
||||||
|
if (o instanceof Command) {
|
||||||
|
Command command = (Command) o;
|
||||||
|
ChannelBuffer cb = null;
|
||||||
|
if (pool != null) {
|
||||||
|
cb = pool.poll();
|
||||||
|
}
|
||||||
|
if (cb == null) {
|
||||||
|
cb = ChannelBuffers.dynamicBuffer();
|
||||||
|
}
|
||||||
|
command.write(cb);
|
||||||
|
ChannelFuture future = e.getFuture();
|
||||||
|
|
||||||
|
if (pool != null) {
|
||||||
|
final ChannelBuffer finalCb = cb;
|
||||||
|
future.addListener(new ChannelFutureListener() {
|
||||||
|
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||||
|
finalCb.clear();
|
||||||
|
pool.add(finalCb);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Channels.write(ctx, future, cb);
|
||||||
|
} else {
|
||||||
|
super.writeRequested(ctx, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
46
codec/src/main/java/io/netty/handler/codec/redis/Reply.java
Normal file
46
codec/src/main/java/io/netty/handler/codec/redis/Reply.java
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
import io.netty.buffer.ChannelBuffers;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Reply} which was sent as a Response to the written {@link Command}
|
||||||
|
* *
|
||||||
|
*/
|
||||||
|
public abstract class Reply {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write the content of the {@link Reply} to the given {@link ChannelBuffer}
|
||||||
|
*/
|
||||||
|
public abstract void write(ChannelBuffer os) throws IOException;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
|
||||||
|
try {
|
||||||
|
write(channelBuffer);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError("Trustin says this won't happen either");
|
||||||
|
}
|
||||||
|
return channelBuffer.toString(CharsetUtil.UTF_8);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Reply} which contains the status
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class StatusReply extends Reply {
|
||||||
|
public static final char MARKER = '+';
|
||||||
|
public final ChannelBuffer status;
|
||||||
|
|
||||||
|
public StatusReply(ChannelBuffer status) {
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
os.writeByte(MARKER);
|
||||||
|
os.writeBytes(status, 0, status.readableBytes());
|
||||||
|
os.writeBytes(Command.CRLF);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class SubscribeReply extends Reply {
|
||||||
|
|
||||||
|
private final byte[][] patterns;
|
||||||
|
|
||||||
|
public SubscribeReply(byte[][] patterns) {
|
||||||
|
this.patterns = patterns;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[][] getPatterns() {
|
||||||
|
return patterns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class UnsubscribeReply extends Reply {
|
||||||
|
|
||||||
|
private final byte[][] patterns;
|
||||||
|
|
||||||
|
public UnsubscribeReply(byte[][] patterns) {
|
||||||
|
this.patterns = patterns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelBuffer os) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[][] getPatterns() {
|
||||||
|
return patterns;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encoder and decoder which transform a
|
||||||
|
* <a href="http://http://redis.io/topics/protocol">Redis protocol commands and replies</a>
|
||||||
|
* into a {@link org.jboss.netty.buffer.ChannelBuffer}
|
||||||
|
* and vice versa.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
|
@ -0,0 +1,134 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.redis;
|
||||||
|
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
import io.netty.buffer.ChannelBuffers;
|
||||||
|
import io.netty.handler.codec.embedder.DecoderEmbedder;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class RedisCodecTest {
|
||||||
|
|
||||||
|
private DecoderEmbedder<ChannelBuffer> embedder;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisDecoder());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void decodeReplies() throws IOException {
|
||||||
|
{
|
||||||
|
Object receive = decode("+OK\r\n".getBytes());
|
||||||
|
assertTrue(receive instanceof StatusReply);
|
||||||
|
assertEquals("OK", ((StatusReply) receive).status.toString(CharsetUtil.UTF_8));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Object receive = decode("-ERROR\r\n".getBytes());
|
||||||
|
assertTrue(receive instanceof ErrorReply);
|
||||||
|
assertEquals("ERROR", ((ErrorReply) receive).error.toString(CharsetUtil.UTF_8));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Object receive = decode(":123\r\n".getBytes());
|
||||||
|
assertTrue(receive instanceof IntegerReply);
|
||||||
|
assertEquals(123, ((IntegerReply) receive).integer);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Object receive = decode("$5\r\nnetty\r\n".getBytes());
|
||||||
|
assertTrue(receive instanceof BulkReply);
|
||||||
|
assertEquals("netty", new String(((BulkReply) receive).bytes));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
|
||||||
|
assertTrue(receive instanceof MultiBulkReply);
|
||||||
|
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||||
|
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object decode(byte[] bytes) {
|
||||||
|
embedder.offer(wrappedBuffer(bytes));
|
||||||
|
return embedder.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void encodeCommands() throws IOException {
|
||||||
|
String setCommand = "*3\r\n" +
|
||||||
|
"$3\r\n" +
|
||||||
|
"SET\r\n" +
|
||||||
|
"$5\r\n" +
|
||||||
|
"mykey\r\n" +
|
||||||
|
"$7\r\n" +
|
||||||
|
"myvalue\r\n";
|
||||||
|
Command command = new Command("SET", "mykey", "myvalue");
|
||||||
|
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
|
||||||
|
command.write(cb);
|
||||||
|
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplayDecoding() {
|
||||||
|
{
|
||||||
|
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
|
||||||
|
Object receive = embedder.poll();
|
||||||
|
assertNull(receive);
|
||||||
|
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
|
||||||
|
receive = embedder.poll();
|
||||||
|
assertTrue(receive instanceof MultiBulkReply);
|
||||||
|
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||||
|
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
|
||||||
|
Object receive = embedder.poll();
|
||||||
|
assertNull(receive);
|
||||||
|
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
|
||||||
|
receive = embedder.poll();
|
||||||
|
assertTrue(receive instanceof MultiBulkReply);
|
||||||
|
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||||
|
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
embedder.offer(wrappedBuffer("*2".getBytes()));
|
||||||
|
Object receive = embedder.poll();
|
||||||
|
assertNull(receive);
|
||||||
|
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
|
||||||
|
receive = embedder.poll();
|
||||||
|
assertTrue(receive instanceof MultiBulkReply);
|
||||||
|
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||||
|
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
|
||||||
|
Object receive = embedder.poll();
|
||||||
|
assertNull(receive);
|
||||||
|
embedder.offer(wrappedBuffer("\n".getBytes()));
|
||||||
|
receive = embedder.poll();
|
||||||
|
assertTrue(receive instanceof MultiBulkReply);
|
||||||
|
assertEquals("netty", new String((byte[]) ((MultiBulkReply) receive).byteArrays[0]));
|
||||||
|
assertEquals("rules", new String((byte[]) ((MultiBulkReply) receive).byteArrays[1]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user