Always upstream full memcache messages.
This changeset is related to #2182, which exposes the failure in the http codec, but the memcache codec works very similar. In addition, better failure handling in the decoder has been added.
This commit is contained in:
parent
64be9b2e4a
commit
3f53ba2e36
@ -16,12 +16,15 @@
|
|||||||
package io.netty.handler.codec.memcache.binary;
|
package io.netty.handler.codec.memcache.binary;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.handler.codec.DecoderResult;
|
||||||
import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder;
|
import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder;
|
||||||
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
|
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
|
||||||
import io.netty.handler.codec.memcache.DefaultMemcacheContent;
|
import io.netty.handler.codec.memcache.DefaultMemcacheContent;
|
||||||
import io.netty.handler.codec.memcache.LastMemcacheContent;
|
import io.netty.handler.codec.memcache.LastMemcacheContent;
|
||||||
import io.netty.handler.codec.memcache.MemcacheContent;
|
import io.netty.handler.codec.memcache.MemcacheContent;
|
||||||
|
import io.netty.handler.codec.memcache.MemcacheMessage;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -71,7 +74,7 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
@Override
|
@Override
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case READ_HEADER:
|
case READ_HEADER: try {
|
||||||
if (in.readableBytes() < 24) {
|
if (in.readableBytes() < 24) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -79,7 +82,11 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
|
|
||||||
currentHeader = decodeHeader(in);
|
currentHeader = decodeHeader(in);
|
||||||
state = State.READ_EXTRAS;
|
state = State.READ_EXTRAS;
|
||||||
case READ_EXTRAS:
|
} catch (Exception e) {
|
||||||
|
out.add(invalidMessage(e));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case READ_EXTRAS: try {
|
||||||
byte extrasLength = currentHeader.getExtrasLength();
|
byte extrasLength = currentHeader.getExtrasLength();
|
||||||
if (extrasLength > 0) {
|
if (extrasLength > 0) {
|
||||||
if (in.readableBytes() < extrasLength) {
|
if (in.readableBytes() < extrasLength) {
|
||||||
@ -90,7 +97,11 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
}
|
}
|
||||||
|
|
||||||
state = State.READ_KEY;
|
state = State.READ_KEY;
|
||||||
case READ_KEY:
|
} catch (Exception e) {
|
||||||
|
out.add(invalidMessage(e));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case READ_KEY: try {
|
||||||
short keyLength = currentHeader.getKeyLength();
|
short keyLength = currentHeader.getKeyLength();
|
||||||
if (keyLength > 0) {
|
if (keyLength > 0) {
|
||||||
if (in.readableBytes() < keyLength) {
|
if (in.readableBytes() < keyLength) {
|
||||||
@ -103,8 +114,12 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
|
|
||||||
out.add(buildMessage(currentHeader, currentExtras, currentKey));
|
out.add(buildMessage(currentHeader, currentExtras, currentKey));
|
||||||
currentExtras = null;
|
currentExtras = null;
|
||||||
state = State.READ_VALUE;
|
state = State.READ_CONTENT;
|
||||||
case READ_VALUE:
|
} catch (Exception e) {
|
||||||
|
out.add(invalidMessage(e));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case READ_CONTENT: try {
|
||||||
int valueLength = currentHeader.getTotalBodyLength()
|
int valueLength = currentHeader.getTotalBodyLength()
|
||||||
- currentHeader.getKeyLength()
|
- currentHeader.getKeyLength()
|
||||||
- currentHeader.getExtrasLength();
|
- currentHeader.getExtrasLength();
|
||||||
@ -142,11 +157,44 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
|
|
||||||
state = State.READ_HEADER;
|
state = State.READ_HEADER;
|
||||||
return;
|
return;
|
||||||
|
} catch (Exception e) {
|
||||||
|
out.add(invalidChunk(e));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case BAD_MESSAGE:
|
||||||
|
in.skipBytes(actualReadableBytes());
|
||||||
|
return;
|
||||||
default:
|
default:
|
||||||
throw new Error("Unknown state reached: " + state);
|
throw new Error("Unknown state reached: " + state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to create a message indicating a invalid decoding result.
|
||||||
|
*
|
||||||
|
* @param cause the cause of the decoding failure.
|
||||||
|
* @return a valid message indicating failure.
|
||||||
|
*/
|
||||||
|
private M invalidMessage(Exception cause) {
|
||||||
|
state = State.BAD_MESSAGE;
|
||||||
|
M message = buildInvalidMessage();
|
||||||
|
message.setDecoderResult(DecoderResult.failure(cause));
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to create a content chunk indicating a invalid decoding result.
|
||||||
|
*
|
||||||
|
* @param cause the cause of the decoding failure.
|
||||||
|
* @return a valid content chunk indicating failure.
|
||||||
|
*/
|
||||||
|
private MemcacheContent invalidChunk(Exception cause) {
|
||||||
|
state = State.BAD_MESSAGE;
|
||||||
|
MemcacheContent chunk = new DefaultLastMemcacheContent(Unpooled.EMPTY_BUFFER);
|
||||||
|
chunk.setDecoderResult(DecoderResult.failure(cause));
|
||||||
|
return chunk;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When the channel goes inactive, release all frames to prevent data leaks.
|
* When the channel goes inactive, release all frames to prevent data leaks.
|
||||||
*
|
*
|
||||||
@ -192,6 +240,13 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
*/
|
*/
|
||||||
protected abstract M buildMessage(H header, ByteBuf extras, String key);
|
protected abstract M buildMessage(H header, ByteBuf extras, String key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to create a upstream message when the incoming parsing did fail.
|
||||||
|
*
|
||||||
|
* @return a message indicating a decoding failure.
|
||||||
|
*/
|
||||||
|
protected abstract M buildInvalidMessage();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains all states this decoder can possibly be in.
|
* Contains all states this decoder can possibly be in.
|
||||||
* <p/>
|
* <p/>
|
||||||
@ -218,7 +273,12 @@ public abstract class AbstractBinaryMemcacheDecoder
|
|||||||
/**
|
/**
|
||||||
* Currently reading the value chunks (optional).
|
* Currently reading the value chunks (optional).
|
||||||
*/
|
*/
|
||||||
READ_VALUE
|
READ_CONTENT,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Something went wrong while decoding the message or chunks.
|
||||||
|
*/
|
||||||
|
BAD_MESSAGE
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,10 @@ import io.netty.util.ReferenceCountUtil;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A memcache object aggregator for the binary protocol.
|
* An object aggregator for the memcache binary protocol.
|
||||||
|
*
|
||||||
|
* It aggregates {@link BinaryMemcacheMessage}s and {@link MemcacheContent} into {@link FullBinaryMemcacheRequest}s
|
||||||
|
* or {@link FullBinaryMemcacheResponse}s.
|
||||||
*/
|
*/
|
||||||
public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator {
|
public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator {
|
||||||
|
|
||||||
@ -50,8 +53,8 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg
|
|||||||
MemcacheMessage m = (MemcacheMessage) msg;
|
MemcacheMessage m = (MemcacheMessage) msg;
|
||||||
|
|
||||||
if (!m.getDecoderResult().isSuccess()) {
|
if (!m.getDecoderResult().isSuccess()) {
|
||||||
|
out.add(toFullMessage(m));
|
||||||
this.currentMessage = null;
|
this.currentMessage = null;
|
||||||
out.add(ReferenceCountUtil.retain(m));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,4 +114,34 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a invalid message into a full message.
|
||||||
|
*
|
||||||
|
* This method makes sure that upstream handlers always get a full message returned, even
|
||||||
|
* when invalid chunks are failing.
|
||||||
|
*
|
||||||
|
* @param msg the message to transform.
|
||||||
|
* @return a full message containing parts of the original message.
|
||||||
|
*/
|
||||||
|
private static FullMemcacheMessage toFullMessage(final MemcacheMessage msg) {
|
||||||
|
if (msg instanceof FullMemcacheMessage) {
|
||||||
|
return ((FullMemcacheMessage) msg).retain();
|
||||||
|
}
|
||||||
|
|
||||||
|
FullMemcacheMessage fullMsg;
|
||||||
|
if (msg instanceof BinaryMemcacheRequest) {
|
||||||
|
BinaryMemcacheRequest req = (BinaryMemcacheRequest) msg;
|
||||||
|
fullMsg = new DefaultFullBinaryMemcacheRequest(req.getHeader(), req.getKey(), req.getExtras(),
|
||||||
|
Unpooled.EMPTY_BUFFER);
|
||||||
|
} else if (msg instanceof BinaryMemcacheResponse) {
|
||||||
|
BinaryMemcacheResponse res = (BinaryMemcacheResponse) msg;
|
||||||
|
fullMsg = new DefaultFullBinaryMemcacheResponse(res.getHeader(), res.getKey(), res.getExtras(),
|
||||||
|
Unpooled.EMPTY_BUFFER);
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
|
||||||
|
return fullMsg;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
package io.netty.handler.codec.memcache.binary;
|
package io.netty.handler.codec.memcache.binary;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The decoder part which takes care of decoding the request-specific headers.
|
* The decoder part which takes care of decoding the request-specific headers.
|
||||||
@ -51,4 +52,12 @@ public class BinaryMemcacheRequestDecoder
|
|||||||
return new DefaultBinaryMemcacheRequest(header, key, extras);
|
return new DefaultBinaryMemcacheRequest(header, key, extras);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BinaryMemcacheRequest buildInvalidMessage() {
|
||||||
|
return new DefaultBinaryMemcacheRequest(
|
||||||
|
new DefaultBinaryMemcacheRequestHeader(),
|
||||||
|
"",
|
||||||
|
Unpooled.EMPTY_BUFFER
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
package io.netty.handler.codec.memcache.binary;
|
package io.netty.handler.codec.memcache.binary;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The decoder which takes care of decoding the response headers.
|
* The decoder which takes care of decoding the response headers.
|
||||||
@ -51,4 +52,12 @@ public class BinaryMemcacheResponseDecoder
|
|||||||
return new DefaultBinaryMemcacheResponse(header, key, extras);
|
return new DefaultBinaryMemcacheResponse(header, key, extras);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BinaryMemcacheResponse buildInvalidMessage() {
|
||||||
|
return new DefaultBinaryMemcacheResponse(
|
||||||
|
new DefaultBinaryMemcacheResponseHeader(),
|
||||||
|
"",
|
||||||
|
Unpooled.EMPTY_BUFFER
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user