Support Redis inline commands

Motivation:
The RESP protocol implementation lacked inline command
support.

Modifications:
Added logic to decode and encode inline commands.

Result:
Inline commands are supported. Fixes #7686.
This commit is contained in:
Marian Seitner 2018-02-04 01:50:27 +01:00 committed by Norman Maurer
parent 2e92a2f5cd
commit c75bc1f25b
11 changed files with 181 additions and 60 deletions

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.redis; package io.netty.handler.codec.redis;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
/** /**
@ -38,4 +39,14 @@ public abstract class AbstractStringRedisMessage implements RedisMessage {
public final String content() { public final String content() {
return content; return content;
} }
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content)
.append(']').toString();
}
} }

View File

@ -137,10 +137,6 @@ public class ArrayRedisMessage extends AbstractReferenceCounted implements Redis
* A predefined empty array instance for {@link ArrayRedisMessage}. * A predefined empty array instance for {@link ArrayRedisMessage}.
*/ */
public static final ArrayRedisMessage EMPTY_INSTANCE = new ArrayRedisMessage() { public static final ArrayRedisMessage EMPTY_INSTANCE = new ArrayRedisMessage() {
@Override
public boolean isNull() {
return false;
}
@Override @Override
public ArrayRedisMessage retain() { public ArrayRedisMessage retain() {

View File

@ -15,7 +15,6 @@
package io.netty.handler.codec.redis; package io.netty.handler.codec.redis;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
/** /**
@ -33,12 +32,4 @@ public final class ErrorRedisMessage extends AbstractStringRedisMessage {
super(content); super(content);
} }
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content())
.append(']').toString();
}
} }

View File

@ -0,0 +1,35 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.redis;
import io.netty.util.internal.UnstableApi;
/**
* Inline commands of <a href="http://redis.io/topics/protocol">RESP</a>.
*/
@UnstableApi
public final class InlineCommandRedisMessage extends AbstractStringRedisMessage {
/**
* Creates a {@link InlineCommandRedisMessage} for the given {@code content}.
*
* @param content the message content, must not be {@code null}.
*/
public InlineCommandRedisMessage(String content) {
super(content);
}
}

View File

@ -33,6 +33,9 @@ final class RedisConstants {
static final int REDIS_MESSAGE_MAX_LENGTH = 512 * 1024 * 1024; // 512MB static final int REDIS_MESSAGE_MAX_LENGTH = 512 * 1024 * 1024; // 512MB
// 64KB is max inline length of current Redis server implementation.
static final int REDIS_INLINE_MESSAGE_MAX_LENGTH = 64 * 1024;
static final int POSITIVE_LONG_MAX_LENGTH = 19; // length of Long.MAX_VALUE static final int POSITIVE_LONG_MAX_LENGTH = 19; // length of Long.MAX_VALUE
static final int LONG_MAX_LENGTH = POSITIVE_LONG_MAX_LENGTH + 1; // +1 is sign static final int LONG_MAX_LENGTH = POSITIVE_LONG_MAX_LENGTH + 1; // +1 is sign

View File

@ -36,6 +36,7 @@ public final class RedisDecoder extends ByteToMessageDecoder {
private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor(); private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
private final boolean decodeInlineCommands;
private final int maxInlineMessageLength; private final int maxInlineMessageLength;
private final RedisMessagePool messagePool; private final RedisMessagePool messagePool;
@ -53,25 +54,44 @@ public final class RedisDecoder extends ByteToMessageDecoder {
} }
/** /**
* Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}. * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}
* and inline command decoding disabled.
*/ */
public RedisDecoder() { public RedisDecoder() {
// 1024 * 64 is max inline length of current Redis server implementation. this(false);
this(1024 * 64, FixedRedisMessagePool.INSTANCE); }
/**
* Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}.
* @param decodeInlineCommands if {@code true}, inline commands will be decoded.
*/
public RedisDecoder(boolean decodeInlineCommands) {
this(RedisConstants.REDIS_INLINE_MESSAGE_MAX_LENGTH, FixedRedisMessagePool.INSTANCE, decodeInlineCommands);
}
/**
* Creates a new instance with inline command decoding disabled.
* @param maxInlineMessageLength the maximum length of inline message.
* @param messagePool the predefined message pool.
*/
public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
this(maxInlineMessageLength, messagePool, false);
} }
/** /**
* Creates a new instance. * Creates a new instance.
* @param maxInlineMessageLength the maximum length of inline message. * @param maxInlineMessageLength the maximum length of inline message.
* @param messagePool the predefined message pool. * @param messagePool the predefined message pool.
* @param decodeInlineCommands if {@code true}, inline commands will be decoded.
*/ */
public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool, boolean decodeInlineCommands) {
if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) { if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength + throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
" (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")"); " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
} }
this.maxInlineMessageLength = maxInlineMessageLength; this.maxInlineMessageLength = maxInlineMessageLength;
this.messagePool = messagePool; this.messagePool = messagePool;
this.decodeInlineCommands = decodeInlineCommands;
} }
@Override @Override
@ -126,7 +146,8 @@ public final class RedisDecoder extends ByteToMessageDecoder {
if (!in.isReadable()) { if (!in.isReadable()) {
return false; return false;
} }
type = RedisMessageType.valueOf(in.readByte());
type = RedisMessageType.readFrom(in, decodeInlineCommands);
state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH; state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
return true; return true;
} }
@ -233,6 +254,8 @@ public final class RedisDecoder extends ByteToMessageDecoder {
private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) { private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
switch (messageType) { switch (messageType) {
case INLINE_COMMAND:
return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8));
case SIMPLE_STRING: { case SIMPLE_STRING: {
SimpleStringRedisMessage cached = messagePool.getSimpleString(content); SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8)); return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));

