diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/AbstractBinaryMemcacheDecoder.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/AbstractBinaryMemcacheDecoder.java index 7feb2ac42b..a3cdd1ca4d 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/AbstractBinaryMemcacheDecoder.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/AbstractBinaryMemcacheDecoder.java @@ -16,12 +16,15 @@ package io.netty.handler.codec.memcache.binary; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder; import io.netty.handler.codec.memcache.DefaultLastMemcacheContent; import io.netty.handler.codec.memcache.DefaultMemcacheContent; import io.netty.handler.codec.memcache.LastMemcacheContent; import io.netty.handler.codec.memcache.MemcacheContent; +import io.netty.handler.codec.memcache.MemcacheMessage; import io.netty.util.CharsetUtil; import java.util.List; @@ -71,7 +74,7 @@ public abstract class AbstractBinaryMemcacheDecoder @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { switch (state) { - case READ_HEADER: + case READ_HEADER: try { if (in.readableBytes() < 24) { return; } @@ -79,7 +82,11 @@ public abstract class AbstractBinaryMemcacheDecoder currentHeader = decodeHeader(in); state = State.READ_EXTRAS; - case READ_EXTRAS: + } catch (Exception e) { + out.add(invalidMessage(e)); + return; + } + case READ_EXTRAS: try { byte extrasLength = currentHeader.getExtrasLength(); if (extrasLength > 0) { if (in.readableBytes() < extrasLength) { @@ -90,7 +97,11 @@ public abstract class AbstractBinaryMemcacheDecoder } state = State.READ_KEY; - case READ_KEY: + } catch (Exception e) { + out.add(invalidMessage(e)); + return; + } + case READ_KEY: try { short keyLength = currentHeader.getKeyLength(); if (keyLength > 0) { if (in.readableBytes() < keyLength) { @@ -103,8 +114,12 @@ public abstract class AbstractBinaryMemcacheDecoder out.add(buildMessage(currentHeader, currentExtras, currentKey)); currentExtras = null; - state = State.READ_VALUE; - case READ_VALUE: + state = State.READ_CONTENT; + } catch (Exception e) { + out.add(invalidMessage(e)); + return; + } + case READ_CONTENT: try { int valueLength = currentHeader.getTotalBodyLength() - currentHeader.getKeyLength() - currentHeader.getExtrasLength(); @@ -142,11 +157,44 @@ public abstract class AbstractBinaryMemcacheDecoder state = State.READ_HEADER; return; + } catch (Exception e) { + out.add(invalidChunk(e)); + return; + } + case BAD_MESSAGE: + in.skipBytes(actualReadableBytes()); + return; default: throw new Error("Unknown state reached: " + state); } } + /** + * Helper method to create a message indicating a invalid decoding result. + * + * @param cause the cause of the decoding failure. + * @return a valid message indicating failure. + */ + private M invalidMessage(Exception cause) { + state = State.BAD_MESSAGE; + M message = buildInvalidMessage(); + message.setDecoderResult(DecoderResult.failure(cause)); + return message; + } + + /** + * Helper method to create a content chunk indicating a invalid decoding result. + * + * @param cause the cause of the decoding failure. + * @return a valid content chunk indicating failure. + */ + private MemcacheContent invalidChunk(Exception cause) { + state = State.BAD_MESSAGE; + MemcacheContent chunk = new DefaultLastMemcacheContent(Unpooled.EMPTY_BUFFER); + chunk.setDecoderResult(DecoderResult.failure(cause)); + return chunk; + } + /** * When the channel goes inactive, release all frames to prevent data leaks. * @@ -192,6 +240,13 @@ public abstract class AbstractBinaryMemcacheDecoder */ protected abstract M buildMessage(H header, ByteBuf extras, String key); + /** + * Helper method to create a upstream message when the incoming parsing did fail. + * + * @return a message indicating a decoding failure. + */ + protected abstract M buildInvalidMessage(); + /** * Contains all states this decoder can possibly be in. *

@@ -218,7 +273,12 @@ public abstract class AbstractBinaryMemcacheDecoder /** * Currently reading the value chunks (optional). */ - READ_VALUE + READ_CONTENT, + + /** + * Something went wrong while decoding the message or chunks. + */ + BAD_MESSAGE } } diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java index 10a42593ab..17a22d2ed6 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java @@ -31,7 +31,10 @@ import io.netty.util.ReferenceCountUtil; import java.util.List; /** - * A memcache object aggregator for the binary protocol. + * An object aggregator for the memcache binary protocol. + * + * It aggregates {@link BinaryMemcacheMessage}s and {@link MemcacheContent} into {@link FullBinaryMemcacheRequest}s + * or {@link FullBinaryMemcacheResponse}s. */ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator { @@ -50,8 +53,8 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg MemcacheMessage m = (MemcacheMessage) msg; if (!m.getDecoderResult().isSuccess()) { + out.add(toFullMessage(m)); this.currentMessage = null; - out.add(ReferenceCountUtil.retain(m)); return; } @@ -111,4 +114,34 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg } } + /** + * Convert a invalid message into a full message. + * + * This method makes sure that upstream handlers always get a full message returned, even + * when invalid chunks are failing. + * + * @param msg the message to transform. + * @return a full message containing parts of the original message. + */ + private static FullMemcacheMessage toFullMessage(final MemcacheMessage msg) { + if (msg instanceof FullMemcacheMessage) { + return ((FullMemcacheMessage) msg).retain(); + } + + FullMemcacheMessage fullMsg; + if (msg instanceof BinaryMemcacheRequest) { + BinaryMemcacheRequest req = (BinaryMemcacheRequest) msg; + fullMsg = new DefaultFullBinaryMemcacheRequest(req.getHeader(), req.getKey(), req.getExtras(), + Unpooled.EMPTY_BUFFER); + } else if (msg instanceof BinaryMemcacheResponse) { + BinaryMemcacheResponse res = (BinaryMemcacheResponse) msg; + fullMsg = new DefaultFullBinaryMemcacheResponse(res.getHeader(), res.getKey(), res.getExtras(), + Unpooled.EMPTY_BUFFER); + } else { + throw new IllegalStateException(); + } + + return fullMsg; + } + } diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheRequestDecoder.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheRequestDecoder.java index 06dd938c9b..4bc2041dc1 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheRequestDecoder.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheRequestDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.memcache.binary; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; /** * The decoder part which takes care of decoding the request-specific headers. @@ -51,4 +52,12 @@ public class BinaryMemcacheRequestDecoder return new DefaultBinaryMemcacheRequest(header, key, extras); } + @Override + protected BinaryMemcacheRequest buildInvalidMessage() { + return new DefaultBinaryMemcacheRequest( + new DefaultBinaryMemcacheRequestHeader(), + "", + Unpooled.EMPTY_BUFFER + ); + } } diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheResponseDecoder.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheResponseDecoder.java index 220412d4da..44ccfcb0f6 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheResponseDecoder.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheResponseDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.memcache.binary; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; /** * The decoder which takes care of decoding the response headers. @@ -51,4 +52,12 @@ public class BinaryMemcacheResponseDecoder return new DefaultBinaryMemcacheResponse(header, key, extras); } + @Override + protected BinaryMemcacheResponse buildInvalidMessage() { + return new DefaultBinaryMemcacheResponse( + new DefaultBinaryMemcacheResponseHeader(), + "", + Unpooled.EMPTY_BUFFER + ); + } }