diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java index 880c4f325f..19be6c126b 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/AbstractStringRedisMessage.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.redis; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; /** @@ -38,4 +39,14 @@ public abstract class AbstractStringRedisMessage implements RedisMessage { public final String content() { return content; } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("content=") + .append(content) + .append(']').toString(); + } + } diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java index 484871de8f..3421d328be 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java @@ -137,10 +137,6 @@ public class ArrayRedisMessage extends AbstractReferenceCounted implements Redis * A predefined empty array instance for {@link ArrayRedisMessage}. */ public static final ArrayRedisMessage EMPTY_INSTANCE = new ArrayRedisMessage() { - @Override - public boolean isNull() { - return false; - } @Override public ArrayRedisMessage retain() { diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java index 5546fc68ea..33a787e0ff 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/ErrorRedisMessage.java @@ -15,7 +15,6 @@ package io.netty.handler.codec.redis; -import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; /** @@ -33,12 +32,4 @@ public final class ErrorRedisMessage extends AbstractStringRedisMessage { super(content); } - @Override - public String toString() { - return new StringBuilder(StringUtil.simpleClassName(this)) - .append('[') - .append("content=") - .append(content()) - .append(']').toString(); - } } diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/InlineCommandRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/InlineCommandRedisMessage.java new file mode 100644 index 0000000000..b648973ba2 --- /dev/null +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/InlineCommandRedisMessage.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.redis; + +import io.netty.util.internal.UnstableApi; + +/** + * Inline commands of RESP. + */ +@UnstableApi +public final class InlineCommandRedisMessage extends AbstractStringRedisMessage { + + /** + * Creates a {@link InlineCommandRedisMessage} for the given {@code content}. + * + * @param content the message content, must not be {@code null}. + */ + public InlineCommandRedisMessage(String content) { + super(content); + } + +} diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java index 9fae6f1232..ca68840ac1 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisConstants.java @@ -33,6 +33,9 @@ final class RedisConstants { static final int REDIS_MESSAGE_MAX_LENGTH = 512 * 1024 * 1024; // 512MB + // 64KB is max inline length of current Redis server implementation. + static final int REDIS_INLINE_MESSAGE_MAX_LENGTH = 64 * 1024; + static final int POSITIVE_LONG_MAX_LENGTH = 19; // length of Long.MAX_VALUE static final int LONG_MAX_LENGTH = POSITIVE_LONG_MAX_LENGTH + 1; // +1 is sign diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java index 4c70444efa..ad5e68d29e 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java @@ -36,6 +36,7 @@ public final class RedisDecoder extends ByteToMessageDecoder { private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor(); + private final boolean decodeInlineCommands; private final int maxInlineMessageLength; private final RedisMessagePool messagePool; @@ -53,25 +54,44 @@ public final class RedisDecoder extends ByteToMessageDecoder { } /** - * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}. + * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool} + * and inline command decoding disabled. */ public RedisDecoder() { - // 1024 * 64 is max inline length of current Redis server implementation. - this(1024 * 64, FixedRedisMessagePool.INSTANCE); + this(false); + } + + /** + * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}. + * @param decodeInlineCommands if {@code true}, inline commands will be decoded. + */ + public RedisDecoder(boolean decodeInlineCommands) { + this(RedisConstants.REDIS_INLINE_MESSAGE_MAX_LENGTH, FixedRedisMessagePool.INSTANCE, decodeInlineCommands); + } + + /** + * Creates a new instance with inline command decoding disabled. + * @param maxInlineMessageLength the maximum length of inline message. + * @param messagePool the predefined message pool. + */ + public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { + this(maxInlineMessageLength, messagePool, false); } /** * Creates a new instance. * @param maxInlineMessageLength the maximum length of inline message. * @param messagePool the predefined message pool. + * @param decodeInlineCommands if {@code true}, inline commands will be decoded. */ - public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { + public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool, boolean decodeInlineCommands) { if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength + " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); } this.maxInlineMessageLength = maxInlineMessageLength; this.messagePool = messagePool; + this.decodeInlineCommands = decodeInlineCommands; } @Override @@ -126,7 +146,8 @@ public final class RedisDecoder extends ByteToMessageDecoder { if (!in.isReadable()) { return false; } - type = RedisMessageType.valueOf(in.readByte()); + + type = RedisMessageType.readFrom(in, decodeInlineCommands); state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH; return true; } @@ -233,6 +254,8 @@ public final class RedisDecoder extends ByteToMessageDecoder { private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) { switch (messageType) { + case INLINE_COMMAND: + return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8)); case SIMPLE_STRING: { SimpleStringRedisMessage cached = messagePool.getSimpleString(content); return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8)); diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java index 45fad5e511..9a72f451c9 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java @@ -62,7 +62,9 @@ public class RedisEncoder extends MessageToMessageEncoder { } private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List out) { - if (msg instanceof SimpleStringRedisMessage) { + if (msg instanceof InlineCommandRedisMessage) { + writeInlineCommandMessage(allocator, (InlineCommandRedisMessage) msg, out); + } else if (msg instanceof SimpleStringRedisMessage) { writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out); } else if (msg instanceof ErrorRedisMessage) { writeErrorMessage(allocator, (ErrorRedisMessage) msg, out); @@ -83,19 +85,25 @@ public class RedisEncoder extends MessageToMessageEncoder { } } + private static void writeInlineCommandMessage(ByteBufAllocator allocator, InlineCommandRedisMessage msg, + List out) { + writeString(allocator, RedisMessageType.INLINE_COMMAND, msg.content(), out); + } + private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg, List out) { - writeString(allocator, RedisMessageType.SIMPLE_STRING.value(), msg.content(), out); + writeString(allocator, RedisMessageType.SIMPLE_STRING, msg.content(), out); } private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List out) { - writeString(allocator, RedisMessageType.ERROR.value(), msg.content(), out); + writeString(allocator, RedisMessageType.ERROR, msg.content(), out); } - private static void writeString(ByteBufAllocator allocator, byte type, String content, List out) { - ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + ByteBufUtil.utf8MaxBytes(content) + + private static void writeString(ByteBufAllocator allocator, RedisMessageType type, String content, + List out) { + ByteBuf buf = allocator.ioBuffer(type.length() + ByteBufUtil.utf8MaxBytes(content) + RedisConstants.EOL_LENGTH); - buf.writeByte(type); + type.writeTo(buf); ByteBufUtil.writeUtf8(buf, content); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); @@ -104,7 +112,7 @@ public class RedisEncoder extends MessageToMessageEncoder { private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List out) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); - buf.writeByte(RedisMessageType.INTEGER.value()); + RedisMessageType.INTEGER.writeTo(buf); buf.writeBytes(numberToBytes(msg.value())); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); @@ -114,7 +122,7 @@ public class RedisEncoder extends MessageToMessageEncoder { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + (msg.isNull() ? RedisConstants.NULL_LENGTH : RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); - buf.writeByte(RedisMessageType.BULK_STRING.value()); + RedisMessageType.BULK_STRING.writeTo(buf); if (msg.isNull()) { buf.writeShort(RedisConstants.NULL_SHORT); } else { @@ -137,14 +145,14 @@ public class RedisEncoder extends MessageToMessageEncoder { if (msg.isNull()) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + RedisConstants.EOL_LENGTH); - buf.writeByte(RedisMessageType.BULK_STRING.value()); + RedisMessageType.BULK_STRING.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); - headerBuf.writeByte(RedisMessageType.BULK_STRING.value()); + RedisMessageType.BULK_STRING.writeTo(headerBuf); headerBuf.writeBytes(numberToBytes(msg.content().readableBytes())); headerBuf.writeShort(RedisConstants.EOL_SHORT); out.add(headerBuf); @@ -178,14 +186,14 @@ public class RedisEncoder extends MessageToMessageEncoder { if (isNull) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + RedisConstants.EOL_LENGTH); - buf.writeByte(RedisMessageType.ARRAY_HEADER.value()); + RedisMessageType.ARRAY_HEADER.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); - buf.writeByte(RedisMessageType.ARRAY_HEADER.value()); + RedisMessageType.ARRAY_HEADER.writeTo(buf); buf.writeBytes(numberToBytes(length)); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java index b325f060f9..c558a410ed 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisMessageType.java @@ -15,6 +15,7 @@ package io.netty.handler.codec.redis; +import io.netty.buffer.ByteBuf; import io.netty.util.internal.UnstableApi; /** @@ -23,26 +24,26 @@ import io.netty.util.internal.UnstableApi; @UnstableApi public enum RedisMessageType { + INLINE_COMMAND(null, true), SIMPLE_STRING((byte) '+', true), ERROR((byte) '-', true), INTEGER((byte) ':', true), BULK_STRING((byte) '$', false), - ARRAY_HEADER((byte) '*', false), - ARRAY((byte) '*', false); // for aggregated + ARRAY_HEADER((byte) '*', false); - private final byte value; + private final Byte value; private final boolean inline; - RedisMessageType(byte value, boolean inline) { + RedisMessageType(Byte value, boolean inline) { this.value = value; this.inline = inline; } /** - * Returns prefix {@code byte} for this type. + * Returns length of this type. */ - public byte value() { - return value; + public int length() { + return value != null ? RedisConstants.TYPE_LENGTH : 0; } /** @@ -54,9 +55,32 @@ public enum RedisMessageType { } /** - * Return {@link RedisMessageType} for this type prefix {@code byte}. + * Determine {@link RedisMessageType} based on the type prefix {@code byte} read from given the buffer. */ - public static RedisMessageType valueOf(byte value) { + public static RedisMessageType readFrom(ByteBuf in, boolean decodeInlineCommands) { + final int initialIndex = in.readerIndex(); + final RedisMessageType type = valueOf(in.readByte()); + if (type == INLINE_COMMAND) { + if (!decodeInlineCommands) { + throw new RedisCodecException("Decoding of inline commands is disabled"); + } + // reset index to make content readable again + in.readerIndex(initialIndex); + } + return type; + } + + /** + * Write the message type's prefix to the given buffer. + */ + public void writeTo(ByteBuf out) { + if (value == null) { + return; + } + out.writeByte(value.byteValue()); + } + + private static RedisMessageType valueOf(byte value) { switch (value) { case '+': return SIMPLE_STRING; @@ -69,7 +93,7 @@ public enum RedisMessageType { case '*': return ARRAY_HEADER; default: - throw new RedisCodecException("Unknown RedisMessageType: " + value); + return INLINE_COMMAND; } } } diff --git a/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java b/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java index 1179b5deef..780999a660 100644 --- a/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java +++ b/codec-redis/src/main/java/io/netty/handler/codec/redis/SimpleStringRedisMessage.java @@ -15,7 +15,6 @@ package io.netty.handler.codec.redis; -import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; /** @@ -33,12 +32,4 @@ public final class SimpleStringRedisMessage extends AbstractStringRedisMessage { super(content); } - @Override - public String toString() { - return new StringBuilder(StringUtil.simpleClassName(this)) - .append('[') - .append("content=") - .append(content()) - .append(']').toString(); - } } diff --git a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java index d7738aa2c7..6d9bd19de1 100644 --- a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java +++ b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisDecoderTest.java @@ -19,6 +19,7 @@ package io.netty.handler.codec.redis; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.DecoderException; import io.netty.util.IllegalReferenceCountException; import io.netty.util.ReferenceCountUtil; import org.junit.After; @@ -27,15 +28,9 @@ import org.junit.Test; import java.util.List; -import static io.netty.handler.codec.redis.RedisCodecTestUtil.byteBufOf; -import static io.netty.handler.codec.redis.RedisCodecTestUtil.bytesOf; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static io.netty.handler.codec.redis.RedisCodecTestUtil.*; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; /** * Verifies the correct functionality of the {@link RedisDecoder} and {@link RedisArrayAggregator}. @@ -46,8 +41,12 @@ public class RedisDecoderTest { @Before public void setup() throws Exception { - channel = new EmbeddedChannel( - new RedisDecoder(), + channel = newChannel(false); + } + + private static EmbeddedChannel newChannel(boolean decodeInlineCommands) { + return new EmbeddedChannel( + new RedisDecoder(decodeInlineCommands), new RedisBulkStringAggregator(), new RedisArrayAggregator()); } @@ -67,6 +66,34 @@ public class RedisDecoderTest { ReferenceCountUtil.release(msg); } + @Test(expected = DecoderException.class) + public void shouldNotDecodeInlineCommandByDefault() { + assertFalse(channel.writeInbound(byteBufOf("P"))); + assertFalse(channel.writeInbound(byteBufOf("I"))); + assertFalse(channel.writeInbound(byteBufOf("N"))); + assertFalse(channel.writeInbound(byteBufOf("G"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + channel.readInbound(); + } + + @Test + public void shouldDecodeInlineCommand() { + channel = newChannel(true); + + assertFalse(channel.writeInbound(byteBufOf("P"))); + assertFalse(channel.writeInbound(byteBufOf("I"))); + assertFalse(channel.writeInbound(byteBufOf("N"))); + assertFalse(channel.writeInbound(byteBufOf("G"))); + assertTrue(channel.writeInbound(byteBufOf("\r\n"))); + + InlineCommandRedisMessage msg = channel.readInbound(); + + assertThat(msg.content(), is("PING")); + + ReferenceCountUtil.release(msg); + } + @Test public void shouldDecodeSimpleString() { assertFalse(channel.writeInbound(byteBufOf("+"))); diff --git a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java index d880db2e7b..926b62116a 100644 --- a/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java +++ b/codec-redis/src/test/java/io/netty/handler/codec/redis/RedisEncoderTest.java @@ -48,6 +48,18 @@ public class RedisEncoderTest { assertFalse(channel.finish()); } + @Test + public void shouldEncodeInlineCommand() { + RedisMessage msg = new InlineCommandRedisMessage("ping"); + + boolean result = channel.writeOutbound(msg); + assertThat(result, is(true)); + + ByteBuf written = readAll(channel); + assertThat(bytesOf(written), is(bytesOf("ping\r\n"))); + written.release(); + } + @Test public void shouldEncodeSimpleString() { RedisMessage msg = new SimpleStringRedisMessage("simple");