View File

@ -62,7 +62,9 @@ public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
} }
private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List<Object> out) { private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List<Object> out) {
if (msg instanceof SimpleStringRedisMessage) { if (msg instanceof InlineCommandRedisMessage) {
writeInlineCommandMessage(allocator, (InlineCommandRedisMessage) msg, out);
} else if (msg instanceof SimpleStringRedisMessage) {
writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out); writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out);
} else if (msg instanceof ErrorRedisMessage) { } else if (msg instanceof ErrorRedisMessage) {
writeErrorMessage(allocator, (ErrorRedisMessage) msg, out); writeErrorMessage(allocator, (ErrorRedisMessage) msg, out);
@ -83,19 +85,25 @@ public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
} }
} }
private static void writeInlineCommandMessage(ByteBufAllocator allocator, InlineCommandRedisMessage msg,
List<Object> out) {
writeString(allocator, RedisMessageType.INLINE_COMMAND, msg.content(), out);
}
private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg, private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg,
List<Object> out) { List<Object> out) {
writeString(allocator, RedisMessageType.SIMPLE_STRING.value(), msg.content(), out); writeString(allocator, RedisMessageType.SIMPLE_STRING, msg.content(), out);
} }
private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List<Object> out) { private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List<Object> out) {
writeString(allocator, RedisMessageType.ERROR.value(), msg.content(), out); writeString(allocator, RedisMessageType.ERROR, msg.content(), out);
} }
private static void writeString(ByteBufAllocator allocator, byte type, String content, List<Object> out) { private static void writeString(ByteBufAllocator allocator, RedisMessageType type, String content,
ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + ByteBufUtil.utf8MaxBytes(content) + List<Object> out) {
ByteBuf buf = allocator.ioBuffer(type.length() + ByteBufUtil.utf8MaxBytes(content) +
RedisConstants.EOL_LENGTH); RedisConstants.EOL_LENGTH);
buf.writeByte(type); type.writeTo(buf);
ByteBufUtil.writeUtf8(buf, content); ByteBufUtil.writeUtf8(buf, content);
buf.writeShort(RedisConstants.EOL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf); out.add(buf);
@ -104,7 +112,7 @@ public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List<Object> out) { private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List<Object> out) {
ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH +
RedisConstants.EOL_LENGTH); RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.INTEGER.value()); RedisMessageType.INTEGER.writeTo(buf);
buf.writeBytes(numberToBytes(msg.value())); buf.writeBytes(numberToBytes(msg.value()));
buf.writeShort(RedisConstants.EOL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf); out.add(buf);
@ -114,7 +122,7 @@ public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH +
(msg.isNull() ? RedisConstants.NULL_LENGTH : (msg.isNull() ? RedisConstants.NULL_LENGTH :
RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH));
buf.writeByte(RedisMessageType.BULK_STRING.value()); RedisMessageType.BULK_STRING.writeTo(buf);
if (msg.isNull()) { if (msg.isNull()) {
buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.NULL_SHORT);
} else { } else {
@ -137,14 +145,14 @@ public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
if (msg.isNull()) { if (msg.isNull()) {
ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH +
RedisConstants.EOL_LENGTH); RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.BULK_STRING.value()); RedisMessageType.BULK_STRING.writeTo(buf);
buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.NULL_SHORT);
buf.writeShort(RedisConstants.EOL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf); out.add(buf);
} else { } else {
ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH +
RedisConstants.EOL_LENGTH); RedisConstants.EOL_LENGTH);
headerBuf.writeByte(RedisMessageType.BULK_STRING.value()); RedisMessageType.BULK_STRING.writeTo(headerBuf);
headerBuf.writeBytes(numberToBytes(msg.content().readableBytes())); headerBuf.writeBytes(numberToBytes(msg.content().readableBytes()));
headerBuf.writeShort(RedisConstants.EOL_SHORT); headerBuf.writeShort(RedisConstants.EOL_SHORT);
out.add(headerBuf); out.add(headerBuf);
@ -178,14 +186,14 @@ public class RedisEncoder extends MessageToMessageEncoder<RedisMessage> {
if (isNull) { if (isNull) {
final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH +
RedisConstants.EOL_LENGTH); RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.ARRAY_HEADER.value()); RedisMessageType.ARRAY_HEADER.writeTo(buf);
buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.NULL_SHORT);
buf.writeShort(RedisConstants.EOL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf); out.add(buf);
} else { } else {
final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH +
RedisConstants.EOL_LENGTH); RedisConstants.EOL_LENGTH);
buf.writeByte(RedisMessageType.ARRAY_HEADER.value()); RedisMessageType.ARRAY_HEADER.writeTo(buf);
buf.writeBytes(numberToBytes(length)); buf.writeBytes(numberToBytes(length));
buf.writeShort(RedisConstants.EOL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT);
out.add(buf); out.add(buf);

