From 96455a95584dd4e3bbefefb9c208e5927a8b919f Mon Sep 17 00:00:00 2001 From: Jongyeol Choi Date: Thu, 14 Apr 2016 07:30:09 +0900 Subject: [PATCH] Implement codec-redis Motivation: - To encode/decode RESP (REdis Serialization Protocol) using Netty - http://redis.io/topics/protocol Modifications: - Add RedisEncoder, RedisDecoder - Add RedisBulkStringAggregator and RedisArrayAggregator - Add tests Result: - Added codec-redis - codec-redis can encode/decode RESP (REdis Serialization Protocol) --- .../java/io/netty/buffer/ByteBufUtil.java | 11 +- codec-redis/pom.xml | 39 +++ .../redis/AbstractStringRedisMessage.java | 39 +++ .../codec/redis/ArrayHeaderRedisMessage.java | 61 ++++ .../codec/redis/ArrayRedisMessage.java | 179 ++++++++++ .../redis/BulkStringHeaderRedisMessage.java | 49 +++ .../codec/redis/BulkStringRedisContent.java | 29 ++ .../redis/DefaultBulkStringRedisContent.java | 44 +++ .../DefaultLastBulkStringRedisContent.java | 33 ++ .../codec/redis/ErrorRedisMessage.java | 42 +++ .../codec/redis/FixedRedisMessagePool.java | 152 +++++++++ .../redis/FullBulkStringRedisMessage.java | 175 ++++++++++ .../codec/redis/IntegerRedisMessage.java | 53 +++ .../redis/LastBulkStringRedisContent.java | 82 +++++ .../codec/redis/RedisArrayAggregator.java | 91 ++++++ .../redis/RedisBulkStringAggregator.java | 105 ++++++ .../codec/redis/RedisCodecException.java | 40 +++ .../handler/codec/redis/RedisCodecUtil.java | 55 ++++ .../handler/codec/redis/RedisConstants.java | 43 +++ .../handler/codec/redis/RedisDecoder.java | 306 ++++++++++++++++++ .../handler/codec/redis/RedisEncoder.java | 197 +++++++++++ .../handler/codec/redis/RedisMessage.java | 22 ++ .../handler/codec/redis/RedisMessagePool.java | 59 ++++ .../handler/codec/redis/RedisMessageType.java | 72 +++++ .../codec/redis/SimpleStringRedisMessage.java | 42 +++ .../handler/codec/redis/package-info.java | 19 ++ .../codec/redis/RedisCodecTestUtil.java | 53 +++ .../handler/codec/redis/RedisDecoderTest.java | 257 +++++++++++++++ .../handler/codec/redis/RedisEncoderTest.java | 184 +++++++++++ pom.xml | 2 +- 30 files changed, 2532 insertions(+), 3 deletions(-) create mode 100644 codec-redis/pom.xml create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringHeaderRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringRedisContent.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultBulkStringRedisContent.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultLastBulkStringRedisContent.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/FixedRedisMessagePool.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/FullBulkStringRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/IntegerRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/LastBulkStringRedisContent.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisArrayAggregator.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisBulkStringAggregator.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecException.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecUtil.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessagePool.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java create mode 100644 codec-redis/src/main/java/io/netty/handler/codec/redis/package-info.java create mode 100644 codec-redis/src/test/java/io/netty/handler/codec/redis/RedisCodecTestUtil.java create mode 100644 codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java create mode 100644 codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java index 3e231285a0..9588758ca2 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java @@ -365,7 +365,7 @@ public final class ByteBufUtil { */ public static ByteBuf writeUtf8(ByteBufAllocator alloc, CharSequence seq) { // UTF-8 uses max. 3 bytes per char, so calculate the worst case. - ByteBuf buf = alloc.buffer(seq.length() * MAX_BYTES_PER_CHAR_UTF8); + ByteBuf buf = alloc.buffer(utf8MaxBytes(seq)); writeUtf8(buf, seq); return buf; } @@ -378,7 +378,7 @@ public final class ByteBufUtil { */ public static int writeUtf8(ByteBuf buf, CharSequence seq) { final int len = seq.length(); - buf.ensureWritable(len * MAX_BYTES_PER_CHAR_UTF8); + buf.ensureWritable(utf8MaxBytes(seq)); for (;;) { if (buf instanceof AbstractByteBuf) { @@ -445,6 +445,13 @@ public final class ByteBufUtil { return writerIndex - oldWriterIndex; } + /** + * Returns max bytes length of UTF8 character sequence. + */ + public static int utf8MaxBytes(CharSequence seq) { + return seq.length() * MAX_BYTES_PER_CHAR_UTF8; + } + /** * Encode a {@link CharSequence} in ASCII and write * it to a {@link ByteBuf} allocated with {@code alloc}. diff --git a/codec-redis/pom.xml b/codec-redis/pom.xml new file mode 100644 index 0000000000..0b4a80faee --- /dev/null +++ b/codec-redis/pom.xml @@ -0,0 +1,39 @@ + + + + + 4.0.0 + + io.netty + netty-parent + 4.1.0.Final-SNAPSHOT + + + netty-codec-redis + jar + + Netty/Codec/Redis + + + + ${project.groupId} + netty-codec + ${project.version} + + + + diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java new file mode 100644 index 0000000000..3088fa17ab --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.internal.ObjectUtil; + +/** + * Abstract class for Simple Strings or Errors. + */ +public abstract class AbstractStringRedisMessage implements RedisMessage { + + private final String content; + + AbstractStringRedisMessage(String content) { + this.content = ObjectUtil.checkNotNull(content, "content"); + } + + /** + * Get string content of this {@link AbstractStringRedisMessage}. + * + * @return content of this message. + */ + public final String content() { + return content; + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayHeaderRedisMessage.java new file mode 100644 index 0000000000..1fb893d2d4 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayHeaderRedisMessage.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.internal.StringUtil; + +/** + * Header of Redis Array Message. + */ +public class ArrayHeaderRedisMessage implements RedisMessage { + + private final long length; + + /** + * Creates a {@link ArrayHeaderRedisMessage} for the given {@code length}. + */ + public ArrayHeaderRedisMessage(long length) { + if (length < RedisConstants.NULL_VALUE) { + throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")"); + } + this.length = length; + } + + /** + * Get length of this array object. + */ + public final long length() { + return length; + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return length == RedisConstants.NULL_VALUE; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("length=") + .append(length) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java new file mode 100644 index 0000000000..c9490ab8a5 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java @@ -0,0 +1,179 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.StringUtil; + +import java.util.Collections; +import java.util.List; + +/** + * Arrays of RESP. + */ +public class ArrayRedisMessage extends AbstractReferenceCounted implements RedisMessage { + + private final List children; + + private ArrayRedisMessage() { + children = Collections.emptyList(); + } + + /** + * Creates a {@link ArrayRedisMessage} for the given {@code content}. + * + * @param children the children. + */ + public ArrayRedisMessage(List children) { + // do not retain here. children are already retained when created. + this.children = ObjectUtil.checkNotNull(children, "children"); + } + + /** + * Get children of this Arrays. It can be null or empty. + * + * @return list of {@link RedisMessage}s. + */ + public final List children() { + return children; + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return false; + } + + @Override + protected void deallocate() { + for (RedisMessage msg : children) { + ReferenceCountUtil.release(msg); + } + } + + @Override + public ArrayRedisMessage touch(Object hint) { + for (RedisMessage msg : children) { + ReferenceCountUtil.touch(msg); + } + return this; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("children=") + .append(children.size()) + .append(']').toString(); + } + + /** + * A predefined null array instance for {@link ArrayRedisMessage}. + */ + public static final ArrayRedisMessage NULL_INSTANCE = new ArrayRedisMessage() { + @Override + public boolean isNull() { + return true; + } + + @Override + public ArrayRedisMessage retain() { + return this; + } + + @Override + public ArrayRedisMessage retain(int increment) { + return this; + } + + @Override + public ArrayRedisMessage touch() { + return this; + } + + @Override + public ArrayRedisMessage touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + + @Override + public String toString() { + return "NullArrayRedisMessage"; + } + }; + + /** + * A predefined empty array instance for {@link ArrayRedisMessage}. + */ + public static final ArrayRedisMessage EMPTY_INSTANCE = new ArrayRedisMessage() { + @Override + public boolean isNull() { + return false; + } + + @Override + public ArrayRedisMessage retain() { + return this; + } + + @Override + public ArrayRedisMessage retain(int increment) { + return this; + } + + @Override + public ArrayRedisMessage touch() { + return this; + } + + @Override + public ArrayRedisMessage touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + + @Override + public String toString() { + return "EmptyArrayRedisMessage"; + } + }; + +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringHeaderRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringHeaderRedisMessage.java new file mode 100644 index 0000000000..7bc46812a6 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringHeaderRedisMessage.java @@ -0,0 +1,49 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +/** + * The header of Bulk Strings in RESP. + */ +public class BulkStringHeaderRedisMessage implements RedisMessage { + + private final int bulkStringLength; + + /** + * Creates a {@link BulkStringHeaderRedisMessage}. + * + * @param bulkStringLength follow content length. + */ + public BulkStringHeaderRedisMessage(int bulkStringLength) { + this.bulkStringLength = bulkStringLength; + } + + /** + * Return {@code bulkStringLength} for this content. + */ + public final int bulkStringLength() { + return bulkStringLength; + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return bulkStringLength == RedisConstants.NULL_VALUE; + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringRedisContent.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringRedisContent.java new file mode 100644 index 0000000000..00598b75dc --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/BulkStringRedisContent.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBufHolder; +import io.netty.channel.ChannelPipeline; + +/** + * A chunk of bulk strings which is used for Redis chunked transfer-encoding. + * {@link RedisDecoder} generates {@link BulkStringRedisContent} after + * {@link BulkStringHeaderRedisMessage} when the content is large or the encoding of the content is chunked. + * If you prefer not to receive {@link BulkStringRedisContent} in your handler, + * place {@link RedisBulkStringAggregator} after {@link RedisDecoder} in the {@link ChannelPipeline}. + */ +public interface BulkStringRedisContent extends RedisMessage, ByteBufHolder { +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultBulkStringRedisContent.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultBulkStringRedisContent.java new file mode 100644 index 0000000000..9328700499 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultBulkStringRedisContent.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DefaultByteBufHolder; +import io.netty.util.internal.StringUtil; + +/** + * A default implementation of {@link BulkStringRedisContent}. + */ +public class DefaultBulkStringRedisContent extends DefaultByteBufHolder implements BulkStringRedisContent { + + /** + * Creates a {@link DefaultBulkStringRedisContent} for the given {@code content}. + * + * @param content the content, can be {@code null}. + */ + public DefaultBulkStringRedisContent(ByteBuf content) { + super(content); + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("content=") + .append(content()) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultLastBulkStringRedisContent.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultLastBulkStringRedisContent.java new file mode 100644 index 0000000000..f171ccd32c --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/DefaultLastBulkStringRedisContent.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; + +/** + * A default implementation for {@link LastBulkStringRedisContent}. + */ +public final class DefaultLastBulkStringRedisContent extends DefaultBulkStringRedisContent + implements LastBulkStringRedisContent { + + /** + * Creates a {@link DefaultLastBulkStringRedisContent} for the given {@code content}. + * @param content the content, can be {@code null}. + */ + public DefaultLastBulkStringRedisContent(ByteBuf content) { + super(content); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java new file mode 100644 index 0000000000..fd3f4b161b --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.internal.StringUtil; + +/** + * Errors of RESP. + */ +public final class ErrorRedisMessage extends AbstractStringRedisMessage { + + /** + * Creates a {@link ErrorRedisMessage} from {@code content}. + * + * @param content the message content, must not be {@code null}. + */ + public ErrorRedisMessage(String content) { + super(content); + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("content=") + .append(content()) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/FixedRedisMessagePool.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/FixedRedisMessagePool.java new file mode 100644 index 0000000000..4b910715cb --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/FixedRedisMessagePool.java @@ -0,0 +1,152 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import io.netty.util.collection.LongObjectHashMap; +import io.netty.util.collection.LongObjectMap; + +import java.util.HashMap; +import java.util.Map; + +/** + * A default fixed redis message pool. + */ +public final class FixedRedisMessagePool implements RedisMessagePool { + + private static final String[] DEFAULT_SIMPLE_STRINGS = { + "OK", + "PONG", + "QUEUED", + }; + + private static final String[] DEFAULT_ERRORS = { + "ERR", + "ERR index out of range", + "ERR no such key", + "ERR source and destination objects are the same", + "ERR syntax error", + "BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.", + "BUSYKEY Target key name already exists.", + "EXECABORT Transaction discarded because of previous errors.", + "LOADING Redis is loading the dataset in memory", + "MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.", + "MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. " + + "Commands that may modify the data set are disabled. Please check Redis logs for details " + + "about the error.", + "NOAUTH Authentication required.", + "NOREPLICAS Not enough good slaves to write.", + "NOSCRIPT No matching script. Please use EVAL.", + "OOM command not allowed when used memory > 'maxmemory'.", + "READONLY You can't write against a read only slave.", + "WRONGTYPE Operation against a key holding the wrong kind of value", + }; + + private static final long MIN_CACHED_INTEGER_NUMBER = RedisConstants.NULL_VALUE; // inclusive + private static final long MAX_CACHED_INTEGER_NUMBER = 128; // exclusive + + // cached integer size cannot larger than `int` range because of Collection. + private static final int SIZE_CACHED_INTEGER_NUMBER = (int) (MAX_CACHED_INTEGER_NUMBER - MIN_CACHED_INTEGER_NUMBER); + + /** + * A shared object for {@link FixedRedisMessagePool}. + */ + public static final FixedRedisMessagePool INSTANCE = new FixedRedisMessagePool(); + + // internal caches. + private Map byteBufToSimpleStrings; + private Map stringToSimpleStrings; + private Map byteBufToErrors; + private Map stringToErrors; + private Map byteBufToIntegers; + private LongObjectMap longToIntegers; + private LongObjectMap longToByteBufs; + + /** + * Creates a {@link FixedRedisMessagePool} instance. + */ + private FixedRedisMessagePool() { + byteBufToSimpleStrings = new HashMap(DEFAULT_SIMPLE_STRINGS.length, 1.0f); + stringToSimpleStrings = new HashMap(DEFAULT_SIMPLE_STRINGS.length, 1.0f); + for (String message : DEFAULT_SIMPLE_STRINGS) { + ByteBuf key = Unpooled.unmodifiableBuffer( + Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(message.getBytes(CharsetUtil.UTF_8)))); + SimpleStringRedisMessage cached = new SimpleStringRedisMessage(message); + byteBufToSimpleStrings.put(key, cached); + stringToSimpleStrings.put(message, cached); + } + + byteBufToErrors = new HashMap(DEFAULT_ERRORS.length, 1.0f); + stringToErrors = new HashMap(DEFAULT_ERRORS.length, 1.0f); + for (String message : DEFAULT_ERRORS) { + ByteBuf key = Unpooled.unmodifiableBuffer( + Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(message.getBytes(CharsetUtil.UTF_8)))); + ErrorRedisMessage cached = new ErrorRedisMessage(message); + byteBufToErrors.put(key, cached); + stringToErrors.put(message, cached); + } + + byteBufToIntegers = new HashMap(SIZE_CACHED_INTEGER_NUMBER, 1.0f); + longToIntegers = new LongObjectHashMap(SIZE_CACHED_INTEGER_NUMBER, 1.0f); + longToByteBufs = new LongObjectHashMap(SIZE_CACHED_INTEGER_NUMBER, 1.0f); + for (long value = MIN_CACHED_INTEGER_NUMBER; value < MAX_CACHED_INTEGER_NUMBER; value++) { + byte[] keyBytes = RedisCodecUtil.longToAsciiBytes(value); + ByteBuf keyByteBuf = Unpooled.unmodifiableBuffer(Unpooled.unreleasableBuffer( + Unpooled.wrappedBuffer(keyBytes))); + IntegerRedisMessage cached = new IntegerRedisMessage(value); + byteBufToIntegers.put(keyByteBuf, cached); + longToIntegers.put(value, cached); + longToByteBufs.put(value, keyBytes); + } + } + + @Override + public SimpleStringRedisMessage getSimpleString(String content) { + return stringToSimpleStrings.get(content); + } + + @Override + public SimpleStringRedisMessage getSimpleString(ByteBuf content) { + return byteBufToSimpleStrings.get(content); + } + + @Override + public ErrorRedisMessage getError(String content) { + return stringToErrors.get(content); + } + + @Override + public ErrorRedisMessage getError(ByteBuf content) { + return byteBufToErrors.get(content); + } + + @Override + public IntegerRedisMessage getInteger(long value) { + return longToIntegers.get(value); + } + + @Override + public IntegerRedisMessage getInteger(ByteBuf content) { + return byteBufToIntegers.get(content); + } + + @Override + public byte[] getByteBufOfInteger(long value) { + return longToByteBufs.get(value); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/FullBulkStringRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/FullBulkStringRedisMessage.java new file mode 100644 index 0000000000..54213139c8 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/FullBulkStringRedisMessage.java @@ -0,0 +1,175 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DefaultByteBufHolder; +import io.netty.buffer.Unpooled; +import io.netty.util.internal.StringUtil; + +/** + * An aggregated bulk string of RESP. + */ +public class FullBulkStringRedisMessage extends DefaultByteBufHolder implements LastBulkStringRedisContent { + + private FullBulkStringRedisMessage() { + this(Unpooled.EMPTY_BUFFER); + } + + /** + * Creates a {@link FullBulkStringRedisMessage} for the given {@code content}. + * + * @param content the content, must not be {@code null}. If content is null or empty, + * use {@link FullBulkStringRedisMessage#NULL_INSTANCE} or {@link FullBulkStringRedisMessage#EMPTY_INSTANCE} + * instead of constructor. + */ + public FullBulkStringRedisMessage(ByteBuf content) { + super(content); + } + + /** + * Returns whether the content of this message is {@code null}. + * + * @return indicates whether the content of this message is {@code null}. + */ + public boolean isNull() { + return false; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("content=") + .append(content()) + .append(']').toString(); + } + + /** + * A predefined null instance of {@link FullBulkStringRedisMessage}. + */ + public static final FullBulkStringRedisMessage NULL_INSTANCE = new FullBulkStringRedisMessage() { + @Override + public boolean isNull() { + return true; + } + + @Override + public ByteBuf content() { + return Unpooled.EMPTY_BUFFER; + } + + @Override + public FullBulkStringRedisMessage copy() { + return this; + } + + @Override + public FullBulkStringRedisMessage duplicate() { + return this; + } + + @Override + public int refCnt() { + return 1; + } + + @Override + public FullBulkStringRedisMessage retain() { + return this; + } + + @Override + public FullBulkStringRedisMessage retain(int increment) { + return this; + } + + @Override + public FullBulkStringRedisMessage touch() { + return this; + } + + @Override + public FullBulkStringRedisMessage touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + }; + + /** + * A predefined empty instance of {@link FullBulkStringRedisMessage}. + */ + public static final FullBulkStringRedisMessage EMPTY_INSTANCE = new FullBulkStringRedisMessage() { + @Override + public ByteBuf content() { + return Unpooled.EMPTY_BUFFER; + } + + @Override + public FullBulkStringRedisMessage copy() { + return EMPTY_INSTANCE; + } + + @Override + public FullBulkStringRedisMessage duplicate() { + return this; + } + + @Override + public int refCnt() { + return 1; + } + + @Override + public FullBulkStringRedisMessage retain() { + return this; + } + + @Override + public FullBulkStringRedisMessage retain(int increment) { + return this; + } + + @Override + public FullBulkStringRedisMessage touch() { + return this; + } + + @Override + public FullBulkStringRedisMessage touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + }; +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/IntegerRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/IntegerRedisMessage.java new file mode 100644 index 0000000000..abac2a910b --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/IntegerRedisMessage.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.internal.StringUtil; + +/** + * Integers of RESP. + */ +public final class IntegerRedisMessage implements RedisMessage { + + private final long value; + + /** + * Creates a {@link IntegerRedisMessage} for the given {@code content}. + * + * @param value the message content. + */ + public IntegerRedisMessage(long value) { + this.value = value; + } + + /** + * Get long value of this {@link IntegerRedisMessage}. + * + * @return long value + */ + public long value() { + return value; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("value=") + .append(value) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/LastBulkStringRedisContent.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/LastBulkStringRedisContent.java new file mode 100644 index 0000000000..e8fc0d9be8 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/LastBulkStringRedisContent.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.Unpooled; + +/** + * A last chunk of Bulk Strings. + */ +public interface LastBulkStringRedisContent extends BulkStringRedisContent { + + /** + * The 'end of content' marker in chunked encoding. + */ + LastBulkStringRedisContent EMPTY_LAST_CONTENT = new LastBulkStringRedisContent() { + + @Override + public ByteBuf content() { + return Unpooled.EMPTY_BUFFER; + } + + @Override + public ByteBufHolder copy() { + return this; + } + + @Override + public ByteBufHolder retain(int increment) { + return this; + } + + @Override + public ByteBufHolder retain() { + return this; + } + + @Override + public int refCnt() { + return 1; + } + + @Override + public ByteBufHolder touch() { + return this; + } + + @Override + public ByteBufHolder touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + + @Override + public ByteBufHolder duplicate() { + return this; + } + }; +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisArrayAggregator.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisArrayAggregator.java new file mode 100644 index 0000000000..b8444fd3d0 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisArrayAggregator.java @@ -0,0 +1,91 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.ReferenceCountUtil; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; + +/** + * Aggregates {@link RedisMessage} parts into {@link ArrayRedisMessage}. This decoder + * should be used together with {@link RedisDecoder}. + */ +public final class RedisArrayAggregator extends MessageToMessageDecoder { + + private final Deque depths = new ArrayDeque(4); + + @Override + protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List out) throws Exception { + if (msg instanceof ArrayHeaderRedisMessage) { + msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg); + if (msg == null) { + return; + } + } else { + ReferenceCountUtil.retain(msg); + } + + while (!depths.isEmpty()) { + AggregateState current = depths.peek(); + current.children.add(msg); + + // if current aggregation completed, go to parent aggregation. + if (current.children.size() == current.length) { + msg = new ArrayRedisMessage(current.children); + depths.pop(); + } else { + // not aggregated yet. try next time. + return; + } + } + + out.add(msg); + } + + private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) { + if (header.isNull()) { + return ArrayRedisMessage.NULL_INSTANCE; + } else if (header.length() == 0L) { + return ArrayRedisMessage.EMPTY_INSTANCE; + } else if (header.length() > 0L) { + // Currently, this codec doesn't support `long` length for arrays because Java's List.size() is int. + if (header.length() > Integer.MAX_VALUE) { + throw new CodecException("this codec doesn't support longer length than " + Integer.MAX_VALUE); + } + + // start aggregating array + depths.push(new AggregateState((int) header.length())); + return null; + } else { + throw new CodecException("bad length: " + header.length()); + } + } + + private static final class AggregateState { + private final int length; + private final List children; + AggregateState(int length) { + this.length = length; + this.children = new ArrayList(length); + } + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisBulkStringAggregator.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisBulkStringAggregator.java new file mode 100644 index 0000000000..02ec0a4f9b --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisBulkStringAggregator.java @@ -0,0 +1,105 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.MessageAggregator; + +/** + * A {@link ChannelHandler} that aggregates an {@link BulkStringHeaderRedisMessage} + * and its following {@link BulkStringRedisContent}s into a single {@link FullBulkStringRedisMessage} + * with no following {@link BulkStringRedisContent}s. It is useful when you don't want to take + * care of {@link RedisMessage}s whose transfer encoding is 'chunked'. Insert this + * handler after {@link RedisDecoder} in the {@link ChannelPipeline}: + *
+ * {@link ChannelPipeline} p = ...;
+ * ...
+ * p.addLast("encoder", new {@link RedisEncoder}());
+ * p.addLast("decoder", new {@link RedisDecoder}());
+ * p.addLast("aggregator", new {@link RedisBulkStringAggregator}());
+ * ...
+ * p.addLast("handler", new HttpRequestHandler());
+ * 
+ * Be aware that you need to have the {@link RedisEncoder} before the {@link RedisBulkStringAggregator} + * in the {@link ChannelPipeline}. + */ +public final class RedisBulkStringAggregator extends MessageAggregator { + + /** + * Creates a new instance. + */ + public RedisBulkStringAggregator() { + super(RedisConstants.REDIS_MESSAGE_MAX_LENGTH); + } + + @Override + protected boolean isStartMessage(RedisMessage msg) throws Exception { + return msg instanceof BulkStringHeaderRedisMessage && !isAggregated(msg); + } + + @Override + protected boolean isContentMessage(RedisMessage msg) throws Exception { + return msg instanceof BulkStringRedisContent; + } + + @Override + protected boolean isLastContentMessage(BulkStringRedisContent msg) throws Exception { + return msg instanceof LastBulkStringRedisContent; + } + + @Override + protected boolean isAggregated(RedisMessage msg) throws Exception { + return msg instanceof FullBulkStringRedisMessage; + } + + @Override + protected boolean isContentLengthInvalid(BulkStringHeaderRedisMessage start, int maxContentLength) + throws Exception { + return start.bulkStringLength() > maxContentLength; + } + + @Override + protected Object newContinueResponse(BulkStringHeaderRedisMessage start, int maxContentLength, + ChannelPipeline pipeline) throws Exception { + return null; + } + + @Override + protected boolean closeAfterContinueResponse(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected FullBulkStringRedisMessage beginAggregation(BulkStringHeaderRedisMessage start, ByteBuf content) + throws Exception { + switch (start.bulkStringLength()) { + case RedisConstants.NULL_VALUE: + return FullBulkStringRedisMessage.NULL_INSTANCE; + case 0: + return FullBulkStringRedisMessage.EMPTY_INSTANCE; + default: + return new FullBulkStringRedisMessage(content); + } + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecException.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecException.java new file mode 100644 index 0000000000..203745aff5 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecException.java @@ -0,0 +1,40 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.handler.codec.CodecException; + +/** + * An {@link Exception} which is thrown by {@link RedisEncoder} or {@link RedisDecoder}. + */ +public final class RedisCodecException extends CodecException { + + private static final long serialVersionUID = 5570454251549268063L; + + /** + * Creates a new instance. + */ + public RedisCodecException(String message) { + super(message); + } + + /** + * Creates a new instance. + */ + public RedisCodecException(Throwable cause) { + super(cause); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecUtil.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecUtil.java new file mode 100644 index 0000000000..1675903c42 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisCodecUtil.java @@ -0,0 +1,55 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.CharsetUtil; +import io.netty.util.internal.PlatformDependent; + +/** + * Utilities for codec-redis. + */ +final class RedisCodecUtil { + + private RedisCodecUtil() { + } + + static byte[] longToAsciiBytes(long value) { + return Long.toString(value).getBytes(CharsetUtil.US_ASCII); + } + + /** + * Returns a {@code short} value using endian order. + */ + static short makeShort(char first, char second) { + return PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? + (short) ((second << 8) | first) : (short) ((first << 8) | second); + } + + /** + * Returns a {@code byte[]} of {@code short} value. This is opposite of {@code makeShort()}. + */ + static byte[] shortToBytes(short value) { + byte[] bytes = new byte[2]; + if (PlatformDependent.BIG_ENDIAN_NATIVE_ORDER) { + bytes[1] = (byte) ((value >> 8) & 0xff); + bytes[0] = (byte) (value & 0xff); + } else { + bytes[0] = (byte) ((value >> 8) & 0xff); + bytes[1] = (byte) (value & 0xff); + } + return bytes; + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java new file mode 100644 index 0000000000..9fae6f1232 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java @@ -0,0 +1,43 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +/** + * Constant values for Redis encoder/decoder. + */ +final class RedisConstants { + + private RedisConstants() { + } + + static final int TYPE_LENGTH = 1; + + static final int EOL_LENGTH = 2; + + static final int NULL_LENGTH = 2; + + static final int NULL_VALUE = -1; + + static final int REDIS_MESSAGE_MAX_LENGTH = 512 * 1024 * 1024; // 512MB + + static final int POSITIVE_LONG_MAX_LENGTH = 19; // length of Long.MAX_VALUE + + static final int LONG_MAX_LENGTH = POSITIVE_LONG_MAX_LENGTH + 1; // +1 is sign + + static final short NULL_SHORT = RedisCodecUtil.makeShort('-', '1'); + + static final short EOL_SHORT = RedisCodecUtil.makeShort('\r', '\n'); +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java new file mode 100644 index 0000000000..fe3f6c1ec8 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java @@ -0,0 +1,306 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ByteProcessor; +import io.netty.util.CharsetUtil; + +import java.util.List; + +/** + * Decodes the Redis protocol into {@link RedisMessage} objects following + * RESP (REdis Serialization Protocol). + * + * {@link RedisMessage} parts can be aggregated to {@link RedisMessage} using + * {@link RedisArrayAggregator} or processed directly. + */ +public final class RedisDecoder extends ByteToMessageDecoder { + + private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor(); + + private final int maxInlineMessageLength; + private final RedisMessagePool messagePool; + + // current decoding states + private State state = State.DECODE_TYPE; + private RedisMessageType type; + private int remainingBulkLength; + + private enum State { + DECODE_TYPE, + DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER + DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER + DECODE_BULK_STRING_EOL, + DECODE_BULK_STRING_CONTENT, + } + + /** + * Creates a new instance with default {@code maxInlineMessageLength} and {@code messageaPool}. + */ + public RedisDecoder() { + // 1024 * 64 is max inline length of current Redis server implementation. + this(1024 * 64, FixedRedisMessagePool.INSTANCE); + } + + /** + * Creates a new instance. + * @param maxInlineMessageLength the maximum length of inline message. + * @param messagePool the predefined message pool. + */ + public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { + if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { + throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength + + " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); + } + this.maxInlineMessageLength = maxInlineMessageLength; + this.messagePool = messagePool; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + try { + for (;;) { + switch (state) { + case DECODE_TYPE: + if (!decodeType(in)) { + return; + } + break; + case DECODE_INLINE: + if (!decodeInline(in, out)) { + return; + } + break; + case DECODE_LENGTH: + if (!decodeLength(in, out)) { + return; + } + break; + case DECODE_BULK_STRING_EOL: + if (!decodeBulkStringEndOfLine(in, out)) { + return; + } + break; + case DECODE_BULK_STRING_CONTENT: + if (!decodeBulkStringContent(in, out)) { + return; + } + break; + default: + throw new RedisCodecException("Unknown state: " + state); + } + } + } catch (RedisCodecException e) { + resetDecoder(); + throw e; + } catch (Exception e) { + resetDecoder(); + throw new RedisCodecException(e); + } + } + + private void resetDecoder() { + state = State.DECODE_TYPE; + remainingBulkLength = 0; + } + + private boolean decodeType(ByteBuf in) throws Exception { + if (!in.isReadable()) { + return false; + } + type = RedisMessageType.valueOf(in.readByte()); + state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH; + return true; + } + + private boolean decodeInline(ByteBuf in, List out) throws Exception { + ByteBuf lineBytes = readLine(in); + if (lineBytes == null) { + if (in.readableBytes() > maxInlineMessageLength) { + throw new RedisCodecException("length: " + in.readableBytes() + + " (expected: <= " + maxInlineMessageLength + ")"); + } + return false; + } + out.add(newInlineRedisMessage(type, lineBytes)); + resetDecoder(); + return true; + } + + private boolean decodeLength(ByteBuf in, List out) throws Exception { + ByteBuf lineByteBuf = readLine(in); + if (lineByteBuf == null) { + return false; + } + final long length = parseRedisNumber(lineByteBuf); + if (length < RedisConstants.NULL_VALUE) { + throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")"); + } + switch (type) { + case ARRAY_HEADER: + out.add(new ArrayHeaderRedisMessage(length)); + resetDecoder(); + return true; + case BULK_STRING: + if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { + throw new RedisCodecException("length: " + length + " (expected: <= " + + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); + } + remainingBulkLength = (int) length; // range(int) is already checked. + out.add(new BulkStringHeaderRedisMessage(remainingBulkLength)); + return decodeBulkString(remainingBulkLength, in, out); + default: + throw new RedisCodecException("bad type: " + type); + } + } + + private boolean decodeBulkString(int length, ByteBuf in, List out) throws Exception { + switch (length) { + case RedisConstants.NULL_VALUE: // $-1\r\n + out.add(FullBulkStringRedisMessage.NULL_INSTANCE); + resetDecoder(); + return true; + case 0: + state = State.DECODE_BULK_STRING_EOL; + return decodeBulkStringEndOfLine(in, out); + default: // expectedBulkLength is always positive. + state = State.DECODE_BULK_STRING_CONTENT; + return decodeBulkStringContent(in, out); + } + } + + // $0\r\n \r\n + private boolean decodeBulkStringEndOfLine(ByteBuf in, List out) throws Exception { + if (in.readableBytes() < RedisConstants.EOL_LENGTH) { + return false; + } + readEndOfLine(in); + out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE); + resetDecoder(); + return true; + } + + // ${expectedBulkLength}\r\n {data...}\r\n + private boolean decodeBulkStringContent(ByteBuf in, List out) throws Exception { + final int readableBytes = in.readableBytes(); + if (readableBytes == 0) { + return false; + } + + // if this is last frame. + if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) { + ByteBuf content = in.readSlice(remainingBulkLength).retain(); + readEndOfLine(in); + out.add(new DefaultLastBulkStringRedisContent(content)); + resetDecoder(); + return true; + } + + // chunked write. + int toRead = Math.min(remainingBulkLength, readableBytes); + remainingBulkLength -= toRead; + out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain())); + return true; + } + + private static void readEndOfLine(final ByteBuf in) { + final short delim = in.readShort(); + if (RedisConstants.EOL_SHORT == delim) { + return; + } + final byte[] bytes = RedisCodecUtil.shortToBytes(delim); + throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)"); + } + + private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) { + switch (messageType) { + case SIMPLE_STRING: { + SimpleStringRedisMessage cached = messagePool.getSimpleString(content); + return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8)); + } + case ERROR: { + ErrorRedisMessage cached = messagePool.getError(content); + return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8)); + } + case INTEGER: { + IntegerRedisMessage cached = messagePool.getInteger(content); + return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content)); + } + default: + throw new RedisCodecException("bad type: " + messageType); + } + } + + private static ByteBuf readLine(ByteBuf in) { + if (!in.isReadable(RedisConstants.EOL_LENGTH)) { + return null; + } + final int lfIndex = in.forEachByte(ByteProcessor.FIND_LF); + if (lfIndex < 0) { + return null; + } + ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1); // `-1` is for CR + readEndOfLine(in); // validate CR LF + return data; + } + + private long parseRedisNumber(ByteBuf byteBuf) { + final int readableBytes = byteBuf.readableBytes(); + final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-'; + final int extraOneByteForNegative = negative ? 1 : 0; + if (readableBytes <= extraOneByteForNegative) { + throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII)); + } + if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) { + throw new RedisCodecException("too many characters to be a valid RESP Integer: " + + byteBuf.toString(CharsetUtil.US_ASCII)); + } + if (negative) { + return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative)); + } + return parsePositiveNumber(byteBuf); + } + + private long parsePositiveNumber(ByteBuf byteBuf) { + toPositiveLongProcessor.reset(); + byteBuf.forEachByte(toPositiveLongProcessor); + return toPositiveLongProcessor.content(); + } + + private static final class ToPositiveLongProcessor implements ByteProcessor { + private long result; + + @Override + public boolean process(byte value) throws Exception { + if (value < '0' || value > '9') { + throw new RedisCodecException("bad byte in number: " + value); + } + result = result * 10 + (value - '0'); + return true; + } + + public long content() { + return result; + } + + public void reset() { + result = 0; + } + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java new file mode 100644 index 0000000000..fa1915a771 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java @@ -0,0 +1,197 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.util.internal.ObjectUtil; + +import java.util.List; + +/** + * Encodes {@link RedisMessage} into bytes following + * RESP (REdis Serialization Protocol). + */ +public class RedisEncoder extends MessageToMessageEncoder { + + private final RedisMessagePool messagePool; + + /** + * Creates a new instance with default {@code messagePool}. + */ + public RedisEncoder() { + this(FixedRedisMessagePool.INSTANCE); + } + + /** + * Creates a new instance. + * @param messagePool the predefined message pool. + */ + public RedisEncoder(RedisMessagePool messagePool) { + this.messagePool = ObjectUtil.checkNotNull(messagePool, "messagePool"); + } + + @Override + protected void encode(ChannelHandlerContext ctx, RedisMessage msg, List out) throws Exception { + try { + writeRedisMessage(ctx.alloc(), msg, out); + } catch (CodecException e) { + throw e; + } catch (Exception e) { + throw new CodecException(e); + } + } + + private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List out) { + if (msg instanceof SimpleStringRedisMessage) { + writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out); + } else if (msg instanceof ErrorRedisMessage) { + writeErrorMessage(allocator, (ErrorRedisMessage) msg, out); + } else if (msg instanceof IntegerRedisMessage) { + writeIntegerMessage(allocator, (IntegerRedisMessage) msg, out); + } else if (msg instanceof FullBulkStringRedisMessage) { + writeFullBulkStringMessage(allocator, (FullBulkStringRedisMessage) msg, out); + } else if (msg instanceof BulkStringRedisContent) { + writeBulkStringContent(allocator, (BulkStringRedisContent) msg, out); + } else if (msg instanceof BulkStringHeaderRedisMessage) { + writeBulkStringHeader(allocator, (BulkStringHeaderRedisMessage) msg, out); + } else if (msg instanceof ArrayHeaderRedisMessage) { + writeArrayHeader(allocator, (ArrayHeaderRedisMessage) msg, out); + } else if (msg instanceof ArrayRedisMessage) { + writeArrayMessage(allocator, (ArrayRedisMessage) msg, out); + } else { + throw new CodecException("unknown message type: " + msg); + } + } + + private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg, + List out) { + writeString(allocator, RedisMessageType.SIMPLE_STRING.value(), msg.content(), out); + } + + private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List out) { + writeString(allocator, RedisMessageType.ERROR.value(), msg.content(), out); + } + + private static void writeString(ByteBufAllocator allocator, byte type, String content, List out) { + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + ByteBufUtil.utf8MaxBytes(content) + + RedisConstants.EOL_LENGTH); + buf.writeByte(type); + ByteBufUtil.writeUtf8(buf, content); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } + + private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List out) { + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + + RedisConstants.EOL_LENGTH); + buf.writeByte(RedisMessageType.INTEGER.value()); + buf.writeBytes(numberToBytes(msg.value())); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } + + private void writeBulkStringHeader(ByteBufAllocator allocator, BulkStringHeaderRedisMessage msg, List out) { + final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + + (msg.isNull() ? RedisConstants.NULL_LENGTH : + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); + buf.writeByte(RedisMessageType.BULK_STRING.value()); + if (msg.isNull()) { + buf.writeShort(RedisConstants.NULL_SHORT); + } else { + buf.writeBytes(numberToBytes(msg.bulkStringLength())); + buf.writeShort(RedisConstants.EOL_SHORT); + } + out.add(buf); + } + + private static void writeBulkStringContent(ByteBufAllocator allocator, BulkStringRedisContent msg, + List out) { + out.add(msg.content().retain()); + if (msg instanceof LastBulkStringRedisContent) { + out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT)); + } + } + + private void writeFullBulkStringMessage(ByteBufAllocator allocator, FullBulkStringRedisMessage msg, + List out) { + if (msg.isNull()) { + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + + RedisConstants.EOL_LENGTH); + buf.writeByte(RedisMessageType.BULK_STRING.value()); + buf.writeShort(RedisConstants.NULL_SHORT); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } else { + ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + + RedisConstants.EOL_LENGTH); + headerBuf.writeByte(RedisMessageType.BULK_STRING.value()); + headerBuf.writeBytes(numberToBytes(msg.content().readableBytes())); + headerBuf.writeShort(RedisConstants.EOL_SHORT); + out.add(headerBuf); + out.add(msg.content().retain()); + out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT)); + } + } + + /** + * Write array header only without body. Use this if you want to write arrays as streaming. + */ + private void writeArrayHeader(ByteBufAllocator allocator, ArrayHeaderRedisMessage msg, List out) { + writeArrayHeader(allocator, msg.isNull(), msg.length(), out); + } + + /** + * Write full constructed array message. + */ + private void writeArrayMessage(ByteBufAllocator allocator, ArrayRedisMessage msg, List out) { + if (msg.isNull()) { + writeArrayHeader(allocator, msg.isNull(), RedisConstants.NULL_VALUE, out); + } else { + writeArrayHeader(allocator, msg.isNull(), msg.children().size(), out); + for (RedisMessage child : msg.children()) { + writeRedisMessage(allocator, child, out); + } + } + } + + private void writeArrayHeader(ByteBufAllocator allocator, boolean isNull, long length, List out) { + if (isNull) { + final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + + RedisConstants.EOL_LENGTH); + buf.writeByte(RedisMessageType.ARRAY_HEADER.value()); + buf.writeShort(RedisConstants.NULL_SHORT); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } else { + final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + + RedisConstants.EOL_LENGTH); + buf.writeByte(RedisMessageType.ARRAY_HEADER.value()); + buf.writeBytes(numberToBytes(length)); + buf.writeShort(RedisConstants.EOL_SHORT); + out.add(buf); + } + } + + private byte[] numberToBytes(long value) { + byte[] bytes = messagePool.getByteBufOfInteger(value); + return bytes != null ? bytes : RedisCodecUtil.longToAsciiBytes(value); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessage.java new file mode 100644 index 0000000000..57c0a2c812 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessage.java @@ -0,0 +1,22 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +/** + * RedisMessage is base interface for codec-redis. + */ +public interface RedisMessage { +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessagePool.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessagePool.java new file mode 100644 index 0000000000..653fd6796e --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessagePool.java @@ -0,0 +1,59 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; + +/** + * A strategy interface for caching {@link RedisMessage}s. + */ +public interface RedisMessagePool { + + /** + * Returns {@link SimpleStringRedisMessage} for given {@code content}. Returns {@code null} it does not exist. + */ + SimpleStringRedisMessage getSimpleString(String content); + + /** + * Returns {@link SimpleStringRedisMessage} for given {@code content}. Returns {@code null} it does not exist. + */ + SimpleStringRedisMessage getSimpleString(ByteBuf content); + + /** + * Returns {@link ErrorRedisMessage} for given {@code content}. Returns {@code null} it does not exist. + */ + ErrorRedisMessage getError(String content); + + /** + * Returns {@link ErrorRedisMessage} for given {@code content}. Returns {@code null} it does not exist. + */ + ErrorRedisMessage getError(ByteBuf content); + + /** + * Returns {@link IntegerRedisMessage} for given {@code value}. Returns {@code null} it does not exist. + */ + IntegerRedisMessage getInteger(long value); + + /** + * Returns {@link IntegerRedisMessage} for given {@code content}. Returns {@code null} it does not exist. + */ + IntegerRedisMessage getInteger(ByteBuf content); + + /** + * Returns {@code byte[]} for given {@code msg}. Returns {@code null} it does not exist. + */ + byte[] getByteBufOfInteger(long value); +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java new file mode 100644 index 0000000000..2096fb5d57 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java @@ -0,0 +1,72 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +/** + * Type of RESP (REdis Serialization Protocol). + */ +public enum RedisMessageType { + + SIMPLE_STRING((byte) '+', true), + ERROR((byte) '-', true), + INTEGER((byte) ':', true), + BULK_STRING((byte) '$', false), + ARRAY_HEADER((byte) '*', false), + ARRAY((byte) '*', false); // for aggregated + + private final byte value; + private final boolean inline; + + RedisMessageType(byte value, boolean inline) { + this.value = value; + this.inline = inline; + } + + /** + * Returns prefix {@code byte} for this type. + */ + public byte value() { + return value; + } + + /** + * Returns {@code true} if this type is inline type, or returns {@code false}. If this is {@code true}, + * this type doesn't have length field. + */ + public boolean isInline() { + return inline; + } + + /** + * Return {@link RedisMessageType} for this type prefix {@code byte}. + */ + public static RedisMessageType valueOf(byte value) { + switch (value) { + case '+': + return SIMPLE_STRING; + case '-': + return ERROR; + case ':': + return INTEGER; + case '$': + return BULK_STRING; + case '*': + return ARRAY_HEADER; + default: + throw new RedisCodecException("Unknown RedisMessageType: " + value); + } + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java new file mode 100644 index 0000000000..7e9378b833 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.internal.StringUtil; + +/** + * Simple Strings of RESP. + */ +public final class SimpleStringRedisMessage extends AbstractStringRedisMessage { + + /** + * Creates a {@link SimpleStringRedisMessage} for the given {@code content}. + * + * @param content the message content, must not be {@code null}. + */ + public SimpleStringRedisMessage(String content) { + super(content); + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("content=") + .append(content()) + .append(']').toString(); + } +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/package-info.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/package-info.java new file mode 100644 index 0000000000..57a9e2a61a --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Encoder, decoder for Redis. + */ +package io.netty.handler.codec.redis; diff --git a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisCodecTestUtil.java b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisCodecTestUtil.java new file mode 100644 index 0000000000..72e3beca05 --- /dev/null +++ b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisCodecTestUtil.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +final class RedisCodecTestUtil { + + private RedisCodecTestUtil() { + } + + static byte[] bytesOf(long value) { + return bytesOf(Long.toString(value)); + } + + static byte[] bytesOf(String s) { + return s.getBytes(CharsetUtil.UTF_8); + } + + static byte[] bytesOf(ByteBuf buf) { + byte[] data = new byte[buf.readableBytes()]; + buf.readBytes(data); + return data; + } + + static String stringOf(ByteBuf buf) { + return new String(bytesOf(buf)); + } + + static ByteBuf byteBufOf(String s) { + return byteBufOf(bytesOf(s)); + } + + static ByteBuf byteBufOf(byte[] data) { + return Unpooled.wrappedBuffer(data); + } +} diff --git a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java new file mode 100644 index 0000000000..7eafd8215f --- /dev/null +++ b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java @@ -0,0 +1,257 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.IllegalReferenceCountException; +import io.netty.util.ReferenceCountUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static io.netty.handler.codec.redis.RedisCodecTestUtil.*; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +/** + * Verifies the correct functionality of the {@link RedisDecoder} and {@link RedisArrayAggregator}. + */ +public class RedisDecoderTest { + + private EmbeddedChannel channel; + + @Before + public void setup() throws Exception { + channel = new EmbeddedChannel( + new RedisDecoder(), + new RedisBulkStringAggregator(), + new RedisArrayAggregator()); + } + + @After + public void teardown() throws Exception { + assertFalse(channel.finish()); + } + + @Test + public void shouldDecodeSimpleString() { + assertFalse(channel.writeInbound(byteBufOf("+"))); + assertFalse(channel.writeInbound(byteBufOf("O"))); + assertFalse(channel.writeInbound(byteBufOf("K"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + SimpleStringRedisMessage msg = channel.readInbound(); + + assertThat(msg.content(), is("OK")); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeTwoSimpleStrings() { + assertFalse(channel.writeInbound(byteBufOf("+"))); + assertFalse(channel.writeInbound(byteBufOf("O"))); + assertFalse(channel.writeInbound(byteBufOf("K"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n+SEC"))); + assertTrue(channel.writeInbound(byteBufOf("OND\r\n"))); + + SimpleStringRedisMessage msg1 = channel.readInbound(); + assertThat(msg1.content(), is("OK")); + ReferenceCountUtil.release(msg1); + + SimpleStringRedisMessage msg2 = channel.readInbound(); + assertThat(msg2.content(), is("SECOND")); + ReferenceCountUtil.release(msg2); + } + + @Test + public void shouldDecodeError() { + String content = "ERROR sample message"; + assertFalse(channel.writeInbound(byteBufOf("-"))); + assertFalse(channel.writeInbound(byteBufOf(content))); + assertFalse(channel.writeInbound(byteBufOf("\r"))); + assertTrue(channel.writeInbound(byteBufOf("\n"))); + + ErrorRedisMessage msg = channel.readInbound(); + + assertThat(msg.content(), is(content)); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeInteger() { + long value = 1234L; + byte[] content = bytesOf(value); + assertFalse(channel.writeInbound(byteBufOf(":"))); + assertFalse(channel.writeInbound(byteBufOf(content))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + IntegerRedisMessage msg = channel.readInbound(); + + assertThat(msg.value(), is(value)); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeBulkString() { + String buf1 = "bulk\nst"; + String buf2 = "ring\ntest\n1234"; + byte[] content = bytesOf(buf1 + buf2); + assertFalse(channel.writeInbound(byteBufOf("$"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length)))); + assertFalse(channel.writeInbound(byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(buf1))); + assertFalse(channel.writeInbound(byteBufOf(buf2))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + FullBulkStringRedisMessage msg = channel.readInbound(); + + assertThat(bytesOf(msg.content()), is(content)); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeEmptyBulkString() { + byte[] content = bytesOf(""); + assertFalse(channel.writeInbound(byteBufOf("$"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length)))); + assertFalse(channel.writeInbound(byteBufOf("\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(content))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + FullBulkStringRedisMessage msg = channel.readInbound(); + + assertThat(bytesOf(msg.content()), is(content)); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeNullBulkString() { + assertFalse(channel.writeInbound(byteBufOf("$"))); + assertFalse(channel.writeInbound(byteBufOf(Integer.toString(-1)))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + FullBulkStringRedisMessage msg = channel.readInbound(); + + assertThat(msg.isNull(), is(true)); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeSimpleArray() throws Exception { + assertFalse(channel.writeInbound(byteBufOf("*3\r\n"))); + assertFalse(channel.writeInbound(byteBufOf(":1234\r\n"))); + assertFalse(channel.writeInbound(byteBufOf("+sim"))); + assertFalse(channel.writeInbound(byteBufOf("ple\r\n-err"))); + assertTrue(channel.writeInbound(byteBufOf("or\r\n"))); + + ArrayRedisMessage msg = channel.readInbound(); + List children = msg.children(); + + assertThat(msg.children().size(), is(equalTo(3))); + + assertThat(children.get(0), instanceOf(IntegerRedisMessage.class)); + assertThat(((IntegerRedisMessage) children.get(0)).value(), is(1234L)); + assertThat(children.get(1), instanceOf(SimpleStringRedisMessage.class)); + assertThat(((SimpleStringRedisMessage) children.get(1)).content(), is("simple")); + assertThat(children.get(2), instanceOf(ErrorRedisMessage.class)); + assertThat(((ErrorRedisMessage) children.get(2)).content(), is("error")); + + ReferenceCountUtil.release(msg); + } + + @Test + public void shouldDecodeNestedArray() throws Exception { + ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); + buf.writeBytes(byteBufOf("*2\r\n+Foo\r\n-Bar\r\n")); + assertTrue(channel.writeInbound(buf)); + + ArrayRedisMessage msg = channel.readInbound(); + List children = msg.children(); + + assertThat(msg.children().size(), is(2)); + + ArrayRedisMessage intArray = (ArrayRedisMessage) children.get(0); + ArrayRedisMessage strArray = (ArrayRedisMessage) children.get(1); + + assertThat(intArray.children().size(), is(3)); + assertThat(((IntegerRedisMessage) intArray.children().get(0)).value(), is(1L)); + assertThat(((IntegerRedisMessage) intArray.children().get(1)).value(), is(2L)); + assertThat(((IntegerRedisMessage) intArray.children().get(2)).value(), is(3L)); + + assertThat(strArray.children().size(), is(2)); + assertThat(((SimpleStringRedisMessage) strArray.children().get(0)).content(), is("Foo")); + assertThat(((ErrorRedisMessage) strArray.children().get(1)).content(), is("Bar")); + + ReferenceCountUtil.release(msg); + } + + @Test(expected = IllegalReferenceCountException.class) + public void shouldErrorOnDoubleReleaseArrayReferenceCounted() throws Exception { + ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); + buf.writeBytes(byteBufOf("*2\r\n+Foo\r\n-Bar\r\n")); + assertTrue(channel.writeInbound(buf)); + + ArrayRedisMessage msg = channel.readInbound(); + + ReferenceCountUtil.release(msg); + ReferenceCountUtil.release(msg); + } + + @Test(expected = IllegalReferenceCountException.class) + public void shouldErrorOnReleaseArrayChildReferenceCounted() throws Exception { + ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n")); + buf.writeBytes(byteBufOf("$3\r\nFoo\r\n")); + assertTrue(channel.writeInbound(buf)); + + ArrayRedisMessage msg = channel.readInbound(); + + List children = msg.children(); + ReferenceCountUtil.release(msg); + ReferenceCountUtil.release(children.get(1)); + } + + @Test(expected = IllegalReferenceCountException.class) + public void shouldErrorOnReleasecontentOfArrayChildReferenceCounted() throws Exception { + ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(byteBufOf("*2\r\n")); + buf.writeBytes(byteBufOf("$3\r\nFoo\r\n$3\r\nBar\r\n")); + assertTrue(channel.writeInbound(buf)); + + ArrayRedisMessage msg = channel.readInbound(); + + List children = msg.children(); + ByteBuf childBuf = ((FullBulkStringRedisMessage) children.get(0)).content(); + ReferenceCountUtil.release(msg); + ReferenceCountUtil.release(childBuf); + } +} diff --git a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java new file mode 100644 index 0000000000..31bbc9688d --- /dev/null +++ b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +import static io.netty.handler.codec.redis.RedisCodecTestUtil.*; + +/** + * Verifies the correct functionality of the {@link RedisEncoder}. + */ +public class RedisEncoderTest { + + private EmbeddedChannel channel; + + @Before + public void setup() throws Exception { + channel = new EmbeddedChannel(new RedisEncoder()); + } + + @After + public void teardown() throws Exception { + assertFalse(channel.finish()); + } + + @Test + public void shouldEncodeSimpleString() { + RedisMessage msg = new SimpleStringRedisMessage("simple"); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(bytesOf("+simple\r\n"))); + written.release(); + } + + @Test + public void shouldEncodeError() { + RedisMessage msg = new ErrorRedisMessage("error1"); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("-error1\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeInteger() { + RedisMessage msg = new IntegerRedisMessage(1234L); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf(":1234\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeBulkStringContent() { + RedisMessage header = new BulkStringHeaderRedisMessage(16); + RedisMessage body1 = new DefaultBulkStringRedisContent(byteBufOf("bulk\nstr").retain()); + RedisMessage body2 = new DefaultLastBulkStringRedisContent(byteBufOf("ing\ntest").retain()); + + assertThat(channel.writeOutbound(header), is(true)); + assertThat(channel.writeOutbound(body1), is(true)); + assertThat(channel.writeOutbound(body2), is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("$16\r\nbulk\nstring\ntest\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeFullBulkString() { + ByteBuf bulkString = byteBufOf("bulk\nstring\ntest").retain(); + int length = bulkString.readableBytes(); + RedisMessage msg = new FullBulkStringRedisMessage(bulkString); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("$" + length + "\r\nbulk\nstring\ntest\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeSimpleArray() { + List children = new ArrayList(); + children.add(new FullBulkStringRedisMessage(byteBufOf("foo").retain())); + children.add(new FullBulkStringRedisMessage(byteBufOf("bar").retain())); + RedisMessage msg = new ArrayRedisMessage(children); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeNullArray() { + RedisMessage msg = ArrayRedisMessage.NULL_INSTANCE; + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("*-1\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeEmptyArray() { + RedisMessage msg = ArrayRedisMessage.EMPTY_INSTANCE; + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("*0\r\n")))); + written.release(); + } + + @Test + public void shouldEncodeNestedArray() { + List grandChildren = new ArrayList(); + grandChildren.add(new FullBulkStringRedisMessage(byteBufOf("bar"))); + grandChildren.add(new IntegerRedisMessage(-1234L)); + List children = new ArrayList(); + children.add(new SimpleStringRedisMessage("foo")); + children.add(new ArrayRedisMessage(grandChildren)); + RedisMessage msg = new ArrayRedisMessage(children); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(equalTo(bytesOf("*2\r\n+foo\r\n*2\r\n$3\r\nbar\r\n:-1234\r\n")))); + written.release(); + } + + private static ByteBuf readAll(EmbeddedChannel channel) { + ByteBuf buf = Unpooled.buffer(); + ByteBuf read; + while ((read = channel.readOutbound()) != null) { + buf.writeBytes(read); + } + return buf; + } +} diff --git a/pom.xml b/pom.xml index 9d422bbeac..a70ef9fa21 100644 --- a/pom.xml +++ b/pom.xml @@ -239,7 +239,7 @@ codec-http2 codec-memcache codec-mqtt - codec-smtp + codec-redis codec-socks codec-stomp codec-xml