Implement codec-redis

Motivation:

- To encode/decode RESP (REdis Serialization Protocol) using Netty
- http://redis.io/topics/protocol

Modifications:

- Add RedisEncoder, RedisDecoder
- Add RedisBulkStringAggregator and RedisArrayAggregator
- Add tests

Result:

- Added codec-redis
- codec-redis can encode/decode RESP (REdis Serialization Protocol)
This commit is contained in:
Jongyeol Choi 2016-04-14 07:30:09 +09:00 committed by Norman Maurer
parent 6108b7297b
commit 96455a9558
30 changed files with 2532 additions and 3 deletions

View File

@ -365,7 +365,7 @@ public final class ByteBufUtil {
*/
public static ByteBuf writeUtf8(ByteBufAllocator alloc, CharSequence seq) {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case.
ByteBuf buf = alloc.buffer(seq.length() * MAX_BYTES_PER_CHAR_UTF8);
ByteBuf buf = alloc.buffer(utf8MaxBytes(seq));
writeUtf8(buf, seq);
return buf;
}
@ -378,7 +378,7 @@ public final class ByteBufUtil {
*/
public static int writeUtf8(ByteBuf buf, CharSequence seq) {
final int len = seq.length();
buf.ensureWritable(len * MAX_BYTES_PER_CHAR_UTF8);
buf.ensureWritable(utf8MaxBytes(seq));
for (;;) {
if (buf instanceof AbstractByteBuf) {
@ -445,6 +445,13 @@ public final class ByteBufUtil {
return writerIndex - oldWriterIndex;
}
/**
* Returns max bytes length of UTF8 character sequence.
*/
public static int utf8MaxBytes(CharSequence seq) {
return seq.length() * MAX_BYTES_PER_CHAR_UTF8;
}
/**
* Encode a {@link CharSequence} in <a href="http://en.wikipedia.org/wiki/ASCII">ASCII</a> and write
* it to a {@link ByteBuf} allocated with {@code alloc}.

39
codec-redis/pom.xml Normal file
View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.1.0.Final-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-redis</artifactId>
<packaging>jar</packaging>
<name>Netty/Codec/Redis</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.internal.ObjectUtil;
/**
* Abstract class for Simple Strings or Errors.
*/
public abstract class AbstractStringRedisMessage implements RedisMessage {
private final String content;
AbstractStringRedisMessage(String content) {
this.content = ObjectUtil.checkNotNull(content, "content");
}
/**
* Get string content of this {@link AbstractStringRedisMessage}.
*
* @return content of this message.
*/
public final String content() {
return content;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.internal.StringUtil;
/**
* Header of Redis Array Message.
*/
public class ArrayHeaderRedisMessage implements RedisMessage {
private final long length;
/**
* Creates a {@link ArrayHeaderRedisMessage} for the given {@code length}.
*/
public ArrayHeaderRedisMessage(long length) {
if (length < RedisConstants.NULL_VALUE) {
throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
}
this.length = length;
}
/**
* Get length of this array object.
*/
public final long length() {
return length;
}
/**
* Returns whether the content of this message is {@code null}.
*
* @return indicates whether the content of this message is {@code null}.
*/
public boolean isNull() {
return length == RedisConstants.NULL_VALUE;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("length=")
.append(length)
.append(']').toString();
}
}

View File

@ -0,0 +1,179 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import java.util.Collections;
import java.util.List;
/**
* Arrays of <a href="http://redis.io/topics/protocol">RESP</a>.
*/
public class ArrayRedisMessage extends AbstractReferenceCounted implements RedisMessage {
private final List<RedisMessage> children;
private ArrayRedisMessage() {
children = Collections.emptyList();
}
/**
* Creates a {@link ArrayRedisMessage} for the given {@code content}.
*
* @param children the children.
*/
public ArrayRedisMessage(List<RedisMessage> children) {
// do not retain here. children are already retained when created.
this.children = ObjectUtil.checkNotNull(children, "children");
}
/**
* Get children of this Arrays. It can be null or empty.
*
* @return list of {@link RedisMessage}s.
*/
public final List<RedisMessage> children() {
return children;
}
/**
* Returns whether the content of this message is {@code null}.
*
* @return indicates whether the content of this message is {@code null}.
*/
public boolean isNull() {
return false;
}
@Override
protected void deallocate() {
for (RedisMessage msg : children) {
ReferenceCountUtil.release(msg);
}
}
@Override
public ArrayRedisMessage touch(Object hint) {
for (RedisMessage msg : children) {
ReferenceCountUtil.touch(msg);
}
return this;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("children=")
.append(children.size())
.append(']').toString();
}
/**
* A predefined null array instance for {@link ArrayRedisMessage}.
*/
public static final ArrayRedisMessage NULL_INSTANCE = new ArrayRedisMessage() {
@Override
public boolean isNull() {
return true;
}
@Override
public ArrayRedisMessage retain() {
return this;
}
@Override
public ArrayRedisMessage retain(int increment) {
return this;
}
@Override
public ArrayRedisMessage touch() {
return this;
}
@Override
public ArrayRedisMessage touch(Object hint) {
return this;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
@Override
public String toString() {
return "NullArrayRedisMessage";
}
};
/**
* A predefined empty array instance for {@link ArrayRedisMessage}.
*/
public static final ArrayRedisMessage EMPTY_INSTANCE = new ArrayRedisMessage() {
@Override
public boolean isNull() {
return false;
}
@Override
public ArrayRedisMessage retain() {
return this;
}
@Override
public ArrayRedisMessage retain(int increment) {
return this;
}
@Override
public ArrayRedisMessage touch() {
return this;
}
@Override
public ArrayRedisMessage touch(Object hint) {
return this;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
@Override
public String toString() {
return "EmptyArrayRedisMessage";
}
};
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
/**
* The header of Bulk Strings in <a href="http://redis.io/topics/protocol">RESP</a>.
*/
public class BulkStringHeaderRedisMessage implements RedisMessage {
private final int bulkStringLength;
/**
* Creates a {@link BulkStringHeaderRedisMessage}.
*
* @param bulkStringLength follow content length.
*/
public BulkStringHeaderRedisMessage(int bulkStringLength) {
this.bulkStringLength = bulkStringLength;
}
/**
* Return {@code bulkStringLength} for this content.
*/
public final int bulkStringLength() {
return bulkStringLength;
}
/**
* Returns whether the content of this message is {@code null}.
*
* @return indicates whether the content of this message is {@code null}.
*/
public boolean isNull() {
return bulkStringLength == RedisConstants.NULL_VALUE;
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelPipeline;
/**
* A chunk of bulk strings which is used for Redis chunked transfer-encoding.
* {@link RedisDecoder} generates {@link BulkStringRedisContent} after
* {@link BulkStringHeaderRedisMessage} when the content is large or the encoding of the content is chunked.
* If you prefer not to receive {@link BulkStringRedisContent} in your handler,
* place {@link RedisBulkStringAggregator} after {@link RedisDecoder} in the {@link ChannelPipeline}.
*/
public interface BulkStringRedisContent extends RedisMessage, ByteBufHolder {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.util.internal.StringUtil;
/**
* A default implementation of {@link BulkStringRedisContent}.
*/
public class DefaultBulkStringRedisContent extends DefaultByteBufHolder implements BulkStringRedisContent {
/**
* Creates a {@link DefaultBulkStringRedisContent} for the given {@code content}.
*
* @param content the content, can be {@code null}.
*/
public DefaultBulkStringRedisContent(ByteBuf content) {
super(content);
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content())
.append(']').toString();
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
/**
* A default implementation for {@link LastBulkStringRedisContent}.
*/
public final class DefaultLastBulkStringRedisContent extends DefaultBulkStringRedisContent
implements LastBulkStringRedisContent {
/**
* Creates a {@link DefaultLastBulkStringRedisContent} for the given {@code content}.
* @param content the content, can be {@code null}.
*/
public DefaultLastBulkStringRedisContent(ByteBuf content) {
super(content);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.internal.StringUtil;
/**
* Errors of <a href="http://redis.io/topics/protocol">RESP</a>.
*/
public final class ErrorRedisMessage extends AbstractStringRedisMessage {
/**
* Creates a {@link ErrorRedisMessage} from {@code content}.
*
* @param content the message content, must not be {@code null}.
*/
public ErrorRedisMessage(String content) {
super(content);
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content())
.append(']').toString();
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.collection.LongObjectMap;
import java.util.HashMap;
import java.util.Map;
/**
* A default fixed redis message pool.
*/
public final class FixedRedisMessagePool implements RedisMessagePool {
private static final String[] DEFAULT_SIMPLE_STRINGS = {
"OK",
"PONG",
"QUEUED",
};
private static final String[] DEFAULT_ERRORS = {
"ERR",
"ERR index out of range",
"ERR no such key",
"ERR source and destination objects are the same",
"ERR syntax error",
"BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.",
"BUSYKEY Target key name already exists.",
"EXECABORT Transaction discarded because of previous errors.",
"LOADING Redis is loading the dataset in memory",
"MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.",
"MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. " +
"Commands that may modify the data set are disabled. Please check Redis logs for details " +
"about the error.",
"NOAUTH Authentication required.",
"NOREPLICAS Not enough good slaves to write.",
"NOSCRIPT No matching script. Please use EVAL.",
"OOM command not allowed when used memory > 'maxmemory'.",
"READONLY You can't write against a read only slave.",
"WRONGTYPE Operation against a key holding the wrong kind of value",
};
private static final long MIN_CACHED_INTEGER_NUMBER = RedisConstants.NULL_VALUE; // inclusive
private static final long MAX_CACHED_INTEGER_NUMBER = 128; // exclusive
// cached integer size cannot larger than `int` range because of Collection.
private static final int SIZE_CACHED_INTEGER_NUMBER = (int) (MAX_CACHED_INTEGER_NUMBER - MIN_CACHED_INTEGER_NUMBER);
/**
* A shared object for {@link FixedRedisMessagePool}.
*/
public static final FixedRedisMessagePool INSTANCE = new FixedRedisMessagePool();
// internal caches.
private Map<ByteBuf, SimpleStringRedisMessage> byteBufToSimpleStrings;
private Map<String, SimpleStringRedisMessage> stringToSimpleStrings;
private Map<ByteBuf, ErrorRedisMessage> byteBufToErrors;
private Map<String, ErrorRedisMessage> stringToErrors;
private Map<ByteBuf, IntegerRedisMessage> byteBufToIntegers;
private LongObjectMap<IntegerRedisMessage> longToIntegers;
private LongObjectMap<byte[]> longToByteBufs;
/**
* Creates a {@link FixedRedisMessagePool} instance.
*/
private FixedRedisMessagePool() {
byteBufToSimpleStrings = new HashMap<ByteBuf, SimpleStringRedisMessage>(DEFAULT_SIMPLE_STRINGS.length, 1.0f);
stringToSimpleStrings = new HashMap<String, SimpleStringRedisMessage>(DEFAULT_SIMPLE_STRINGS.length, 1.0f);
for (String message : DEFAULT_SIMPLE_STRINGS) {
ByteBuf key = Unpooled.unmodifiableBuffer(
Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(message.getBytes(CharsetUtil.UTF_8))));
SimpleStringRedisMessage cached = new SimpleStringRedisMessage(message);
byteBufToSimpleStrings.put(key, cached);
stringToSimpleStrings.put(message, cached);
}
byteBufToErrors = new HashMap<ByteBuf, ErrorRedisMessage>(DEFAULT_ERRORS.length, 1.0f);
stringToErrors = new HashMap<String, ErrorRedisMessage>(DEFAULT_ERRORS.length, 1.0f);
for (String message : DEFAULT_ERRORS) {
ByteBuf key = Unpooled.unmodifiableBuffer(
Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(message.getBytes(CharsetUtil.UTF_8))));
ErrorRedisMessage cached = new ErrorRedisMessage(message);
byteBufToErrors.put(key, cached);
stringToErrors.put(message, cached);
}
byteBufToIntegers = new HashMap<ByteBuf, IntegerRedisMessage>(SIZE_CACHED_INTEGER_NUMBER, 1.0f);
longToIntegers = new LongObjectHashMap<IntegerRedisMessage>(SIZE_CACHED_INTEGER_NUMBER, 1.0f);
longToByteBufs = new LongObjectHashMap<byte[]>(SIZE_CACHED_INTEGER_NUMBER, 1.0f);
for (long value = MIN_CACHED_INTEGER_NUMBER; value < MAX_CACHED_INTEGER_NUMBER; value++) {
byte[] keyBytes = RedisCodecUtil.longToAsciiBytes(value);
ByteBuf keyByteBuf = Unpooled.unmodifiableBuffer(Unpooled.unreleasableBuffer(
Unpooled.wrappedBuffer(keyBytes)));
IntegerRedisMessage cached = new IntegerRedisMessage(value);
byteBufToIntegers.put(keyByteBuf, cached);
longToIntegers.put(value, cached);
longToByteBufs.put(value, keyBytes);
}
}
@Override
public SimpleStringRedisMessage getSimpleString(String content) {
return stringToSimpleStrings.get(content);
}
@Override
public SimpleStringRedisMessage getSimpleString(ByteBuf content) {
return byteBufToSimpleStrings.get(content);
}
@Override
public ErrorRedisMessage getError(String content) {
return stringToErrors.get(content);
}
@Override
public ErrorRedisMessage getError(ByteBuf content) {
return byteBufToErrors.get(content);
}
@Override
public IntegerRedisMessage getInteger(long value) {
return longToIntegers.get(value);
}
@Override
public IntegerRedisMessage getInteger(ByteBuf content) {
return byteBufToIntegers.get(content);
}
@Override
public byte[] getByteBufOfInteger(long value) {
return longToByteBufs.get(value);
}
}

View File

@ -0,0 +1,175 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.StringUtil;
/**
* An aggregated bulk string of <a href="http://redis.io/topics/protocol">RESP</a>.
*/
public class FullBulkStringRedisMessage extends DefaultByteBufHolder implements LastBulkStringRedisContent {
private FullBulkStringRedisMessage() {
this(Unpooled.EMPTY_BUFFER);
}
/**
* Creates a {@link FullBulkStringRedisMessage} for the given {@code content}.
*
* @param content the content, must not be {@code null}. If content is null or empty,
* use {@link FullBulkStringRedisMessage#NULL_INSTANCE} or {@link FullBulkStringRedisMessage#EMPTY_INSTANCE}
* instead of constructor.
*/
public FullBulkStringRedisMessage(ByteBuf content) {
super(content);
}
/**
* Returns whether the content of this message is {@code null}.
*
* @return indicates whether the content of this message is {@code null}.
*/
public boolean isNull() {
return false;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content())
.append(']').toString();
}
/**
* A predefined null instance of {@link FullBulkStringRedisMessage}.
*/
public static final FullBulkStringRedisMessage NULL_INSTANCE = new FullBulkStringRedisMessage() {
@Override
public boolean isNull() {
return true;
}
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public FullBulkStringRedisMessage copy() {
return this;
}
@Override
public FullBulkStringRedisMessage duplicate() {
return this;
}
@Override
public int refCnt() {
return 1;
}
@Override
public FullBulkStringRedisMessage retain() {
return this;
}
@Override
public FullBulkStringRedisMessage retain(int increment) {
return this;
}
@Override
public FullBulkStringRedisMessage touch() {
return this;
}
@Override
public FullBulkStringRedisMessage touch(Object hint) {
return this;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
};
/**
* A predefined empty instance of {@link FullBulkStringRedisMessage}.
*/
public static final FullBulkStringRedisMessage EMPTY_INSTANCE = new FullBulkStringRedisMessage() {
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public FullBulkStringRedisMessage copy() {
return EMPTY_INSTANCE;
}
@Override
public FullBulkStringRedisMessage duplicate() {
return this;
}
@Override
public int refCnt() {
return 1;
}
@Override
public FullBulkStringRedisMessage retain() {
return this;
}
@Override
public FullBulkStringRedisMessage retain(int increment) {
return this;
}
@Override
public FullBulkStringRedisMessage touch() {
return this;
}
@Override
public FullBulkStringRedisMessage touch(Object hint) {
return this;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
};
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.internal.StringUtil;
/**
* Integers of <a href="http://redis.io/topics/protocol">RESP</a>.
*/
public final class IntegerRedisMessage implements RedisMessage {
private final long value;
/**
* Creates a {@link IntegerRedisMessage} for the given {@code content}.
*
* @param value the message content.
*/
public IntegerRedisMessage(long value) {
this.value = value;
}
/**
* Get long value of this {@link IntegerRedisMessage}.
*
* @return long value
*/
public long value() {
return value;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("value=")
.append(value)
.append(']').toString();
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
/**
* A last chunk of Bulk Strings.
*/
public interface LastBulkStringRedisContent extends BulkStringRedisContent {
/**
* The 'end of content' marker in chunked encoding.
*/
LastBulkStringRedisContent EMPTY_LAST_CONTENT = new LastBulkStringRedisContent() {
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public ByteBufHolder copy() {
return this;
}
@Override
public ByteBufHolder retain(int increment) {
return this;
}
@Override
public ByteBufHolder retain() {
return this;
}
@Override
public int refCnt() {
return 1;
}
@Override
public ByteBufHolder touch() {
return this;
}
@Override
public ByteBufHolder touch(Object hint) {
return this;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
@Override
public ByteBufHolder duplicate() {
return this;
}
};
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
/**
* Aggregates {@link RedisMessage} parts into {@link ArrayRedisMessage}. This decoder
* should be used together with {@link RedisDecoder}.
*/
public final class RedisArrayAggregator extends MessageToMessageDecoder<RedisMessage> {
private final Deque<AggregateState> depths = new ArrayDeque<AggregateState>(4);
@Override
protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception {
if (msg instanceof ArrayHeaderRedisMessage) {
msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg);
if (msg == null) {
return;
}
} else {
ReferenceCountUtil.retain(msg);
}
while (!depths.isEmpty()) {
AggregateState current = depths.peek();
current.children.add(msg);
// if current aggregation completed, go to parent aggregation.
if (current.children.size() == current.length) {
msg = new ArrayRedisMessage(current.children);
depths.pop();
} else {
// not aggregated yet. try next time.
return;
}
}
out.add(msg);
}
private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) {
if (header.isNull()) {
return ArrayRedisMessage.NULL_INSTANCE;
} else if (header.length() == 0L) {
return ArrayRedisMessage.EMPTY_INSTANCE;
} else if (header.length() > 0L) {
// Currently, this codec doesn't support `long` length for arrays because Java's List.size() is int.
if (header.length() > Integer.MAX_VALUE) {
throw new CodecException("this codec doesn't support longer length than " + Integer.MAX_VALUE);
}
// start aggregating array
depths.push(new AggregateState((int) header.length()));
return null;
} else {
throw new CodecException("bad length: " + header.length());
}
}
private static final class AggregateState {
private final int length;
private final List<RedisMessage> children;
AggregateState(int length) {
this.length = length;
this.children = new ArrayList<RedisMessage>(length);
}
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageAggregator;
/**
* A {@link ChannelHandler} that aggregates an {@link BulkStringHeaderRedisMessage}
* and its following {@link BulkStringRedisContent}s into a single {@link FullBulkStringRedisMessage}
* with no following {@link BulkStringRedisContent}s. It is useful when you don't want to take
* care of {@link RedisMessage}s whose transfer encoding is 'chunked'. Insert this
* handler after {@link RedisDecoder} in the {@link ChannelPipeline}:
* <pre>
* {@link ChannelPipeline} p = ...;
* ...
* p.addLast("encoder", new {@link RedisEncoder}());
* p.addLast("decoder", new {@link RedisDecoder}());
* p.addLast("aggregator", <b>new {@link RedisBulkStringAggregator}()</b>);
* ...
* p.addLast("handler", new HttpRequestHandler());
* </pre>
* Be aware that you need to have the {@link RedisEncoder} before the {@link RedisBulkStringAggregator}
* in the {@link ChannelPipeline}.
*/
public final class RedisBulkStringAggregator extends MessageAggregator<RedisMessage, BulkStringHeaderRedisMessage,
BulkStringRedisContent, FullBulkStringRedisMessage> {
/**
* Creates a new instance.
*/
public RedisBulkStringAggregator() {
super(RedisConstants.REDIS_MESSAGE_MAX_LENGTH);
}
@Override
protected boolean isStartMessage(RedisMessage msg) throws Exception {
return msg instanceof BulkStringHeaderRedisMessage && !isAggregated(msg);
}
@Override
protected boolean isContentMessage(RedisMessage msg) throws Exception {
return msg instanceof BulkStringRedisContent;
}
@Override
protected boolean isLastContentMessage(BulkStringRedisContent msg) throws Exception {
return msg instanceof LastBulkStringRedisContent;
}
@Override
protected boolean isAggregated(RedisMessage msg) throws Exception {
return msg instanceof FullBulkStringRedisMessage;
}
@Override
protected boolean isContentLengthInvalid(BulkStringHeaderRedisMessage start, int maxContentLength)
throws Exception {
return start.bulkStringLength() > maxContentLength;
}
@Override
protected Object newContinueResponse(BulkStringHeaderRedisMessage start, int maxContentLength,
ChannelPipeline pipeline) throws Exception {
return null;
}
@Override
protected boolean closeAfterContinueResponse(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected FullBulkStringRedisMessage beginAggregation(BulkStringHeaderRedisMessage start, ByteBuf content)
throws Exception {
switch (start.bulkStringLength()) {
case RedisConstants.NULL_VALUE:
return FullBulkStringRedisMessage.NULL_INSTANCE;
case 0:
return FullBulkStringRedisMessage.EMPTY_INSTANCE;
default:
return new FullBulkStringRedisMessage(content);
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.handler.codec.CodecException;
/**
* An {@link Exception} which is thrown by {@link RedisEncoder} or {@link RedisDecoder}.
*/
public final class RedisCodecException extends CodecException {
private static final long serialVersionUID = 5570454251549268063L;
/**
* Creates a new instance.
*/
public RedisCodecException(String message) {
super(message);
}
/**
* Creates a new instance.
*/
public RedisCodecException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.PlatformDependent;
/**
* Utilities for codec-redis.
*/
final class RedisCodecUtil {
private RedisCodecUtil() {
}
static byte[] longToAsciiBytes(long value) {
return Long.toString(value).getBytes(CharsetUtil.US_ASCII);
}
/**
* Returns a {@code short} value using endian order.
*/
static short makeShort(char first, char second) {
return PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ?
(short) ((second << 8) | first) : (short) ((first << 8) | second);
}
/**
* Returns a {@code byte[]} of {@code short} value. This is opposite of {@code makeShort()}.
*/
static byte[] shortToBytes(short value) {
byte[] bytes = new byte[2];
if (PlatformDependent.BIG_ENDIAN_NATIVE_ORDER) {
bytes[1] = (byte) ((value >> 8) & 0xff);
bytes[0] = (byte) (value & 0xff);
} else {
bytes[0] = (byte) ((value >> 8) & 0xff);
bytes[1] = (byte) (value & 0xff);
}
return bytes;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
/**
* Constant values for Redis encoder/decoder.
*/
final class RedisConstants {
private RedisConstants() {
}
static final int TYPE_LENGTH = 1;
static final int EOL_LENGTH = 2;
static final int NULL_LENGTH = 2;
static final int NULL_VALUE = -1;
static final int REDIS_MESSAGE_MAX_LENGTH = 512 * 1024 * 1024; // 512MB
static final int POSITIVE_LONG_MAX_LENGTH = 19; // length of Long.MAX_VALUE
static final int LONG_MAX_LENGTH = POSITIVE_LONG_MAX_LENGTH + 1; // +1 is sign
static final short NULL_SHORT = RedisCodecUtil.makeShort('-', '1');
static final short EOL_SHORT = RedisCodecUtil.makeShort('\r', '\n');
}

View File

@ -0,0 +1,306 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* Decodes the Redis protocol into {@link RedisMessage} objects following
* <a href="http://redis.io/topics/protocol">RESP (REdis Serialization Protocol)</a>.
*
* {@link RedisMessage} parts can be aggregated to {@link RedisMessage} using
* {@link RedisArrayAggregator} or processed directly.
*/
public final class RedisDecoder extends ByteToMessageDecoder {
private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
private final int maxInlineMessageLength;
private final RedisMessagePool messagePool;
// current decoding states
private State state = State.DECODE_TYPE;
private RedisMessageType type;
private int remainingBulkLength;
private enum State {
DECODE_TYPE,
DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER
DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER
DECODE_BULK_STRING_EOL,
DECODE_BULK_STRING_CONTENT,
}
/**
* Creates a new instance with default {@code maxInlineMessageLength} and {@code messageaPool}.
*/
public RedisDecoder() {
// 1024 * 64 is max inline length of current Redis server implementation.
this(1024 * 64, FixedRedisMessagePool.INSTANCE);
}
/**
* Creates a new instance.
* @param maxInlineMessageLength the maximum length of inline message.
* @param messagePool the predefined message pool.
*/
public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
" (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
}
this.maxInlineMessageLength = maxInlineMessageLength;
this.messagePool = messagePool;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
for (;;) {
switch (state) {
case DECODE_TYPE:
if (!decodeType(in)) {
return;
}
break;
case DECODE_INLINE:
if (!decodeInline(in, out)) {
return;
}
break;
case DECODE_LENGTH:
if (!decodeLength(in, out)) {
return;
}
break;
case DECODE_BULK_STRING_EOL:
if (!decodeBulkStringEndOfLine(in, out)) {
return;
}
break;
case DECODE_BULK_STRING_CONTENT:
if (!decodeBulkStringContent(in, out)) {
return;
}
break;
default:
throw new RedisCodecException("Unknown state: " + state);
}
}
} catch (RedisCodecException e) {
resetDecoder();
throw e;
} catch (Exception e) {
resetDecoder();
throw new RedisCodecException(e);
}
}
private void resetDecoder() {
state = State.DECODE_TYPE;
remainingBulkLength = 0;
}
private boolean decodeType(ByteBuf in) throws Exception {
if (!in.isReadable()) {
return false;
}
type = RedisMessageType.valueOf(in.readByte());
state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
return true;
}
private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception {
ByteBuf lineBytes = readLine(in);
if (lineBytes == null) {
if (in.readableBytes() > maxInlineMessageLength) {
throw new RedisCodecException("length: " + in.readableBytes() +
" (expected: <= " + maxInlineMessageLength + ")");
}
return false;
}
out.add(newInlineRedisMessage(type, lineBytes));
resetDecoder();
return true;
}
private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception {
ByteBuf lineByteBuf = readLine(in);
if (lineByteBuf == null) {
return false;
}
final long length = parseRedisNumber(lineByteBuf);
if (length < RedisConstants.NULL_VALUE) {
throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
}
switch (type) {
case ARRAY_HEADER:
out.add(new ArrayHeaderRedisMessage(length));
resetDecoder();
return true;
case BULK_STRING:
if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
throw new RedisCodecException("length: " + length + " (expected: <= " +
RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
}
remainingBulkLength = (int) length; // range(int) is already checked.
out.add(new BulkStringHeaderRedisMessage(remainingBulkLength));
return decodeBulkString(remainingBulkLength, in, out);
default:
throw new RedisCodecException("bad type: " + type);
}
}
private boolean decodeBulkString(int length, ByteBuf in, List<Object> out) throws Exception {
switch (length) {
case RedisConstants.NULL_VALUE: // $-1\r\n
out.add(FullBulkStringRedisMessage.NULL_INSTANCE);
resetDecoder();
return true;
case 0:
state = State.DECODE_BULK_STRING_EOL;
return decodeBulkStringEndOfLine(in, out);
default: // expectedBulkLength is always positive.
state = State.DECODE_BULK_STRING_CONTENT;
return decodeBulkStringContent(in, out);
}
}
// $0\r\n <here> \r\n
private boolean decodeBulkStringEndOfLine(ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < RedisConstants.EOL_LENGTH) {
return false;
}
readEndOfLine(in);
out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE);
resetDecoder();
return true;
}
// ${expectedBulkLength}\r\n <here> {data...}\r\n
private boolean decodeBulkStringContent(ByteBuf in, List<Object> out) throws Exception {
final int readableBytes = in.readableBytes();
if (readableBytes == 0) {
return false;
}
// if this is last frame.
if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) {
ByteBuf content = in.readSlice(remainingBulkLength).retain();
readEndOfLine(in);
out.add(new DefaultLastBulkStringRedisContent(content));
resetDecoder();
return true;
}
// chunked write.
int toRead = Math.min(remainingBulkLength, readableBytes);
remainingBulkLength -= toRead;
out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain()));
return true;
}
private static void readEndOfLine(final ByteBuf in) {
final short delim = in.readShort();
if (RedisConstants.EOL_SHORT == delim) {
return;
}
final byte[] bytes = RedisCodecUtil.shortToBytes(delim);
throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)");
}
private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
switch (messageType) {
case SIMPLE_STRING: {
SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));
}
case ERROR: {
ErrorRedisMessage cached = messagePool.getError(content);
return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8));
}
case INTEGER: {
IntegerRedisMessage cached = messagePool.getInteger(content);
return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content));
}
default:
throw new RedisCodecException("bad type: " + messageType);
}
}
private static ByteBuf readLine(ByteBuf in) {
if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
return null;
}
final int lfIndex = in.forEachByte(ByteProcessor.FIND_LF);
if (lfIndex < 0) {
return null;
}
ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1); // `-1` is for CR
readEndOfLine(in); // validate CR LF
return data;
}
private long parseRedisNumber(ByteBuf byteBuf) {
final int readableBytes = byteBuf.readableBytes();
final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-';
final int extraOneByteForNegative = negative ? 1 : 0;
if (readableBytes <= extraOneByteForNegative) {
throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII));
}
if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) {
throw new RedisCodecException("too many characters to be a valid RESP Integer: " +
byteBuf.toString(CharsetUtil.US_ASCII));
}
if (negative) {
return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative));
}
return parsePositiveNumber(byteBuf);
}
private long parsePositiveNumber(ByteBuf byteBuf) {
toPositiveLongProcessor.reset();
byteBuf.forEachByte(toPositiveLongProcessor);
return toPositiveLongProcessor.content();
}
private static final class ToPositiveLongProcessor implements ByteProcessor {
private long result;
@Override
public boolean process(byte value) throws Exception {
if (value < '0' || value > '9') {
throw new RedisCodecException("bad byte in number: " + value);
}
result = result * 10 + (value - '0');
return true;
}
public long content() {
return result;
}
public void reset() {
result = 0;
}
}
}

View File

@ -0,0 +1,197 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.ObjectUtil;
import java.util.List;
/**
* Encodes {@link RedisMessage} into bytes following
* <a href="http://redis.io/topics/protocol">RESP (REdis Serialization Protocol)</a>.
*/
public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
private final RedisMessagePool messagePool;
/**
* Creates a new instance with default {@code messagePool}.
*/
public RedisEncoder() {
this(FixedRedisMessagePool.INSTANCE);
}
/**
* Creates a new instance.
* @param messagePool the predefined message pool.
*/
public RedisEncoder(RedisMessagePool messagePool) {
this.messagePool = ObjectUtil.checkNotNull(messagePool, "messagePool");
}
@Override
protected void encode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception {
try {
writeRedisMessage(ctx.alloc(), msg, out);
} catch (CodecException e) {
throw e;
} catch (Exception e) {
throw new CodecException(e);
}
}
private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List<Object> out) {
if (msg instanceof SimpleStringRedisMessage) {
writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out);
} else if (msg instanceof ErrorRedisMessage) {
writeErrorMessage(allocator, (ErrorRedisMessage) msg, out);
} else if (msg instanceof IntegerRedisMessage) {
writeIntegerMessage(allocator, (IntegerRedisMessage) msg, out);
} else if (msg instanceof FullBulkStringRedisMessage) {
writeFullBulkStringMessage(allocator, (FullBulkStringRedisMessage) msg, out);
} else if (msg instanceof BulkStringRedisContent) {
writeBulkStringContent(allocator, (BulkStringRedisContent) msg, out);
} else if (msg instanceof BulkStringHeaderRedisMessage) {
writeBulkStringHeader(allocator, (BulkStringHeaderRedisMessage) msg, out);
} else if (msg instanceof ArrayHeaderRedisMessage) {
writeArrayHeader(allocator, (ArrayHeaderRedisMessage) msg, out);
} else if (msg instanceof ArrayRedisMessage) {
writeArrayMessage(allocator, (ArrayRedisMessage) msg, out);
} else {
throw new CodecException("unknown message type: " + msg);
}
}
private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg,
List<Object> out) {
writeString(allocator, RedisMessageType.SIMPLE_STRING.value(), msg.content(), out);
}
private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List<Object> out) {
writeString(allocator, RedisMessageType.ERROR.value(), msg.content(), out);
}
private static void writeString(ByteBufAllocator allocator, byte type, String content, List<Object> out) {
ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + ByteBufUtil.utf8MaxBytes(content) +
RedisConstants.EOL_LENGTH);
buf.writeByte(type);
ByteBufUtil.writeUtf8(buf, content);
buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf);
}
private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List<Object> out) {
ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH +
RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.INTEGER.value());
buf.writeBytes(numberToBytes(msg.value()));
buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf);
}
private void writeBulkStringHeader(ByteBufAllocator allocator, BulkStringHeaderRedisMessage msg, List<Object> out) {
final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH +
(msg.isNull() ? RedisConstants.NULL_LENGTH :
RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH));
buf.writeByte(RedisMessageType.BULK_STRING.value());
if (msg.isNull()) {
buf.writeShort(RedisConstants.NULL_SHORT);
} else {
buf.writeBytes(numberToBytes(msg.bulkStringLength()));
buf.writeShort(RedisConstants.EOL_SHORT);
}
out.add(buf);
}
private static void writeBulkStringContent(ByteBufAllocator allocator, BulkStringRedisContent msg,
List<Object> out) {
out.add(msg.content().retain());
if (msg instanceof LastBulkStringRedisContent) {
out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT));
}
}
private void writeFullBulkStringMessage(ByteBufAllocator allocator, FullBulkStringRedisMessage msg,
List<Object> out) {
if (msg.isNull()) {
ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH +
RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.BULK_STRING.value());
buf.writeShort(RedisConstants.NULL_SHORT);
buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf);
} else {
ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH +
RedisConstants.EOL_LENGTH);
headerBuf.writeByte(RedisMessageType.BULK_STRING.value());
headerBuf.writeBytes(numberToBytes(msg.content().readableBytes()));
headerBuf.writeShort(RedisConstants.EOL_SHORT);
out.add(headerBuf);
out.add(msg.content().retain());
out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT));
}
}
/**
* Write array header only without body. Use this if you want to write arrays as streaming.
*/
private void writeArrayHeader(ByteBufAllocator allocator, ArrayHeaderRedisMessage msg, List<Object> out) {
writeArrayHeader(allocator, msg.isNull(), msg.length(), out);
}
/**
* Write full constructed array message.
*/
private void writeArrayMessage(ByteBufAllocator allocator, ArrayRedisMessage msg, List<Object> out) {
if (msg.isNull()) {
writeArrayHeader(allocator, msg.isNull(), RedisConstants.NULL_VALUE, out);
} else {
writeArrayHeader(allocator, msg.isNull(), msg.children().size(), out);
for (RedisMessage child : msg.children()) {
writeRedisMessage(allocator, child, out);
}
}
}
private void writeArrayHeader(ByteBufAllocator allocator, boolean isNull, long length, List<Object> out) {
if (isNull) {
final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH +
RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.ARRAY_HEADER.value());
buf.writeShort(RedisConstants.NULL_SHORT);
buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf);
} else {
final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH +
RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.ARRAY_HEADER.value());
buf.writeBytes(numberToBytes(length));
buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf);
}
}
private byte[] numberToBytes(long value) {
byte[] bytes = messagePool.getByteBufOfInteger(value);
return bytes != null ? bytes : RedisCodecUtil.longToAsciiBytes(value);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
/**
* RedisMessage is base interface for codec-redis.
*/
public interface RedisMessage {
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
/**
* A strategy interface for caching {@link RedisMessage}s.
*/
public interface RedisMessagePool {
/**
* Returns {@link SimpleStringRedisMessage} for given {@code content}. Returns {@code null} it does not exist.
*/
SimpleStringRedisMessage getSimpleString(String content);
/**
* Returns {@link SimpleStringRedisMessage} for given {@code content}. Returns {@code null} it does not exist.
*/
SimpleStringRedisMessage getSimpleString(ByteBuf content);
/**
* Returns {@link ErrorRedisMessage} for given {@code content}. Returns {@code null} it does not exist.
*/
ErrorRedisMessage getError(String content);
/**
* Returns {@link ErrorRedisMessage} for given {@code content}. Returns {@code null} it does not exist.
*/
ErrorRedisMessage getError(ByteBuf content);
/**
* Returns {@link IntegerRedisMessage} for given {@code value}. Returns {@code null} it does not exist.
*/
IntegerRedisMessage getInteger(long value);
/**
* Returns {@link IntegerRedisMessage} for given {@code content}. Returns {@code null} it does not exist.
*/
IntegerRedisMessage getInteger(ByteBuf content);
/**
* Returns {@code byte[]} for given {@code msg}. Returns {@code null} it does not exist.
*/
byte[] getByteBufOfInteger(long value);
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
/**
* Type of <a href="http://redis.io/topics/protocol">RESP (REdis Serialization Protocol)</a>.
*/
public enum RedisMessageType {
SIMPLE_STRING((byte) '+', true),
ERROR((byte) '-', true),
INTEGER((byte) ':', true),
BULK_STRING((byte) '$', false),
ARRAY_HEADER((byte) '*', false),
ARRAY((byte) '*', false); // for aggregated
private final byte value;
private final boolean inline;
RedisMessageType(byte value, boolean inline) {
this.value = value;
this.inline = inline;
}
/**
* Returns prefix {@code byte} for this type.
*/
public byte value() {
return value;
}
/**
* Returns {@code true} if this type is inline type, or returns {@code false}. If this is {@code true},
* this type doesn't have length field.
*/
public boolean isInline() {
return inline;
}
/**
* Return {@link RedisMessageType} for this type prefix {@code byte}.
*/
public static RedisMessageType valueOf(byte value) {
switch (value) {
case '+':
return SIMPLE_STRING;
case '-':
return ERROR;
case ':':
return INTEGER;
case '$':
return BULK_STRING;
case '*':
return ARRAY_HEADER;
default:
throw new RedisCodecException("Unknown RedisMessageType: " + value);
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.internal.StringUtil;
/**
* Simple Strings of <a href="http://redis.io/topics/protocol">RESP</a>.
*/
public final class SimpleStringRedisMessage extends AbstractStringRedisMessage {
/**
* Creates a {@link SimpleStringRedisMessage} for the given {@code content}.
*
* @param content the message content, must not be {@code null}.
*/
public SimpleStringRedisMessage(String content) {
super(content);
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content())
.append(']').toString();
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
/**
* Encoder, decoder for Redis.
*/
package io.netty.handler.codec.redis;

View File

@ -0,0 +1,53 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
final class RedisCodecTestUtil {
private RedisCodecTestUtil() {
}
static byte[] bytesOf(long value) {
return bytesOf(Long.toString(value));
}
static byte[] bytesOf(String s) {
return s.getBytes(CharsetUtil.UTF_8);
}
static byte[] bytesOf(ByteBuf buf) {
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
return data;
}
static String stringOf(ByteBuf buf) {
return new String(bytesOf(buf));
}
static ByteBuf byteBufOf(String s) {
return byteBufOf(bytesOf(s));
}
static ByteBuf byteBufOf(byte[] data) {
return Unpooled.wrappedBuffer(data);
}
}

View File

@ -0,0 +1,257 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static io.netty.handler.codec.redis.RedisCodecTestUtil.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
/**
* Verifies the correct functionality of the {@link RedisDecoder} and {@link RedisArrayAggregator}.
*/
public class RedisDecoderTest {
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
}
@After
public void teardown() throws Exception {
assertFalse(channel.finish());
}
@Test
public void shouldDecodeSimpleString() {
assertFalse(channel.writeInbound(byteBufOf("+")));
assertFalse(channel.writeInbound(byteBufOf("O")));
assertFalse(channel.writeInbound(byteBufOf("K")));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
SimpleStringRedisMessage msg = channel.readInbound();
assertThat(msg.content(), is("OK"));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeTwoSimpleStrings() {
assertFalse(channel.writeInbound(byteBufOf("+")));
assertFalse(channel.writeInbound(byteBufOf("O")));
assertFalse(channel.writeInbound(byteBufOf("K")));
assertTrue(channel.writeInbound(byteBufOf("\r\n+SEC")));
assertTrue(channel.writeInbound(byteBufOf("OND\r\n")));
SimpleStringRedisMessage msg1 = channel.readInbound();
assertThat(msg1.content(), is("OK"));
ReferenceCountUtil.release(msg1);
SimpleStringRedisMessage msg2 = channel.readInbound();
assertThat(msg2.content(), is("SECOND"));
ReferenceCountUtil.release(msg2);
}
@Test
public void shouldDecodeError() {
String content = "ERROR sample message";
assertFalse(channel.writeInbound(byteBufOf("-")));
assertFalse(channel.writeInbound(byteBufOf(content)));
assertFalse(channel.writeInbound(byteBufOf("\r")));
assertTrue(channel.writeInbound(byteBufOf("\n")));
ErrorRedisMessage msg = channel.readInbound();
assertThat(msg.content(), is(content));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeInteger() {
long value = 1234L;
byte[] content = bytesOf(value);
assertFalse(channel.writeInbound(byteBufOf(":")));
assertFalse(channel.writeInbound(byteBufOf(content)));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
IntegerRedisMessage msg = channel.readInbound();
assertThat(msg.value(), is(value));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeBulkString() {
String buf1 = "bulk\nst";
String buf2 = "ring\ntest\n1234";
byte[] content = bytesOf(buf1 + buf2);
assertFalse(channel.writeInbound(byteBufOf("$")));
assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length))));
assertFalse(channel.writeInbound(byteBufOf("\r\n")));
assertFalse(channel.writeInbound(byteBufOf(buf1)));
assertFalse(channel.writeInbound(byteBufOf(buf2)));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
FullBulkStringRedisMessage msg = channel.readInbound();
assertThat(bytesOf(msg.content()), is(content));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeEmptyBulkString() {
byte[] content = bytesOf("");
assertFalse(channel.writeInbound(byteBufOf("$")));
assertFalse(channel.writeInbound(byteBufOf(Integer.toString(content.length))));
assertFalse(channel.writeInbound(byteBufOf("\r\n")));
assertFalse(channel.writeInbound(byteBufOf(content)));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
FullBulkStringRedisMessage msg = channel.readInbound();
assertThat(bytesOf(msg.content()), is(content));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeNullBulkString() {
assertFalse(channel.writeInbound(byteBufOf("$")));
assertFalse(channel.writeInbound(byteBufOf(Integer.toString(-1))));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
FullBulkStringRedisMessage msg = channel.readInbound();
assertThat(msg.isNull(), is(true));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeSimpleArray() throws Exception {
assertFalse(channel.writeInbound(byteBufOf("*3\r\n")));
assertFalse(channel.writeInbound(byteBufOf(":1234\r\n")));
assertFalse(channel.writeInbound(byteBufOf("+sim")));
assertFalse(channel.writeInbound(byteBufOf("ple\r\n-err")));
assertTrue(channel.writeInbound(byteBufOf("or\r\n")));
ArrayRedisMessage msg = channel.readInbound();
List<RedisMessage> children = msg.children();
assertThat(msg.children().size(), is(equalTo(3)));
assertThat(children.get(0), instanceOf(IntegerRedisMessage.class));
assertThat(((IntegerRedisMessage) children.get(0)).value(), is(1234L));
assertThat(children.get(1), instanceOf(SimpleStringRedisMessage.class));
assertThat(((SimpleStringRedisMessage) children.get(1)).content(), is("simple"));
assertThat(children.get(2), instanceOf(ErrorRedisMessage.class));
assertThat(((ErrorRedisMessage) children.get(2)).content(), is("error"));
ReferenceCountUtil.release(msg);
}
@Test
public void shouldDecodeNestedArray() throws Exception {
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(byteBufOf("*2\r\n"));
buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n"));
buf.writeBytes(byteBufOf("*2\r\n+Foo\r\n-Bar\r\n"));
assertTrue(channel.writeInbound(buf));
ArrayRedisMessage msg = channel.readInbound();
List<RedisMessage> children = msg.children();
assertThat(msg.children().size(), is(2));
ArrayRedisMessage intArray = (ArrayRedisMessage) children.get(0);
ArrayRedisMessage strArray = (ArrayRedisMessage) children.get(1);
assertThat(intArray.children().size(), is(3));
assertThat(((IntegerRedisMessage) intArray.children().get(0)).value(), is(1L));
assertThat(((IntegerRedisMessage) intArray.children().get(1)).value(), is(2L));
assertThat(((IntegerRedisMessage) intArray.children().get(2)).value(), is(3L));
assertThat(strArray.children().size(), is(2));
assertThat(((SimpleStringRedisMessage) strArray.children().get(0)).content(), is("Foo"));
assertThat(((ErrorRedisMessage) strArray.children().get(1)).content(), is("Bar"));
ReferenceCountUtil.release(msg);
}
@Test(expected = IllegalReferenceCountException.class)
public void shouldErrorOnDoubleReleaseArrayReferenceCounted() throws Exception {
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(byteBufOf("*2\r\n"));
buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n"));
buf.writeBytes(byteBufOf("*2\r\n+Foo\r\n-Bar\r\n"));
assertTrue(channel.writeInbound(buf));
ArrayRedisMessage msg = channel.readInbound();
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(msg);
}
@Test(expected = IllegalReferenceCountException.class)
public void shouldErrorOnReleaseArrayChildReferenceCounted() throws Exception {
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(byteBufOf("*2\r\n"));
buf.writeBytes(byteBufOf("*3\r\n:1\r\n:2\r\n:3\r\n"));
buf.writeBytes(byteBufOf("$3\r\nFoo\r\n"));
assertTrue(channel.writeInbound(buf));
ArrayRedisMessage msg = channel.readInbound();
List<RedisMessage> children = msg.children();
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(children.get(1));
}
@Test(expected = IllegalReferenceCountException.class)
public void shouldErrorOnReleasecontentOfArrayChildReferenceCounted() throws Exception {
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(byteBufOf("*2\r\n"));
buf.writeBytes(byteBufOf("$3\r\nFoo\r\n$3\r\nBar\r\n"));
assertTrue(channel.writeInbound(buf));
ArrayRedisMessage msg = channel.readInbound();
List<RedisMessage> children = msg.children();
ByteBuf childBuf = ((FullBulkStringRedisMessage) children.get(0)).content();
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(childBuf);
}
}

View File

@ -0,0 +1,184 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static io.netty.handler.codec.redis.RedisCodecTestUtil.*;
/**
* Verifies the correct functionality of the {@link RedisEncoder}.
*/
public class RedisEncoderTest {
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(new RedisEncoder());
}
@After
public void teardown() throws Exception {
assertFalse(channel.finish());
}
@Test
public void shouldEncodeSimpleString() {
RedisMessage msg = new SimpleStringRedisMessage("simple");
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(bytesOf("+simple\r\n")));
written.release();
}
@Test
public void shouldEncodeError() {
RedisMessage msg = new ErrorRedisMessage("error1");
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("-error1\r\n"))));
written.release();
}
@Test
public void shouldEncodeInteger() {
RedisMessage msg = new IntegerRedisMessage(1234L);
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf(":1234\r\n"))));
written.release();
}
@Test
public void shouldEncodeBulkStringContent() {
RedisMessage header = new BulkStringHeaderRedisMessage(16);
RedisMessage body1 = new DefaultBulkStringRedisContent(byteBufOf("bulk\nstr").retain());
RedisMessage body2 = new DefaultLastBulkStringRedisContent(byteBufOf("ing\ntest").retain());
assertThat(channel.writeOutbound(header), is(true));
assertThat(channel.writeOutbound(body1), is(true));
assertThat(channel.writeOutbound(body2), is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("$16\r\nbulk\nstring\ntest\r\n"))));
written.release();
}
@Test
public void shouldEncodeFullBulkString() {
ByteBuf bulkString = byteBufOf("bulk\nstring\ntest").retain();
int length = bulkString.readableBytes();
RedisMessage msg = new FullBulkStringRedisMessage(bulkString);
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("$" + length + "\r\nbulk\nstring\ntest\r\n"))));
written.release();
}
@Test
public void shouldEncodeSimpleArray() {
List<RedisMessage> children = new ArrayList<RedisMessage>();
children.add(new FullBulkStringRedisMessage(byteBufOf("foo").retain()));
children.add(new FullBulkStringRedisMessage(byteBufOf("bar").retain()));
RedisMessage msg = new ArrayRedisMessage(children);
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"))));
written.release();
}
@Test
public void shouldEncodeNullArray() {
RedisMessage msg = ArrayRedisMessage.NULL_INSTANCE;
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("*-1\r\n"))));
written.release();
}
@Test
public void shouldEncodeEmptyArray() {
RedisMessage msg = ArrayRedisMessage.EMPTY_INSTANCE;
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("*0\r\n"))));
written.release();
}
@Test
public void shouldEncodeNestedArray() {
List<RedisMessage> grandChildren = new ArrayList<RedisMessage>();
grandChildren.add(new FullBulkStringRedisMessage(byteBufOf("bar")));
grandChildren.add(new IntegerRedisMessage(-1234L));
List<RedisMessage> children = new ArrayList<RedisMessage>();
children.add(new SimpleStringRedisMessage("foo"));
children.add(new ArrayRedisMessage(grandChildren));
RedisMessage msg = new ArrayRedisMessage(children);
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(equalTo(bytesOf("*2\r\n+foo\r\n*2\r\n$3\r\nbar\r\n:-1234\r\n"))));
written.release();
}
private static ByteBuf readAll(EmbeddedChannel channel) {
ByteBuf buf = Unpooled.buffer();
ByteBuf read;
while ((read = channel.readOutbound()) != null) {
buf.writeBytes(read);
}
return buf;
}
}

View File

@ -239,7 +239,7 @@
<module>codec-http2</module>
<module>codec-memcache</module>
<module>codec-mqtt</module>
<module>codec-smtp</module>
<module>codec-redis</module>
<module>codec-socks</module>
<module>codec-stomp</module>
<module>codec-xml</module>