Remove Redis client codec (will continue work on master)
This commit is contained in:
parent
2c3fef467a
commit
1d74c13fe7
@ -1,121 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.example.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.bootstrap.ClientBootstrap;
|
|
||||||
import org.jboss.netty.channel.Channel;
|
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
|
||||||
import org.jboss.netty.channel.ChannelPipeline;
|
|
||||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
|
||||||
import org.jboss.netty.channel.Channels;
|
|
||||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
|
||||||
import org.jboss.netty.handler.codec.redis.Command;
|
|
||||||
import org.jboss.netty.handler.codec.redis.RedisCommandEncoder;
|
|
||||||
import org.jboss.netty.handler.codec.redis.RedisReplyDecoder;
|
|
||||||
import org.jboss.netty.handler.codec.redis.Reply;
|
|
||||||
import org.jboss.netty.handler.queue.BlockingReadHandler;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
public final class RedisClient {
|
|
||||||
private static final byte[] VALUE = "value".getBytes();
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
ExecutorService executor = Executors.newCachedThreadPool();
|
|
||||||
final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor));
|
|
||||||
final BlockingReadHandler<Reply> blockingReadHandler = new BlockingReadHandler<Reply>();
|
|
||||||
cb.setPipelineFactory(new ChannelPipelineFactory() {
|
|
||||||
public ChannelPipeline getPipeline() throws Exception {
|
|
||||||
ChannelPipeline pipeline = Channels.pipeline();
|
|
||||||
pipeline.addLast("redisEncoder", new RedisCommandEncoder());
|
|
||||||
pipeline.addLast("redisDecoder", new RedisReplyDecoder());
|
|
||||||
pipeline.addLast("result", blockingReadHandler);
|
|
||||||
return pipeline;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379));
|
|
||||||
redis.await().rethrowIfFailed();
|
|
||||||
Channel channel = redis.getChannel();
|
|
||||||
|
|
||||||
channel.write(new Command("set", "1", "value"));
|
|
||||||
System.out.print(blockingReadHandler.read());
|
|
||||||
channel.write(new Command("get", "1"));
|
|
||||||
System.out.print(blockingReadHandler.read());
|
|
||||||
|
|
||||||
int CALLS = 1000000;
|
|
||||||
int PIPELINE = 50;
|
|
||||||
requestResponse(blockingReadHandler, channel, CALLS);
|
|
||||||
pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
|
|
||||||
pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE);
|
|
||||||
|
|
||||||
channel.close();
|
|
||||||
cb.releaseExternalResources();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void pipelinedListOfRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
byte[] SET_BYTES = "SET".getBytes();
|
|
||||||
for (int i = 0; i < CALLS / PIPELINE; i++) {
|
|
||||||
List<Command> list = new ArrayList<Command>();
|
|
||||||
for (int j = 0; j < PIPELINE; j++) {
|
|
||||||
int base = i * PIPELINE;
|
|
||||||
list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
|
|
||||||
}
|
|
||||||
channel.write(list);
|
|
||||||
for (int j = 0; j < PIPELINE; j++) {
|
|
||||||
blockingReadHandler.read();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void pipelinedIndividualRequests(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException {
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
byte[] SET_BYTES = "SET".getBytes();
|
|
||||||
for (int i = 0; i < CALLS / PIPELINE; i++) {
|
|
||||||
int base = i * PIPELINE;
|
|
||||||
for (int j = 0; j < PIPELINE; j++) {
|
|
||||||
channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE));
|
|
||||||
}
|
|
||||||
for (int j = 0; j < PIPELINE; j++) {
|
|
||||||
blockingReadHandler.read();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void requestResponse(BlockingReadHandler<Reply> blockingReadHandler, Channel channel, int CALLS) throws IOException, InterruptedException {
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
byte[] SET_BYTES = "SET".getBytes();
|
|
||||||
for (int i = 0; i < CALLS; i++) {
|
|
||||||
channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE));
|
|
||||||
blockingReadHandler.read();
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println(CALLS * 1000 / (end - start) + " calls per second");
|
|
||||||
}
|
|
||||||
|
|
||||||
private RedisClient() {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,49 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
public class BulkReply extends Reply {
|
|
||||||
static final char MARKER = '$';
|
|
||||||
|
|
||||||
private final ChannelBuffer data;
|
|
||||||
|
|
||||||
public BulkReply(byte[] data) {
|
|
||||||
this(data == null? null : ChannelBuffers.wrappedBuffer(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
public BulkReply(ChannelBuffer data) {
|
|
||||||
this.data = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ChannelBuffer data() {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
out.writeByte(MARKER);
|
|
||||||
if (data == null) {
|
|
||||||
out.writeBytes(Command.NEG_ONE_AND_CRLF);
|
|
||||||
} else {
|
|
||||||
out.writeBytes(Command.numAndCRLF(data.readableBytes()));
|
|
||||||
out.writeBytes(data, data.readerIndex(), data.readableBytes());
|
|
||||||
out.writeBytes(Command.CRLF);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,154 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.util.CharsetUtil;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Command serialization.
|
|
||||||
*/
|
|
||||||
public class Command {
|
|
||||||
static final byte[] ARGS_PREFIX = "*".getBytes();
|
|
||||||
static final byte[] CRLF = "\r\n".getBytes();
|
|
||||||
static final byte[] BYTES_PREFIX = "$".getBytes();
|
|
||||||
static final byte[] EMPTY_BYTES = new byte[0];
|
|
||||||
static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
|
|
||||||
|
|
||||||
private ChannelBuffer[] arguments;
|
|
||||||
private final Object[] objects;
|
|
||||||
|
|
||||||
public Command(byte[]... arguments) {
|
|
||||||
if (arguments == null) {
|
|
||||||
this.arguments = null;
|
|
||||||
objects = null;
|
|
||||||
} else {
|
|
||||||
this.arguments = new ChannelBuffer[arguments.length];
|
|
||||||
for (int i = 0; i < arguments.length; i ++) {
|
|
||||||
byte[] a = arguments[i];
|
|
||||||
if (a == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
this.arguments[i] = ChannelBuffers.wrappedBuffer(a);
|
|
||||||
}
|
|
||||||
objects = this.arguments;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Command(ChannelBuffer[] arguments) {
|
|
||||||
this.arguments = arguments;
|
|
||||||
objects = arguments;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Command(Object... objects) {
|
|
||||||
this.objects = objects;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String name() {
|
|
||||||
if (arguments == null) {
|
|
||||||
Object o = objects[0];
|
|
||||||
if (o instanceof ChannelBuffer) {
|
|
||||||
return ((ChannelBuffer) o).toString(CharsetUtil.UTF_8);
|
|
||||||
}
|
|
||||||
if (o == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return o.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
ChannelBuffer name = arguments[0];
|
|
||||||
if (name == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return name.toString(CharsetUtil.UTF_8);
|
|
||||||
}
|
|
||||||
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
writeDirect(out, objects);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void writeDirect(ChannelBuffer out, Object... objects) {
|
|
||||||
int length = objects.length;
|
|
||||||
ChannelBuffer[] arguments = new ChannelBuffer[length];
|
|
||||||
for (int i = 0; i < length; i++) {
|
|
||||||
Object object = objects[i];
|
|
||||||
if (object == null) {
|
|
||||||
arguments[i] = ChannelBuffers.EMPTY_BUFFER;
|
|
||||||
} else if (object instanceof ChannelBuffer) {
|
|
||||||
arguments[i] = (ChannelBuffer) object;
|
|
||||||
} else {
|
|
||||||
arguments[i] = ChannelBuffers.copiedBuffer(object.toString(), CharsetUtil.UTF_8);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writeDirect(out, arguments);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void writeDirect(ChannelBuffer out, ChannelBuffer[] arguments) {
|
|
||||||
out.writeBytes(ARGS_PREFIX);
|
|
||||||
out.writeBytes(numAndCRLF(arguments.length));
|
|
||||||
for (ChannelBuffer argument : arguments) {
|
|
||||||
out.writeBytes(BYTES_PREFIX);
|
|
||||||
out.writeBytes(numAndCRLF(argument.readableBytes()));
|
|
||||||
out.writeBytes(argument, argument.readerIndex(), argument.readableBytes());
|
|
||||||
out.writeBytes(CRLF);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final int NUM_MAP_LENGTH = 256;
|
|
||||||
private static final byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][];
|
|
||||||
static {
|
|
||||||
for (int i = 0; i < NUM_MAP_LENGTH; i++) {
|
|
||||||
numAndCRLFMap[i] = convertWithCRLF(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Optimized for the direct to ASCII bytes case
|
|
||||||
// Could be even more optimized but it is already
|
|
||||||
// about twice as fast as using Long.toString().getBytes()
|
|
||||||
static byte[] numAndCRLF(long value) {
|
|
||||||
if (value >= 0 && value < NUM_MAP_LENGTH) {
|
|
||||||
return numAndCRLFMap[(int) value];
|
|
||||||
} else if (value == -1) {
|
|
||||||
return NEG_ONE_AND_CRLF;
|
|
||||||
}
|
|
||||||
return convertWithCRLF(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static byte[] convertWithCRLF(long value) {
|
|
||||||
boolean negative = value < 0;
|
|
||||||
int index = negative ? 2 : 1;
|
|
||||||
long current = negative ? -value : value;
|
|
||||||
while ((current /= 10) > 0) {
|
|
||||||
index++;
|
|
||||||
}
|
|
||||||
byte[] bytes = new byte[index + 2];
|
|
||||||
if (negative) {
|
|
||||||
bytes[0] = '-';
|
|
||||||
}
|
|
||||||
current = negative ? -value : value;
|
|
||||||
long tmp = current;
|
|
||||||
while ((tmp /= 10) > 0) {
|
|
||||||
bytes[--index] = (byte) ('0' + current % 10);
|
|
||||||
current = tmp;
|
|
||||||
}
|
|
||||||
bytes[--index] = (byte) ('0' + current);
|
|
||||||
// add CRLF
|
|
||||||
bytes[bytes.length - 2] = '\r';
|
|
||||||
bytes[bytes.length - 1] = '\n';
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,51 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link Reply} which will be returned if an error was detected
|
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class ErrorReply extends Reply {
|
|
||||||
static final char MARKER = '-';
|
|
||||||
private static final byte[] ERR = "ERR ".getBytes();
|
|
||||||
|
|
||||||
private final ChannelBuffer data;
|
|
||||||
|
|
||||||
public ErrorReply(byte[] data) {
|
|
||||||
this(data == null? null : ChannelBuffers.wrappedBuffer(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
public ErrorReply(ChannelBuffer data) {
|
|
||||||
this.data = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ChannelBuffer data() {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(ChannelBuffer out) {
|
|
||||||
out.writeByte(MARKER);
|
|
||||||
out.writeBytes(ERR);
|
|
||||||
out.writeBytes(data, 0, data.readableBytes());
|
|
||||||
out.writeBytes(Command.CRLF);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,42 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link Reply} which will get returned if a {@link Integer} was requested via <code>GET</code>
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class IntegerReply extends Reply {
|
|
||||||
static final char MARKER = ':';
|
|
||||||
|
|
||||||
private final long value;
|
|
||||||
|
|
||||||
public IntegerReply(long value) {
|
|
||||||
this.value = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long value() {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
out.writeByte(MARKER);
|
|
||||||
out.writeBytes(Command.numAndCRLF(value));
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,124 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link Reply} which contains a bulk of {@link Reply}'s
|
|
||||||
*/
|
|
||||||
public class MultiBulkReply extends Reply {
|
|
||||||
static final char MARKER = '*';
|
|
||||||
|
|
||||||
// State
|
|
||||||
private Object[] values;
|
|
||||||
private int size;
|
|
||||||
private int num;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new instance with empty values.
|
|
||||||
*/
|
|
||||||
public MultiBulkReply() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new instance with the specified values.
|
|
||||||
*
|
|
||||||
* @param values an array whose elements are either {@link ChannelBuffer} or {@link Number}.
|
|
||||||
*/
|
|
||||||
public MultiBulkReply(Object... values) {
|
|
||||||
if (values != null) {
|
|
||||||
for (Object v: values) {
|
|
||||||
if (v == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (!(v instanceof ChannelBuffer || v instanceof Number)) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"values contains an element whose type is neither " +
|
|
||||||
ChannelBuffer.class.getSimpleName() + " nor " + Number.class.getSimpleName() + ": " +
|
|
||||||
v.getClass().getName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.values = values;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an array whose elements are either {@link ChannelBuffer} or {@link Number}.
|
|
||||||
*/
|
|
||||||
public Object[] values() {
|
|
||||||
return values;
|
|
||||||
}
|
|
||||||
|
|
||||||
void read(RedisReplyDecoder decoder, ChannelBuffer in) throws Exception {
|
|
||||||
// If we attempted to read the size before, skip the '*' and reread it
|
|
||||||
if (size == -1) {
|
|
||||||
byte star = in.readByte();
|
|
||||||
if (star == MARKER) {
|
|
||||||
size = 0;
|
|
||||||
} else {
|
|
||||||
throw new CorruptedFrameException("Unexpected character in stream: " + star);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (size == 0) {
|
|
||||||
// If the read fails, we need to skip the star
|
|
||||||
size = -1;
|
|
||||||
// Read the size, if this is successful we won't read the star again
|
|
||||||
size = RedisReplyDecoder.readInteger(in);
|
|
||||||
values = new Object[size];
|
|
||||||
decoder.checkpoint();
|
|
||||||
}
|
|
||||||
for (int i = num; i < size; i++) {
|
|
||||||
int read = in.readByte();
|
|
||||||
if (read == BulkReply.MARKER) {
|
|
||||||
values[i] = RedisReplyDecoder.readBytes(in);
|
|
||||||
} else if (read == IntegerReply.MARKER) {
|
|
||||||
values[i] = RedisReplyDecoder.readInteger(in);
|
|
||||||
} else {
|
|
||||||
throw new CorruptedFrameException("Unexpected character in stream: " + read);
|
|
||||||
}
|
|
||||||
num = i + 1;
|
|
||||||
decoder.checkpoint();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
out.writeByte(MARKER);
|
|
||||||
if (values == null) {
|
|
||||||
out.writeBytes(Command.NEG_ONE_AND_CRLF);
|
|
||||||
} else {
|
|
||||||
out.writeBytes(Command.numAndCRLF(values.length));
|
|
||||||
for (Object value : values) {
|
|
||||||
if (value == null) {
|
|
||||||
out.writeByte(BulkReply.MARKER);
|
|
||||||
out.writeBytes(Command.NEG_ONE_AND_CRLF);
|
|
||||||
} else if (value instanceof ChannelBuffer) {
|
|
||||||
ChannelBuffer bytes = (ChannelBuffer) value;
|
|
||||||
out.writeByte(BulkReply.MARKER);
|
|
||||||
int length = bytes.readableBytes();
|
|
||||||
out.writeBytes(Command.numAndCRLF(length));
|
|
||||||
out.writeBytes(bytes, bytes.readerIndex(), length);
|
|
||||||
out.writeBytes(Command.CRLF);
|
|
||||||
} else if (value instanceof Number) {
|
|
||||||
out.writeByte(IntegerReply.MARKER);
|
|
||||||
out.writeBytes(Command.numAndCRLF(((Number) value).longValue()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,29 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
|
|
||||||
public class PSubscribeReply extends SubscribeReply {
|
|
||||||
|
|
||||||
public PSubscribeReply(byte[][] patterns) {
|
|
||||||
super(patterns);
|
|
||||||
}
|
|
||||||
|
|
||||||
public PSubscribeReply(ChannelBuffer[] patterns) {
|
|
||||||
super(patterns);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,29 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
|
|
||||||
public class PUnsubscribeReply extends UnsubscribeReply {
|
|
||||||
|
|
||||||
public PUnsubscribeReply(byte[][] patterns) {
|
|
||||||
super(patterns);
|
|
||||||
}
|
|
||||||
|
|
||||||
public PUnsubscribeReply(ChannelBuffer[] patterns) {
|
|
||||||
super(patterns);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,63 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
|
||||||
import org.jboss.netty.channel.ChannelHandler.Sharable;
|
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
||||||
import org.jboss.netty.channel.Channels;
|
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
|
||||||
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
|
|
||||||
*/
|
|
||||||
@Sharable
|
|
||||||
public class RedisCommandEncoder extends SimpleChannelDownstreamHandler {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
|
||||||
Object o = e.getMessage();
|
|
||||||
if (o instanceof Command) {
|
|
||||||
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
|
|
||||||
ChannelFuture future = e.getFuture();
|
|
||||||
|
|
||||||
Command command = (Command) o;
|
|
||||||
command.write(cb);
|
|
||||||
Channels.write(ctx, future, cb);
|
|
||||||
|
|
||||||
} else if (o instanceof Iterable) {
|
|
||||||
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
|
|
||||||
ChannelFuture future = e.getFuture();
|
|
||||||
|
|
||||||
// Useful for transactions and database select
|
|
||||||
for (Object i : (Iterable<?>) o) {
|
|
||||||
if (i instanceof Command) {
|
|
||||||
Command command = (Command) i;
|
|
||||||
command.write(cb);
|
|
||||||
} else {
|
|
||||||
super.writeRequested(ctx, e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Channels.write(ctx, future, cb);
|
|
||||||
} else {
|
|
||||||
super.writeRequested(ctx, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,144 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.channel.Channel;
|
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
||||||
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
|
|
||||||
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
|
||||||
import org.jboss.netty.handler.codec.replay.VoidEnum;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link ReplayingDecoder} which handles Redis protocol
|
|
||||||
*/
|
|
||||||
public class RedisReplyDecoder extends ReplayingDecoder<VoidEnum> {
|
|
||||||
|
|
||||||
private static final char CR = '\r';
|
|
||||||
private static final char LF = '\n';
|
|
||||||
private static final char ZERO = '0';
|
|
||||||
|
|
||||||
// We track the current multibulk reply in the case
|
|
||||||
// where we do not get a complete reply in a single
|
|
||||||
// decode invocation.
|
|
||||||
private MultiBulkReply reply;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
|
|
||||||
* via the {@link #readInteger(ChannelBuffer)} method
|
|
||||||
*
|
|
||||||
* @param is the {@link ChannelBuffer} to read from
|
|
||||||
* @return content
|
|
||||||
* @throws CorruptedFrameException if the line-ending is not CRLF
|
|
||||||
*/
|
|
||||||
static ChannelBuffer readBytes(ChannelBuffer is) throws Exception {
|
|
||||||
int size = readInteger(is);
|
|
||||||
if (size == -1) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
ChannelBuffer bytes = ChannelBuffers.buffer(size);
|
|
||||||
is.readBytes(bytes, 0, size);
|
|
||||||
bytes.writerIndex(size);
|
|
||||||
int cr = is.readByte();
|
|
||||||
int lf = is.readByte();
|
|
||||||
if (cr != CR || lf != LF) {
|
|
||||||
throw new CorruptedFrameException("Improper line ending: " + cr + ", " + lf);
|
|
||||||
}
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read an {@link Integer} from the {@link ChannelBuffer}
|
|
||||||
*/
|
|
||||||
static int readInteger(ChannelBuffer in) throws Exception {
|
|
||||||
int size = 0;
|
|
||||||
int sign = 1;
|
|
||||||
int read = in.readByte();
|
|
||||||
if (read == '-') {
|
|
||||||
read = in.readByte();
|
|
||||||
sign = -1;
|
|
||||||
}
|
|
||||||
do {
|
|
||||||
if (read == CR) {
|
|
||||||
if (in.readByte() == LF) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int value = read - ZERO;
|
|
||||||
if (value >= 0 && value < 10) {
|
|
||||||
size *= 10;
|
|
||||||
size += value;
|
|
||||||
} else {
|
|
||||||
throw new CorruptedFrameException("Invalid character in integer");
|
|
||||||
}
|
|
||||||
read = in.readByte();
|
|
||||||
} while (true);
|
|
||||||
return size * sign;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void checkpoint() {
|
|
||||||
super.checkpoint();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, VoidEnum anEnum) throws Exception {
|
|
||||||
if (reply != null) {
|
|
||||||
reply.read(this, channelBuffer);
|
|
||||||
Reply ret = reply;
|
|
||||||
reply = null;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
int code = channelBuffer.readByte();
|
|
||||||
switch (code) {
|
|
||||||
case StatusReply.MARKER: {
|
|
||||||
ChannelBuffer status = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
|
|
||||||
channelBuffer.skipBytes(2);
|
|
||||||
return new StatusReply(status);
|
|
||||||
}
|
|
||||||
case ErrorReply.MARKER: {
|
|
||||||
ChannelBuffer error = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF));
|
|
||||||
channelBuffer.skipBytes(2);
|
|
||||||
return new ErrorReply(error);
|
|
||||||
}
|
|
||||||
case IntegerReply.MARKER: {
|
|
||||||
return new IntegerReply(readInteger(channelBuffer));
|
|
||||||
}
|
|
||||||
case BulkReply.MARKER: {
|
|
||||||
return new BulkReply(readBytes(channelBuffer));
|
|
||||||
}
|
|
||||||
case MultiBulkReply.MARKER: {
|
|
||||||
return decodeMultiBulkReply(channelBuffer);
|
|
||||||
}
|
|
||||||
default: {
|
|
||||||
throw new IOException("Unexpected character in stream: " + code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws Exception {
|
|
||||||
if (reply == null) {
|
|
||||||
reply = new MultiBulkReply();
|
|
||||||
}
|
|
||||||
reply.read(this, is);
|
|
||||||
return reply;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,39 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.util.CharsetUtil;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link Reply} which was sent as a Response to the written {@link Command}
|
|
||||||
* *
|
|
||||||
*/
|
|
||||||
public abstract class Reply {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write the content of the {@link Reply} to the given {@link ChannelBuffer}
|
|
||||||
*/
|
|
||||||
abstract void write(ChannelBuffer out);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
|
|
||||||
write(channelBuffer);
|
|
||||||
return channelBuffer.toString(CharsetUtil.UTF_8);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,47 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link Reply} which contains the status
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class StatusReply extends Reply {
|
|
||||||
static final char MARKER = '+';
|
|
||||||
private final ChannelBuffer data;
|
|
||||||
|
|
||||||
public StatusReply(byte[] data) {
|
|
||||||
this(data == null? null : ChannelBuffers.wrappedBuffer(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
public StatusReply(ChannelBuffer data) {
|
|
||||||
this.data = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ChannelBuffer data() {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
out.writeByte(MARKER);
|
|
||||||
out.writeBytes(data, data.readerIndex(), data.readableBytes());
|
|
||||||
out.writeBytes(Command.CRLF);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,48 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
public class SubscribeReply extends Reply {
|
|
||||||
|
|
||||||
private final ChannelBuffer[] patterns;
|
|
||||||
|
|
||||||
public SubscribeReply(byte[][] patterns) {
|
|
||||||
this.patterns = new ChannelBuffer[patterns.length];
|
|
||||||
for (int i = 0; i < patterns.length; i ++) {
|
|
||||||
byte[] p = patterns[i];
|
|
||||||
if (p == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
this.patterns[i] = ChannelBuffers.wrappedBuffer(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public SubscribeReply(ChannelBuffer[] patterns) {
|
|
||||||
this.patterns = patterns;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ChannelBuffer[] patterns() {
|
|
||||||
return patterns;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,48 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
public class UnsubscribeReply extends Reply {
|
|
||||||
|
|
||||||
private final ChannelBuffer[] patterns;
|
|
||||||
|
|
||||||
public UnsubscribeReply(byte[][] patterns) {
|
|
||||||
this.patterns = new ChannelBuffer[patterns.length];
|
|
||||||
for (int i = 0; i < patterns.length; i ++) {
|
|
||||||
byte[] p = patterns[i];
|
|
||||||
if (p == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
this.patterns[i] = ChannelBuffers.wrappedBuffer(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public UnsubscribeReply(ChannelBuffer[] patterns) {
|
|
||||||
this.patterns = patterns;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ChannelBuffer[] patterns() {
|
|
||||||
return patterns;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void write(ChannelBuffer out) {
|
|
||||||
// Do nothing.
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Encoder and decoder which transform a
|
|
||||||
* <a href="http://http://redis.io/topics/protocol">Redis protocol commands and replies</a>
|
|
||||||
* into a {@link org.jboss.netty.buffer.ChannelBuffer}
|
|
||||||
* and vice versa.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
@ -1,134 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2011 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.jboss.netty.handler.codec.redis;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
|
|
||||||
import org.jboss.netty.util.CharsetUtil;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
public class RedisCodecTest {
|
|
||||||
|
|
||||||
private DecoderEmbedder<ChannelBuffer> embedder;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() {
|
|
||||||
embedder = new DecoderEmbedder<ChannelBuffer>(new RedisReplyDecoder());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void decodeReplies() throws IOException {
|
|
||||||
{
|
|
||||||
Object receive = decode("+OK\r\n".getBytes());
|
|
||||||
assertTrue(receive instanceof StatusReply);
|
|
||||||
assertEquals("OK", ((StatusReply) receive).data().toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
Object receive = decode("-ERROR\r\n".getBytes());
|
|
||||||
assertTrue(receive instanceof ErrorReply);
|
|
||||||
assertEquals("ERROR", ((ErrorReply) receive).data().toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
Object receive = decode(":123\r\n".getBytes());
|
|
||||||
assertTrue(receive instanceof IntegerReply);
|
|
||||||
assertEquals(123, ((IntegerReply) receive).value());
|
|
||||||
}
|
|
||||||
{
|
|
||||||
Object receive = decode("$5\r\nnetty\r\n".getBytes());
|
|
||||||
assertTrue(receive instanceof BulkReply);
|
|
||||||
assertEquals("netty", ((BulkReply) receive).data().toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes());
|
|
||||||
assertTrue(receive instanceof MultiBulkReply);
|
|
||||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
|
||||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Object decode(byte[] bytes) {
|
|
||||||
embedder.offer(wrappedBuffer(bytes));
|
|
||||||
return embedder.poll();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void encodeCommands() throws IOException {
|
|
||||||
String setCommand = "*3\r\n" +
|
|
||||||
"$3\r\n" +
|
|
||||||
"SET\r\n" +
|
|
||||||
"$5\r\n" +
|
|
||||||
"mykey\r\n" +
|
|
||||||
"$7\r\n" +
|
|
||||||
"myvalue\r\n";
|
|
||||||
Command command = new Command("SET", "mykey", "myvalue");
|
|
||||||
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
|
|
||||||
command.write(cb);
|
|
||||||
assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReplayDecoding() {
|
|
||||||
{
|
|
||||||
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes()));
|
|
||||||
Object receive = embedder.poll();
|
|
||||||
assertNull(receive);
|
|
||||||
embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes()));
|
|
||||||
receive = embedder.poll();
|
|
||||||
assertTrue(receive instanceof MultiBulkReply);
|
|
||||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
|
||||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes()));
|
|
||||||
Object receive = embedder.poll();
|
|
||||||
assertNull(receive);
|
|
||||||
embedder.offer(wrappedBuffer("ules\r\n".getBytes()));
|
|
||||||
receive = embedder.poll();
|
|
||||||
assertTrue(receive instanceof MultiBulkReply);
|
|
||||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
|
||||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
embedder.offer(wrappedBuffer("*2".getBytes()));
|
|
||||||
Object receive = embedder.poll();
|
|
||||||
assertNull(receive);
|
|
||||||
embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()));
|
|
||||||
receive = embedder.poll();
|
|
||||||
assertTrue(receive instanceof MultiBulkReply);
|
|
||||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
|
||||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes()));
|
|
||||||
Object receive = embedder.poll();
|
|
||||||
assertNull(receive);
|
|
||||||
embedder.offer(wrappedBuffer("\n".getBytes()));
|
|
||||||
receive = embedder.poll();
|
|
||||||
assertTrue(receive instanceof MultiBulkReply);
|
|
||||||
assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8));
|
|
||||||
assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user