View File

@ -15,6 +15,7 @@
package io.netty.handler.codec.redis; package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
/** /**
@ -23,26 +24,26 @@ import io.netty.util.internal.UnstableApi;
@UnstableApi @UnstableApi
public enum RedisMessageType { public enum RedisMessageType {
INLINE_COMMAND(null, true),
SIMPLE_STRING((byte) '+', true), SIMPLE_STRING((byte) '+', true),
ERROR((byte) '-', true), ERROR((byte) '-', true),
INTEGER((byte) ':', true), INTEGER((byte) ':', true),
BULK_STRING((byte) '$', false), BULK_STRING((byte) '$', false),
ARRAY_HEADER((byte) '*', false), ARRAY_HEADER((byte) '*', false);
ARRAY((byte) '*', false); // for aggregated
private final byte value; private final Byte value;
private final boolean inline; private final boolean inline;
RedisMessageType(byte value, boolean inline) { RedisMessageType(Byte value, boolean inline) {
this.value = value; this.value = value;
this.inline = inline; this.inline = inline;
} }
/** /**
* Returns prefix {@code byte} for this type. * Returns length of this type.
*/ */
public byte value() { public int length() {
return value; return value != null ? RedisConstants.TYPE_LENGTH : 0;
} }
/** /**
@ -54,9 +55,32 @@ public enum RedisMessageType {
} }
/** /**
* Return {@link RedisMessageType} for this type prefix {@code byte}. * Determine {@link RedisMessageType} based on the type prefix {@code byte} read from given the buffer.
*/ */
public static RedisMessageType valueOf(byte value) { public static RedisMessageType readFrom(ByteBuf in, boolean decodeInlineCommands) {
final int initialIndex = in.readerIndex();
final RedisMessageType type = valueOf(in.readByte());
if (type == INLINE_COMMAND) {
if (!decodeInlineCommands) {
throw new RedisCodecException("Decoding of inline commands is disabled");
}
// reset index to make content readable again
in.readerIndex(initialIndex);
}
return type;
}
/**
* Write the message type's prefix to the given buffer.
*/
public void writeTo(ByteBuf out) {
if (value == null) {
return;
}
out.writeByte(value.byteValue());
}
private static RedisMessageType valueOf(byte value) {
switch (value) { switch (value) {
case '+': case '+':
return SIMPLE_STRING; return SIMPLE_STRING;
@ -69,7 +93,7 @@ public enum RedisMessageType {
case '*': case '*':
return ARRAY_HEADER; return ARRAY_HEADER;
default: default:
throw new RedisCodecException("Unknown RedisMessageType: " + value); return INLINE_COMMAND;
} }
} }
} }

