diff --git a/src/main/java/org/jboss/netty/example/redis/RedisClient.java b/src/main/java/org/jboss/netty/example/redis/RedisClient.java deleted file mode 100644 index 0f961ce903..0000000000 --- a/src/main/java/org/jboss/netty/example/redis/RedisClient.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.example.redis; - -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.codec.redis.Command; -import org.jboss.netty.handler.codec.redis.RedisCommandEncoder; -import org.jboss.netty.handler.codec.redis.RedisReplyDecoder; -import org.jboss.netty.handler.codec.redis.Reply; -import org.jboss.netty.handler.queue.BlockingReadHandler; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public final class RedisClient { - private static final byte[] VALUE = "value".getBytes(); - - public static void main(String[] args) throws Exception { - ExecutorService executor = Executors.newCachedThreadPool(); - final ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor)); - final BlockingReadHandler blockingReadHandler = new BlockingReadHandler(); - cb.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("redisEncoder", new RedisCommandEncoder()); - pipeline.addLast("redisDecoder", new RedisReplyDecoder()); - pipeline.addLast("result", blockingReadHandler); - return pipeline; - } - }); - ChannelFuture redis = cb.connect(new InetSocketAddress("localhost", 6379)); - redis.await().rethrowIfFailed(); - Channel channel = redis.getChannel(); - - channel.write(new Command("set", "1", "value")); - System.out.print(blockingReadHandler.read()); - channel.write(new Command("get", "1")); - System.out.print(blockingReadHandler.read()); - - int CALLS = 1000000; - int PIPELINE = 50; - requestResponse(blockingReadHandler, channel, CALLS); - pipelinedIndividualRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE); - pipelinedListOfRequests(blockingReadHandler, channel, CALLS * 10, PIPELINE); - - channel.close(); - cb.releaseExternalResources(); - } - - private static void pipelinedListOfRequests(BlockingReadHandler blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS / PIPELINE; i++) { - List list = new ArrayList(); - for (int j = 0; j < PIPELINE; j++) { - int base = i * PIPELINE; - list.add(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE)); - } - channel.write(list); - for (int j = 0; j < PIPELINE; j++) { - blockingReadHandler.read(); - } - } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - } - - private static void pipelinedIndividualRequests(BlockingReadHandler blockingReadHandler, Channel channel, long CALLS, int PIPELINE) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS / PIPELINE; i++) { - int base = i * PIPELINE; - for (int j = 0; j < PIPELINE; j++) { - channel.write(new Command(SET_BYTES, String.valueOf(base + j).getBytes(), VALUE)); - } - for (int j = 0; j < PIPELINE; j++) { - blockingReadHandler.read(); - } - } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - } - - private static void requestResponse(BlockingReadHandler blockingReadHandler, Channel channel, int CALLS) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - byte[] SET_BYTES = "SET".getBytes(); - for (int i = 0; i < CALLS; i++) { - channel.write(new Command(SET_BYTES, String.valueOf(i).getBytes(), VALUE)); - blockingReadHandler.read(); - } - long end = System.currentTimeMillis(); - System.out.println(CALLS * 1000 / (end - start) + " calls per second"); - } - - private RedisClient() { - - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java deleted file mode 100644 index d2a01b205f..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/BulkReply.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -public class BulkReply extends Reply { - static final char MARKER = '$'; - - private final ChannelBuffer data; - - public BulkReply(byte[] data) { - this(data == null? null : ChannelBuffers.wrappedBuffer(data)); - } - - public BulkReply(ChannelBuffer data) { - this.data = data; - } - - public ChannelBuffer data() { - return data; - } - - @Override - void write(ChannelBuffer out) { - out.writeByte(MARKER); - if (data == null) { - out.writeBytes(Command.NEG_ONE_AND_CRLF); - } else { - out.writeBytes(Command.numAndCRLF(data.readableBytes())); - out.writeBytes(data, data.readerIndex(), data.readableBytes()); - out.writeBytes(Command.CRLF); - } - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java b/src/main/java/org/jboss/netty/handler/codec/redis/Command.java deleted file mode 100644 index 0fc1e903be..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/Command.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.util.CharsetUtil; - -/** - * Command serialization. - */ -public class Command { - static final byte[] ARGS_PREFIX = "*".getBytes(); - static final byte[] CRLF = "\r\n".getBytes(); - static final byte[] BYTES_PREFIX = "$".getBytes(); - static final byte[] EMPTY_BYTES = new byte[0]; - static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1); - - private ChannelBuffer[] arguments; - private final Object[] objects; - - public Command(byte[]... arguments) { - if (arguments == null) { - this.arguments = null; - objects = null; - } else { - this.arguments = new ChannelBuffer[arguments.length]; - for (int i = 0; i < arguments.length; i ++) { - byte[] a = arguments[i]; - if (a == null) { - continue; - } - this.arguments[i] = ChannelBuffers.wrappedBuffer(a); - } - objects = this.arguments; - } - } - - public Command(ChannelBuffer[] arguments) { - this.arguments = arguments; - objects = arguments; - } - - public Command(Object... objects) { - this.objects = objects; - } - - public String name() { - if (arguments == null) { - Object o = objects[0]; - if (o instanceof ChannelBuffer) { - return ((ChannelBuffer) o).toString(CharsetUtil.UTF_8); - } - if (o == null) { - return null; - } - return o.toString(); - } - - ChannelBuffer name = arguments[0]; - if (name == null) { - return null; - } - return name.toString(CharsetUtil.UTF_8); - } - - void write(ChannelBuffer out) { - writeDirect(out, objects); - } - - private static void writeDirect(ChannelBuffer out, Object... objects) { - int length = objects.length; - ChannelBuffer[] arguments = new ChannelBuffer[length]; - for (int i = 0; i < length; i++) { - Object object = objects[i]; - if (object == null) { - arguments[i] = ChannelBuffers.EMPTY_BUFFER; - } else if (object instanceof ChannelBuffer) { - arguments[i] = (ChannelBuffer) object; - } else { - arguments[i] = ChannelBuffers.copiedBuffer(object.toString(), CharsetUtil.UTF_8); - } - } - writeDirect(out, arguments); - } - - private static void writeDirect(ChannelBuffer out, ChannelBuffer[] arguments) { - out.writeBytes(ARGS_PREFIX); - out.writeBytes(numAndCRLF(arguments.length)); - for (ChannelBuffer argument : arguments) { - out.writeBytes(BYTES_PREFIX); - out.writeBytes(numAndCRLF(argument.readableBytes())); - out.writeBytes(argument, argument.readerIndex(), argument.readableBytes()); - out.writeBytes(CRLF); - } - } - - private static final int NUM_MAP_LENGTH = 256; - private static final byte[][] numAndCRLFMap = new byte[NUM_MAP_LENGTH][]; - static { - for (int i = 0; i < NUM_MAP_LENGTH; i++) { - numAndCRLFMap[i] = convertWithCRLF(i); - } - } - - // Optimized for the direct to ASCII bytes case - // Could be even more optimized but it is already - // about twice as fast as using Long.toString().getBytes() - static byte[] numAndCRLF(long value) { - if (value >= 0 && value < NUM_MAP_LENGTH) { - return numAndCRLFMap[(int) value]; - } else if (value == -1) { - return NEG_ONE_AND_CRLF; - } - return convertWithCRLF(value); - } - - private static byte[] convertWithCRLF(long value) { - boolean negative = value < 0; - int index = negative ? 2 : 1; - long current = negative ? -value : value; - while ((current /= 10) > 0) { - index++; - } - byte[] bytes = new byte[index + 2]; - if (negative) { - bytes[0] = '-'; - } - current = negative ? -value : value; - long tmp = current; - while ((tmp /= 10) > 0) { - bytes[--index] = (byte) ('0' + current % 10); - current = tmp; - } - bytes[--index] = (byte) ('0' + current); - // add CRLF - bytes[bytes.length - 2] = '\r'; - bytes[bytes.length - 1] = '\n'; - return bytes; - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java deleted file mode 100644 index 76d1486f28..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/ErrorReply.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -/** - * {@link Reply} which will be returned if an error was detected - * - * - */ -public class ErrorReply extends Reply { - static final char MARKER = '-'; - private static final byte[] ERR = "ERR ".getBytes(); - - private final ChannelBuffer data; - - public ErrorReply(byte[] data) { - this(data == null? null : ChannelBuffers.wrappedBuffer(data)); - } - - public ErrorReply(ChannelBuffer data) { - this.data = data; - } - - public ChannelBuffer data() { - return data; - } - - @Override - public void write(ChannelBuffer out) { - out.writeByte(MARKER); - out.writeBytes(ERR); - out.writeBytes(data, 0, data.readableBytes()); - out.writeBytes(Command.CRLF); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java deleted file mode 100644 index 8e629d0148..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/IntegerReply.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; - -/** - * {@link Reply} which will get returned if a {@link Integer} was requested via GET - * - */ -public class IntegerReply extends Reply { - static final char MARKER = ':'; - - private final long value; - - public IntegerReply(long value) { - this.value = value; - } - - public long value() { - return value; - } - - @Override - void write(ChannelBuffer out) { - out.writeByte(MARKER); - out.writeBytes(Command.numAndCRLF(value)); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java deleted file mode 100644 index eb719bd059..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/MultiBulkReply.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.frame.CorruptedFrameException; - -/** - * {@link Reply} which contains a bulk of {@link Reply}'s - */ -public class MultiBulkReply extends Reply { - static final char MARKER = '*'; - - // State - private Object[] values; - private int size; - private int num; - - /** - * Creates a new instance with empty values. - */ - public MultiBulkReply() { - } - - /** - * Creates a new instance with the specified values. - * - * @param values an array whose elements are either {@link ChannelBuffer} or {@link Number}. - */ - public MultiBulkReply(Object... values) { - if (values != null) { - for (Object v: values) { - if (v == null) { - continue; - } - if (!(v instanceof ChannelBuffer || v instanceof Number)) { - throw new IllegalArgumentException( - "values contains an element whose type is neither " + - ChannelBuffer.class.getSimpleName() + " nor " + Number.class.getSimpleName() + ": " + - v.getClass().getName()); - } - } - this.values = values; - } - } - - /** - * Returns an array whose elements are either {@link ChannelBuffer} or {@link Number}. - */ - public Object[] values() { - return values; - } - - void read(RedisReplyDecoder decoder, ChannelBuffer in) throws Exception { - // If we attempted to read the size before, skip the '*' and reread it - if (size == -1) { - byte star = in.readByte(); - if (star == MARKER) { - size = 0; - } else { - throw new CorruptedFrameException("Unexpected character in stream: " + star); - } - } - if (size == 0) { - // If the read fails, we need to skip the star - size = -1; - // Read the size, if this is successful we won't read the star again - size = RedisReplyDecoder.readInteger(in); - values = new Object[size]; - decoder.checkpoint(); - } - for (int i = num; i < size; i++) { - int read = in.readByte(); - if (read == BulkReply.MARKER) { - values[i] = RedisReplyDecoder.readBytes(in); - } else if (read == IntegerReply.MARKER) { - values[i] = RedisReplyDecoder.readInteger(in); - } else { - throw new CorruptedFrameException("Unexpected character in stream: " + read); - } - num = i + 1; - decoder.checkpoint(); - } - } - - @Override - void write(ChannelBuffer out) { - out.writeByte(MARKER); - if (values == null) { - out.writeBytes(Command.NEG_ONE_AND_CRLF); - } else { - out.writeBytes(Command.numAndCRLF(values.length)); - for (Object value : values) { - if (value == null) { - out.writeByte(BulkReply.MARKER); - out.writeBytes(Command.NEG_ONE_AND_CRLF); - } else if (value instanceof ChannelBuffer) { - ChannelBuffer bytes = (ChannelBuffer) value; - out.writeByte(BulkReply.MARKER); - int length = bytes.readableBytes(); - out.writeBytes(Command.numAndCRLF(length)); - out.writeBytes(bytes, bytes.readerIndex(), length); - out.writeBytes(Command.CRLF); - } else if (value instanceof Number) { - out.writeByte(IntegerReply.MARKER); - out.writeBytes(Command.numAndCRLF(((Number) value).longValue())); - } - } - } - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java deleted file mode 100644 index 9101aa2a20..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/PSubscribeReply.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; - -public class PSubscribeReply extends SubscribeReply { - - public PSubscribeReply(byte[][] patterns) { - super(patterns); - } - - public PSubscribeReply(ChannelBuffer[] patterns) { - super(patterns); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java deleted file mode 100644 index 2da7b91024..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/PUnsubscribeReply.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; - -public class PUnsubscribeReply extends UnsubscribeReply { - - public PUnsubscribeReply(byte[][] patterns) { - super(patterns); - } - - public PUnsubscribeReply(ChannelBuffer[] patterns) { - super(patterns); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisCommandEncoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisCommandEncoder.java deleted file mode 100644 index aae40617ff..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisCommandEncoder.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelHandler.Sharable; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelDownstreamHandler; - -/** - * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s - */ -@Sharable -public class RedisCommandEncoder extends SimpleChannelDownstreamHandler { - - @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - Object o = e.getMessage(); - if (o instanceof Command) { - ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); - ChannelFuture future = e.getFuture(); - - Command command = (Command) o; - command.write(cb); - Channels.write(ctx, future, cb); - - } else if (o instanceof Iterable) { - ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); - ChannelFuture future = e.getFuture(); - - // Useful for transactions and database select - for (Object i : (Iterable) o) { - if (i instanceof Command) { - Command command = (Command) i; - command.write(cb); - } else { - super.writeRequested(ctx, e); - return; - } - } - Channels.write(ctx, future, cb); - } else { - super.writeRequested(ctx, e); - } - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/RedisReplyDecoder.java b/src/main/java/org/jboss/netty/handler/codec/redis/RedisReplyDecoder.java deleted file mode 100644 index f87a72c7ac..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/RedisReplyDecoder.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferIndexFinder; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.frame.CorruptedFrameException; -import org.jboss.netty.handler.codec.replay.ReplayingDecoder; -import org.jboss.netty.handler.codec.replay.VoidEnum; - -import java.io.IOException; - -/** - * {@link ReplayingDecoder} which handles Redis protocol - */ -public class RedisReplyDecoder extends ReplayingDecoder { - - private static final char CR = '\r'; - private static final char LF = '\n'; - private static final char ZERO = '0'; - - // We track the current multibulk reply in the case - // where we do not get a complete reply in a single - // decode invocation. - private MultiBulkReply reply; - - /** - * Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer} - * via the {@link #readInteger(ChannelBuffer)} method - * - * @param is the {@link ChannelBuffer} to read from - * @return content - * @throws CorruptedFrameException if the line-ending is not CRLF - */ - static ChannelBuffer readBytes(ChannelBuffer is) throws Exception { - int size = readInteger(is); - if (size == -1) { - return null; - } - - ChannelBuffer bytes = ChannelBuffers.buffer(size); - is.readBytes(bytes, 0, size); - bytes.writerIndex(size); - int cr = is.readByte(); - int lf = is.readByte(); - if (cr != CR || lf != LF) { - throw new CorruptedFrameException("Improper line ending: " + cr + ", " + lf); - } - return bytes; - } - - /** - * Read an {@link Integer} from the {@link ChannelBuffer} - */ - static int readInteger(ChannelBuffer in) throws Exception { - int size = 0; - int sign = 1; - int read = in.readByte(); - if (read == '-') { - read = in.readByte(); - sign = -1; - } - do { - if (read == CR) { - if (in.readByte() == LF) { - break; - } - } - int value = read - ZERO; - if (value >= 0 && value < 10) { - size *= 10; - size += value; - } else { - throw new CorruptedFrameException("Invalid character in integer"); - } - read = in.readByte(); - } while (true); - return size * sign; - } - - @Override - protected void checkpoint() { - super.checkpoint(); - } - - @Override - protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, VoidEnum anEnum) throws Exception { - if (reply != null) { - reply.read(this, channelBuffer); - Reply ret = reply; - reply = null; - return ret; - } - int code = channelBuffer.readByte(); - switch (code) { - case StatusReply.MARKER: { - ChannelBuffer status = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF)); - channelBuffer.skipBytes(2); - return new StatusReply(status); - } - case ErrorReply.MARKER: { - ChannelBuffer error = channelBuffer.readBytes(channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF)); - channelBuffer.skipBytes(2); - return new ErrorReply(error); - } - case IntegerReply.MARKER: { - return new IntegerReply(readInteger(channelBuffer)); - } - case BulkReply.MARKER: { - return new BulkReply(readBytes(channelBuffer)); - } - case MultiBulkReply.MARKER: { - return decodeMultiBulkReply(channelBuffer); - } - default: { - throw new IOException("Unexpected character in stream: " + code); - } - } - } - - private MultiBulkReply decodeMultiBulkReply(ChannelBuffer is) throws Exception { - if (reply == null) { - reply = new MultiBulkReply(); - } - reply.read(this, is); - return reply; - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java b/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java deleted file mode 100644 index 944ba42fff..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/Reply.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.util.CharsetUtil; - -/** - * {@link Reply} which was sent as a Response to the written {@link Command} - * * - */ -public abstract class Reply { - - /** - * Write the content of the {@link Reply} to the given {@link ChannelBuffer} - */ - abstract void write(ChannelBuffer out); - - @Override - public String toString() { - ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); - write(channelBuffer); - return channelBuffer.toString(CharsetUtil.UTF_8); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java deleted file mode 100644 index 1f0cd24ee7..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/StatusReply.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -/** - * {@link Reply} which contains the status - * - */ -public class StatusReply extends Reply { - static final char MARKER = '+'; - private final ChannelBuffer data; - - public StatusReply(byte[] data) { - this(data == null? null : ChannelBuffers.wrappedBuffer(data)); - } - - public StatusReply(ChannelBuffer data) { - this.data = data; - } - - public ChannelBuffer data() { - return data; - } - - @Override - void write(ChannelBuffer out) { - out.writeByte(MARKER); - out.writeBytes(data, data.readerIndex(), data.readableBytes()); - out.writeBytes(Command.CRLF); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java deleted file mode 100644 index 5e60ac8a46..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/SubscribeReply.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -public class SubscribeReply extends Reply { - - private final ChannelBuffer[] patterns; - - public SubscribeReply(byte[][] patterns) { - this.patterns = new ChannelBuffer[patterns.length]; - for (int i = 0; i < patterns.length; i ++) { - byte[] p = patterns[i]; - if (p == null) { - continue; - } - this.patterns[i] = ChannelBuffers.wrappedBuffer(p); - } - } - - public SubscribeReply(ChannelBuffer[] patterns) { - this.patterns = patterns; - } - - public ChannelBuffer[] patterns() { - return patterns; - } - - @Override - void write(ChannelBuffer out) { - // Do nothing - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java b/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java deleted file mode 100644 index 1a6a06ad34..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/UnsubscribeReply.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -public class UnsubscribeReply extends Reply { - - private final ChannelBuffer[] patterns; - - public UnsubscribeReply(byte[][] patterns) { - this.patterns = new ChannelBuffer[patterns.length]; - for (int i = 0; i < patterns.length; i ++) { - byte[] p = patterns[i]; - if (p == null) { - continue; - } - this.patterns[i] = ChannelBuffers.wrappedBuffer(p); - } - } - - public UnsubscribeReply(ChannelBuffer[] patterns) { - this.patterns = patterns; - } - - public ChannelBuffer[] patterns() { - return patterns; - } - - @Override - void write(ChannelBuffer out) { - // Do nothing. - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/redis/package-info.java b/src/main/java/org/jboss/netty/handler/codec/redis/package-info.java deleted file mode 100644 index 477e00f632..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/redis/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * Encoder and decoder which transform a - * Redis protocol commands and replies - * into a {@link org.jboss.netty.buffer.ChannelBuffer} - * and vice versa. - * - */ -package org.jboss.netty.handler.codec.redis; diff --git a/src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java b/src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java deleted file mode 100644 index d6df50d5a7..0000000000 --- a/src/test/java/org/jboss/netty/handler/codec/redis/RedisCodecTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.codec.redis; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.handler.codec.embedder.DecoderEmbedder; -import org.jboss.netty.util.CharsetUtil; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class RedisCodecTest { - - private DecoderEmbedder embedder; - - @Before - public void setUp() { - embedder = new DecoderEmbedder(new RedisReplyDecoder()); - } - - @Test - public void decodeReplies() throws IOException { - { - Object receive = decode("+OK\r\n".getBytes()); - assertTrue(receive instanceof StatusReply); - assertEquals("OK", ((StatusReply) receive).data().toString(CharsetUtil.UTF_8)); - } - { - Object receive = decode("-ERROR\r\n".getBytes()); - assertTrue(receive instanceof ErrorReply); - assertEquals("ERROR", ((ErrorReply) receive).data().toString(CharsetUtil.UTF_8)); - } - { - Object receive = decode(":123\r\n".getBytes()); - assertTrue(receive instanceof IntegerReply); - assertEquals(123, ((IntegerReply) receive).value()); - } - { - Object receive = decode("$5\r\nnetty\r\n".getBytes()); - assertTrue(receive instanceof BulkReply); - assertEquals("netty", ((BulkReply) receive).data().toString(CharsetUtil.UTF_8)); - } - { - Object receive = decode("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes()); - assertTrue(receive instanceof MultiBulkReply); - assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8)); - assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8)); - } - } - - private Object decode(byte[] bytes) { - embedder.offer(wrappedBuffer(bytes)); - return embedder.poll(); - } - - @Test - public void encodeCommands() throws IOException { - String setCommand = "*3\r\n" + - "$3\r\n" + - "SET\r\n" + - "$5\r\n" + - "mykey\r\n" + - "$7\r\n" + - "myvalue\r\n"; - Command command = new Command("SET", "mykey", "myvalue"); - ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); - command.write(cb); - assertEquals(setCommand, cb.toString(CharsetUtil.US_ASCII)); - } - - @Test - public void testReplayDecoding() { - { - embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n".getBytes())); - Object receive = embedder.poll(); - assertNull(receive); - embedder.offer(wrappedBuffer("$5\r\nrules\r\n".getBytes())); - receive = embedder.poll(); - assertTrue(receive instanceof MultiBulkReply); - assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8)); - assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8)); - } - { - embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nr".getBytes())); - Object receive = embedder.poll(); - assertNull(receive); - embedder.offer(wrappedBuffer("ules\r\n".getBytes())); - receive = embedder.poll(); - assertTrue(receive instanceof MultiBulkReply); - assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8)); - assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8)); - } - { - embedder.offer(wrappedBuffer("*2".getBytes())); - Object receive = embedder.poll(); - assertNull(receive); - embedder.offer(wrappedBuffer("\r\n$5\r\nnetty\r\n$5\r\nrules\r\n".getBytes())); - receive = embedder.poll(); - assertTrue(receive instanceof MultiBulkReply); - assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8)); - assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8)); - } - { - embedder.offer(wrappedBuffer("*2\r\n$5\r\nnetty\r\n$5\r\nrules\r".getBytes())); - Object receive = embedder.poll(); - assertNull(receive); - embedder.offer(wrappedBuffer("\n".getBytes())); - receive = embedder.poll(); - assertTrue(receive instanceof MultiBulkReply); - assertEquals("netty", ((ChannelBuffer) ((MultiBulkReply) receive).values()[0]).toString(CharsetUtil.UTF_8)); - assertEquals("rules", ((ChannelBuffer) ((MultiBulkReply) receive).values()[1]).toString(CharsetUtil.UTF_8)); - } - } -}