View File

@ -15,7 +15,6 @@
package io.netty.handler.codec.redis; package io.netty.handler.codec.redis;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
/** /**
@ -33,12 +32,4 @@ public final class SimpleStringRedisMessage extends AbstractStringRedisMessage {
super(content); super(content);
} }
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("content=")
.append(content())
.append(']').toString();
}
} }

View File

@ -19,6 +19,7 @@ package io.netty.handler.codec.redis;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.util.IllegalReferenceCountException; import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import org.junit.After; import org.junit.After;
@ -27,15 +28,9 @@ import org.junit.Test;
import java.util.List; import java.util.List;
import static io.netty.handler.codec.redis.RedisCodecTestUtil.byteBufOf; import static io.netty.handler.codec.redis.RedisCodecTestUtil.*;
import static io.netty.handler.codec.redis.RedisCodecTestUtil.bytesOf; import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/** /**
* Verifies the correct functionality of the {@link RedisDecoder} and {@link RedisArrayAggregator}. * Verifies the correct functionality of the {@link RedisDecoder} and {@link RedisArrayAggregator}.
@ -46,8 +41,12 @@ public class RedisDecoderTest {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
channel = new EmbeddedChannel( channel = newChannel(false);
new RedisDecoder(), }
private static EmbeddedChannel newChannel(boolean decodeInlineCommands) {
return new EmbeddedChannel(
new RedisDecoder(decodeInlineCommands),
new RedisBulkStringAggregator(), new RedisBulkStringAggregator(),
new RedisArrayAggregator()); new RedisArrayAggregator());
} }
@ -67,6 +66,34 @@ public class RedisDecoderTest {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
} }
@Test(expected = DecoderException.class)
public void shouldNotDecodeInlineCommandByDefault() {
assertFalse(channel.writeInbound(byteBufOf("P")));
assertFalse(channel.writeInbound(byteBufOf("I")));
assertFalse(channel.writeInbound(byteBufOf("N")));
assertFalse(channel.writeInbound(byteBufOf("G")));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
channel.readInbound();
}
@Test
public void shouldDecodeInlineCommand() {
channel = newChannel(true);
assertFalse(channel.writeInbound(byteBufOf("P")));
assertFalse(channel.writeInbound(byteBufOf("I")));
assertFalse(channel.writeInbound(byteBufOf("N")));
assertFalse(channel.writeInbound(byteBufOf("G")));
assertTrue(channel.writeInbound(byteBufOf("\r\n")));
InlineCommandRedisMessage msg = channel.readInbound();
assertThat(msg.content(), is("PING"));
ReferenceCountUtil.release(msg);
}
@Test @Test
public void shouldDecodeSimpleString() { public void shouldDecodeSimpleString() {
assertFalse(channel.writeInbound(byteBufOf("+"))); assertFalse(channel.writeInbound(byteBufOf("+")));

View File

@ -48,6 +48,18 @@ public class RedisEncoderTest {
assertFalse(channel.finish()); assertFalse(channel.finish());
} }
@Test
public void shouldEncodeInlineCommand() {
RedisMessage msg = new InlineCommandRedisMessage("ping");
boolean result = channel.writeOutbound(msg);
assertThat(result, is(true));
ByteBuf written = readAll(channel);
assertThat(bytesOf(written), is(bytesOf("ping\r\n")));
written.release();
}
@Test @Test
public void shouldEncodeSimpleString() { public void shouldEncodeSimpleString() {
RedisMessage msg = new SimpleStringRedisMessage("simple"); RedisMessage msg = new SimpleStringRedisMessage("simple");