Implementing the Binary Memcache protocol

This changeset implements the full memcache binary protocol spec, including
a first batch of tests. Ascii protocol and more coverage and helper classes
will follow.
This commit is contained in:
Michael Nitschinger 2013-10-01 15:42:05 +02:00 committed by Trustin Lee
parent 6b0025430e
commit 5169376309
46 changed files with 3184 additions and 0 deletions

45
codec-memcache/pom.xml Normal file
View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2013 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>5.0.0.Alpha1-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-memcache</artifactId>
<packaging>bundle</packaging>
<name>Netty/Codec/Memcache</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,57 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* The default implementation for the {@link LastMemcacheContent}.
*/
public class DefaultLastMemcacheContent extends DefaultMemcacheContent implements LastMemcacheContent {
public DefaultLastMemcacheContent() {
super(Unpooled.buffer());
}
public DefaultLastMemcacheContent(ByteBuf content) {
super(content);
}
@Override
public LastMemcacheContent retain() {
super.retain();
return this;
}
@Override
public LastMemcacheContent retain(int increment) {
super.retain(increment);
return this;
}
@Override
public LastMemcacheContent copy() {
return new DefaultLastMemcacheContent(content().copy());
}
@Override
public LastMemcacheContent duplicate() {
return new DefaultLastMemcacheContent(content().duplicate());
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.buffer.ByteBuf;
/**
* The default {@link MemcacheContent} implementation.
*/
public class DefaultMemcacheContent extends DefaultMemcacheObject implements MemcacheContent {
private final ByteBuf content;
/**
* Creates a new instance with the specified content.
*/
public DefaultMemcacheContent(ByteBuf content) {
if (content == null) {
throw new NullPointerException("Content cannot be null.");
}
this.content = content;
}
@Override
public ByteBuf content() {
return content;
}
@Override
public MemcacheContent copy() {
return new DefaultMemcacheContent(content.copy());
}
@Override
public MemcacheContent duplicate() {
return new DefaultMemcacheContent(content.duplicate());
}
@Override
public int refCnt() {
return content.refCnt();
}
@Override
public MemcacheContent retain() {
content.retain();
return this;
}
@Override
public MemcacheContent retain(int increment) {
content.retain(increment);
return this;
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(data: " + content() + ", getDecoderResult: " + getDecoderResult() + ')';
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.handler.codec.DecoderResult;
import io.netty.util.AbstractReferenceCounted;
/**
* The default {@link MemcacheObject} implementation.
*/
public abstract class DefaultMemcacheObject implements MemcacheObject {
private DecoderResult decoderResult = DecoderResult.SUCCESS;
protected DefaultMemcacheObject() {
// Disallow direct instantiation
}
@Override
public DecoderResult getDecoderResult() {
return decoderResult;
}
@Override
public void setDecoderResult(DecoderResult result) {
if (result == null) {
throw new NullPointerException("DecoderResult should not be null.");
}
decoderResult = result;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
/**
* Combines {@link MemcacheMessage} and {@link LastMemcacheContent} into one
* message. So it represent a <i>complete</i> memcache message.
*/
public interface FullMemcacheMessage extends MemcacheMessage, LastMemcacheContent {
@Override
FullMemcacheMessage copy();
@Override
FullMemcacheMessage retain(int increment);
@Override
FullMemcacheMessage retain();
@Override
FullMemcacheMessage duplicate();
}

View File

@ -0,0 +1,95 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderResult;
/**
* The {@link MemcacheContent} which signals the end of the content batch.
* <p/>
* Note that by design, even when no content is emitted by the protocol, an
* empty {@link LastMemcacheContent} is issued to make the upstream parsing
* easier.
*/
public interface LastMemcacheContent extends MemcacheContent {
LastMemcacheContent EMPTY_LAST_CONTENT = new LastMemcacheContent() {
@Override
public LastMemcacheContent copy() {
return EMPTY_LAST_CONTENT;
}
@Override
public LastMemcacheContent retain(int increment) {
return this;
}
@Override
public LastMemcacheContent retain() {
return this;
}
@Override
public LastMemcacheContent duplicate() {
return this;
}
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public DecoderResult getDecoderResult() {
return DecoderResult.SUCCESS;
}
@Override
public void setDecoderResult(DecoderResult result) {
throw new UnsupportedOperationException("read only");
}
@Override
public int refCnt() {
return 1;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
};
@Override
LastMemcacheContent copy();
@Override
LastMemcacheContent retain(int increment);
@Override
LastMemcacheContent retain();
@Override
LastMemcacheContent duplicate();
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.buffer.ByteBufHolder;
/**
* An Memcache content chunk.
* <p/>
* A MemcacheObjectDecoder generates {@link MemcacheContent} after
* {@link MemcacheMessage} when the content is large. If you prefer not to
* receive {@link MemcacheContent} in your handler, place a aggregator
* after MemcacheObjectDecoder in the {@link io.netty.channel.ChannelPipeline}.
*/
public interface MemcacheContent extends MemcacheObject, ByteBufHolder {
@Override
MemcacheContent copy();
@Override
MemcacheContent duplicate();
@Override
MemcacheContent retain();
@Override
MemcacheContent retain(int increment);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.util.ReferenceCounted;
/**
* Marker interface for both ascii and binary messages.
*/
public interface MemcacheMessage extends MemcacheObject, ReferenceCounted {
/**
* Increases the reference count by {@code 1}.
*/
@Override
MemcacheMessage retain();
/**
* Increases the reference count by the specified {@code increment}.
*/
@Override
MemcacheMessage retain(int increment);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.handler.codec.DecoderResult;
/**
* Defines a common interface for all {@link MemcacheObject} implementations.
*/
public interface MemcacheObject {
/**
* Returns the result of decoding this message.
*/
DecoderResult getDecoderResult();
/**
* Updates the result of decoding this message.
* <p/>
* <p>Do not call this method unless you know what you are doing.</p>
*/
void setDecoderResult(DecoderResult result);
}

View File

@ -0,0 +1,127 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
* A {@link io.netty.channel.ChannelHandler} that aggregates an {@link MemcacheMessage}
* and its following {@link MemcacheContent}s into a single {@link MemcacheMessage} with
* no following {@link MemcacheContent}s. It is useful when you don't want to take
* care of memcache messages where the content comes along in chunks. Insert this
* handler after a MemcacheObjectDecoder in the {@link io.netty.channel.ChannelPipeline}.
* <p/>
* For example, here for the binary protocol:
* <p/>
* <pre>
* {@link io.netty.channel.ChannelPipeline} p = ...;
* ...
* p.addLast("decoder", new {@link io.netty.handler.codec.memcache.binary.BinaryMemcacheRequestDecoder}());
* p.addLast("aggregator", <b>new {@link MemcacheObjectAggregator}(1048576)</b>);
* ...
* p.addLast("encoder", new {@link io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseEncoder}());
* p.addLast("handler", new YourMemcacheRequestHandler());
* </pre>
*/
public abstract class MemcacheObjectAggregator extends MessageToMessageDecoder<MemcacheObject> {
/**
* Contains the current message that gets aggregated.
*/
protected FullMemcacheMessage currentMessage;
/**
* Holds the current channel handler context if set.
*/
protected ChannelHandlerContext ctx;
public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
private final int maxContentLength;
public MemcacheObjectAggregator(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
}
this.maxContentLength = maxContentLength;
}
/**
* Returns the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
*/
public final int getMaxCumulationBufferComponents() {
return maxCumulationBufferComponents;
}
/**
* Sets the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
* and its minimum allowed value is {@code 2}.
*/
public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
if (maxCumulationBufferComponents < 2) {
throw new IllegalArgumentException(
"maxCumulationBufferComponents: " + maxCumulationBufferComponents +
" (expected: >= 2)");
}
if (ctx == null) {
this.maxCumulationBufferComponents = maxCumulationBufferComponents;
} else {
throw new IllegalStateException(
"decoder properties cannot be changed once the decoder is added to a pipeline.");
}
}
public int getMaxContentLength() {
return maxContentLength;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
* Abstract super class for both ascii and binary decoders.
* <p/>
* Currently it just acts as a common denominator, but will certainly include methods once the ascii protocol
* is implemented.
*/
public abstract class MemcacheObjectDecoder extends ByteToMessageDecoder {
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* A general purpose {@link MemcacheObjectEncoder} that encodes {@link MemcacheMessage}s.
* <p/>
* <p>Note that this class is designed to be extended, especially because both the binary and ascii protocol
* require different treatment of their messages. Since the content chunk writing is the same for both, the encoder
* abstracts this right away.</p>
*/
public abstract class MemcacheObjectEncoder<M extends MemcacheMessage> extends MessageToMessageEncoder<Object> {
private boolean expectingMoreContent;
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
if (msg instanceof MemcacheMessage) {
if (expectingMoreContent) {
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
}
out.add(encodeMessage(ctx, (M) msg));
}
if (msg instanceof MemcacheContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
int contentLength = contentLength(msg);
if (contentLength > 0) {
out.add(encodeAndRetain(msg));
} else {
out.add(Unpooled.EMPTY_BUFFER);
}
expectingMoreContent = !(msg instanceof LastMemcacheContent);
}
}
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof MemcacheObject || msg instanceof ByteBuf || msg instanceof FileRegion;
}
/**
* Take the given {@link MemcacheMessage} and encode it into a writable {@link ByteBuf}.
*
* @param ctx the channel handler context.
* @param msg the message to encode.
* @return the {@link ByteBuf} representation of the message.
*/
protected abstract ByteBuf encodeMessage(ChannelHandlerContext ctx, M msg);
/**
* Determine the content length of the given object.
*
* @param msg the object to determine the length of.
* @return the determined content length.
*/
private static int contentLength(Object msg) {
if (msg instanceof MemcacheContent) {
return ((MemcacheContent) msg).content().readableBytes();
}
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return (int) ((FileRegion) msg).count();
}
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
}
/**
* Encode the content, depending on the object type.
*
* @param msg the object to encode.
* @return the encoded object.
*/
private static Object encodeAndRetain(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).retain();
}
if (msg instanceof MemcacheContent) {
return ((MemcacheContent) msg).content().retain();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).retain();
}
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* The client codec that combines the proper encoder and decoder.
* <p/>
* Use this codec if you want to implement a memcache client that speaks the binary protocol. It
* combines both the {@link BinaryMemcacheResponseDecoder} and the {@link BinaryMemcacheRequestEncoder}.
* <p/>
* Optionally, it counts the number of outstanding responses and raises an exception if - on connection
* close - the list is not 0 (this is turned off by default). You can also define a chunk size for the
* content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks arrive they
* will be passed up the pipeline and not queued up to the chunk size.
*/
public final class BinaryMemcacheClientCodec
extends CombinedChannelDuplexHandler<BinaryMemcacheResponseDecoder, BinaryMemcacheRequestEncoder> {
private final boolean failOnMissingResponse;
private final AtomicLong requestResponseCounter = new AtomicLong();
/**
* Create a new {@link BinaryMemcacheClientCodec} with the default settings applied.
*/
public BinaryMemcacheClientCodec() {
this(Decoder.DEFAULT_MAX_CHUNK_SIZE);
}
/**
* Create a new {@link BinaryMemcacheClientCodec} and set a custom chunk size.
*
* @param decodeChunkSize the maximum chunk size.
*/
public BinaryMemcacheClientCodec(int decodeChunkSize) {
this(decodeChunkSize, false);
}
/**
* Create a new {@link BinaryMemcacheClientCodec} with custom settings.
*
* @param decodeChunkSize the maximum chunk size.
* @param failOnMissingResponse report if after close there are outstanding requests.
*/
public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
this.failOnMissingResponse = failOnMissingResponse;
init(new Decoder(decodeChunkSize), new Encoder());
}
private final class Encoder extends BinaryMemcacheRequestEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
super.encode(ctx, msg, out);
if (failOnMissingResponse && msg instanceof LastMemcacheContent) {
requestResponseCounter.incrementAndGet();
}
}
}
private final class Decoder extends BinaryMemcacheResponseDecoder {
public Decoder(int chunkSize) {
super(chunkSize);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int oldSize = out.size();
super.decode(ctx, in, out);
if (failOnMissingResponse) {
int size = out.size();
for (int i = oldSize; i < size; size++) {
Object msg = out.get(i);
if (msg != null && msg instanceof LastMemcacheContent) {
requestResponseCounter.decrementAndGet();
}
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (failOnMissingResponse) {
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
ctx.fireExceptionCaught(new PrematureChannelClosureException(
"channel gone inactive with " + missingResponses +
" missing response(s)"));
}
}
}
}
}

View File

@ -0,0 +1,217 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
import io.netty.handler.codec.memcache.DefaultMemcacheContent;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import io.netty.handler.codec.memcache.MemcacheContent;
import io.netty.handler.codec.memcache.MemcacheObjectDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import static io.netty.buffer.ByteBufUtil.readBytes;
/**
* Decoder for both {@link BinaryMemcacheRequest} and {@link BinaryMemcacheResponse}.
* <p/>
* The difference in the protocols (header) is implemented by the subclasses.
*/
public abstract class BinaryMemcacheDecoder<M extends BinaryMemcacheMessage, H extends BinaryMemcacheMessageHeader>
extends MemcacheObjectDecoder {
public static final int DEFAULT_MAX_CHUNK_SIZE = 8192;
private final int chunkSize;
private H currentHeader;
private ByteBuf currentExtras;
private String currentKey;
private int alreadyReadChunkSize;
private State state = State.READ_HEADER;
/**
* Create a new {@link BinaryMemcacheDecoder} with default settings.
*/
public BinaryMemcacheDecoder() {
this(DEFAULT_MAX_CHUNK_SIZE);
}
/**
* Create a new {@link BinaryMemcacheDecoder} with custom settings.
*
* @param chunkSize the maximum chunk size of the payload.
*/
public BinaryMemcacheDecoder(int chunkSize) {
if (chunkSize < 0) {
throw new IllegalArgumentException("chunkSize must be a positive integer: " + chunkSize);
}
this.chunkSize = chunkSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state) {
case READ_HEADER:
if (in.readableBytes() < 24) {
return;
}
resetDecoder();
currentHeader = decodeHeader(in);
state = State.READ_EXTRAS;
case READ_EXTRAS:
byte extrasLength = currentHeader.getExtrasLength();
if (extrasLength > 0) {
if (in.readableBytes() < extrasLength) {
return;
}
currentExtras = readBytes(ctx.alloc(), in, extrasLength);
}
state = State.READ_KEY;
case READ_KEY:
short keyLength = currentHeader.getKeyLength();
if (keyLength > 0) {
if (in.readableBytes() < keyLength) {
return;
}
currentKey = readBytes(ctx.alloc(), in, keyLength).toString(CharsetUtil.UTF_8);
}
out.add(buildMessage(currentHeader, currentExtras, currentKey));
currentExtras = null;
state = State.READ_VALUE;
case READ_VALUE:
int valueLength = currentHeader.getTotalBodyLength()
- currentHeader.getKeyLength()
- currentHeader.getExtrasLength();
int toRead = in.readableBytes();
if (valueLength > 0) {
if (toRead == 0) {
return;
} else if (toRead > chunkSize) {
toRead = chunkSize;
}
if (toRead > valueLength) {
toRead = valueLength;
}
ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
boolean isLast = (alreadyReadChunkSize + toRead) >= valueLength;
MemcacheContent chunk = isLast
? new DefaultLastMemcacheContent(chunkBuffer)
: new DefaultMemcacheContent(chunkBuffer);
alreadyReadChunkSize += toRead;
out.add(chunk);
if (alreadyReadChunkSize < valueLength) {
return;
}
} else {
out.add(LastMemcacheContent.EMPTY_LAST_CONTENT);
}
state = State.READ_HEADER;
return;
default:
throw new Error("Unknown state reached: " + state);
}
}
/**
* When the channel goes inactive, release all frames to prevent data leaks.
*
* @param ctx handler context
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (currentExtras != null) {
currentExtras.release();
}
resetDecoder();
}
/**
* Prepare for next decoding iteration.
*/
protected void resetDecoder() {
currentHeader = null;
currentExtras = null;
currentKey = null;
alreadyReadChunkSize = 0;
}
/**
* Decode and return the parsed {@link BinaryMemcacheMessageHeader}.
*
* @param in the incoming buffer.
* @return the decoded header.
*/
protected abstract H decodeHeader(ByteBuf in);
/**
* Build the complete message, based on the information decoded.
*
* @param header the header of the message.
* @param extras possible extras.
* @param key possible key.
* @return the decoded message.
*/
protected abstract M buildMessage(H header, ByteBuf extras, String key);
/**
* Contains all states this decoder can possibly be in.
* <p/>
* Note that most of the states can be optional, the only one required is reading
* the header ({@link #READ_HEADER}. All other steps depend on the length fields
* in the header and will be executed conditionally.
*/
enum State {
/**
* Currently reading the header portion.
*/
READ_HEADER,
/**
* Currently reading the extras portion (optional).
*/
READ_EXTRAS,
/**
* Currently reading the key portion (optional).
*/
READ_KEY,
/**
* Currently reading the value chunks (optional).
*/
READ_VALUE
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.memcache.MemcacheObjectEncoder;
import io.netty.util.CharsetUtil;
/**
* A {@link io.netty.handler.codec.MessageToByteEncoder} that encodes binary memache messages into bytes.
*/
public abstract class BinaryMemcacheEncoder<M extends BinaryMemcacheMessage, H extends BinaryMemcacheMessageHeader>
extends MemcacheObjectEncoder<M> {
@Override
protected ByteBuf encodeMessage(ChannelHandlerContext ctx, M msg) {
ByteBuf buf = ctx.alloc().buffer();
encodeHeader(buf, (H) msg.getHeader());
encodeExtras(buf, msg.getExtras());
encodeKey(buf, msg.getKey());
return buf;
}
/**
* Encode the extras.
*
* @param buf the {@link ByteBuf} to write into.
* @param extras the extras to encode.
*/
private void encodeExtras(ByteBuf buf, ByteBuf extras) {
if (extras == null) {
return;
}
buf.writeBytes(extras);
}
/**
* Encode the key.
*
* @param buf the {@link ByteBuf} to write into.
* @param key the key to encode.
*/
private void encodeKey(ByteBuf buf, String key) {
if (key == null || key.isEmpty()) {
return;
}
buf.writeBytes(Unpooled.copiedBuffer(key, CharsetUtil.UTF_8));
}
/**
* Encode the header.
* <p/>
* This methods needs to be implemented by a sub class because the header is different
* for both requests and responses.
*
* @param buf the {@link ByteBuf} to write into.
* @param header the header to encode.
*/
protected abstract void encodeHeader(ByteBuf buf, H header);
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.memcache.MemcacheMessage;
import io.netty.handler.codec.memcache.MemcacheObject;
/**
* An interface that defines a binary Memcache message, providing common properties for
* {@link BinaryMemcacheRequest} and {@link BinaryMemcacheResponse}.
* <p/>
* A {@link BinaryMemcacheMessage} always consists of a header and optional extras or/and
* a key.
*
* @see BinaryMemcacheMessageHeader
* @see BinaryMemcacheRequest
* @see BinaryMemcacheResponse
*/
public interface BinaryMemcacheMessage<H extends BinaryMemcacheMessageHeader> extends MemcacheMessage {
/**
* Returns the {@link BinaryMemcacheMessageHeader} which contains the full required header.
*
* @return the required header.
*/
H getHeader();
/**
* Returns the optional key of the document.
*
* @return the key of the document.
*/
String getKey();
/**
* Returns a {@link ByteBuf} representation of the optional extras.
*
* @return the optional extras.
*/
ByteBuf getExtras();
/**
* Increases the reference count by {@code 1}.
*/
BinaryMemcacheMessage retain();
/**
* Increases the reference count by the specified {@code increment}.
*/
BinaryMemcacheMessage retain(int increment);
}

View File

@ -0,0 +1,157 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* Contains all common header fields in a {@link BinaryMemcacheMessage}.
* <p/>
* <p>Since the header is different for a {@link BinaryMemcacheRequest} and {@link BinaryMemcacheResponse}, see
* {@link BinaryMemcacheRequestHeader} and {@link BinaryMemcacheResponseHeader}.</p>
* <p/>
* <p>The {@link BinaryMemcacheMessageHeader} is always 24 bytes in length and needs to be filled up if values are
* not set.</p>
* <p/>
* <p>Fore more information, see the official protocol specification
* <a href="https://code.google.com/p/memcached/wiki/MemcacheBinaryProtocol">here</a>.</p>
*/
public interface BinaryMemcacheMessageHeader {
/**
* Returns the magic byte for the message.
*
* @return the magic byte.
*/
byte getMagic();
/**
* Sets the magic byte.
*
* @param magic the magic byte to use.
* @see io.netty.handler.codec.memcache.binary.util.BinaryMemcacheOpcodes for typesafe opcodes.
*/
BinaryMemcacheMessageHeader setMagic(byte magic);
/**
* Returns the opcode for the message.
*
* @return the opcode.
*/
byte getOpcode();
/**
* Sets the opcode for the message.
*
* @param code the opcode to use.
*/
BinaryMemcacheMessageHeader setOpcode(byte code);
/**
* Returns the key length of the message.
* <p/>
* This may return 0, since the key is optional.
*
* @return the key length.
*/
short getKeyLength();
/**
* Set the key length of the message.
* <p/>
* This may be 0, since the key is optional.
*
* @param keyLength the key length to use.
*/
BinaryMemcacheMessageHeader setKeyLength(short keyLength);
/**
* Return the extras length of the message.
* <p/>
* This may be 0, since the extras content is optional.
*
* @return the extras length.
*/
byte getExtrasLength();
/**
* Set the extras length of the message.
* <p/>
* This may be 0, since the extras content is optional.
*
* @param extrasLength the extras length.
*/
BinaryMemcacheMessageHeader setExtrasLength(byte extrasLength);
/**
* Returns the data type of the message.
*
* @return the data type of the message.
*/
byte getDataType();
/**
* Sets the data type of the message.
*
* @param dataType the data type of the message.
*/
BinaryMemcacheMessageHeader setDataType(byte dataType);
/**
* Returns the total body length.
* <p/>
* Note that this may be 0, since the body is optional.
*
* @return the total body length.
*/
int getTotalBodyLength();
/**
* Sets the total body length.
* <p/>
* Note that this may be 0, since the body length is optional.
*
* @param totalBodyLength the total body length.
*/
BinaryMemcacheMessageHeader setTotalBodyLength(int totalBodyLength);
/**
* Returns the opaque value.
*
* @return the opaque value.
*/
int getOpaque();
/**
* Sets the opaque value.
*
* @param opaque the opqaue value to use.
*/
BinaryMemcacheMessageHeader setOpaque(int opaque);
/**
* Returns the CAS identifier.
*
* @return the CAS identifier.
*/
long getCAS();
/**
* Sets the CAS identifier.
*
* @param cas the CAS identifier to use.
*/
BinaryMemcacheMessageHeader setCAS(long cas);
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.memcache.FullMemcacheMessage;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import io.netty.handler.codec.memcache.MemcacheContent;
import io.netty.handler.codec.memcache.MemcacheMessage;
import io.netty.handler.codec.memcache.MemcacheObject;
import io.netty.handler.codec.memcache.MemcacheObjectAggregator;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
/**
* A {@link MemcacheObjectAggregator} for the binary protocol.
*/
public class BinaryMemcacheObjectAggregator extends MemcacheObjectAggregator {
private boolean tooLongFrameFound;
public BinaryMemcacheObjectAggregator(int maxContentLength) {
super(maxContentLength);
}
@Override
protected void decode(ChannelHandlerContext ctx, MemcacheObject msg, List<Object> out) throws Exception {
FullMemcacheMessage currentMessage = this.currentMessage;
if (msg instanceof MemcacheMessage) {
tooLongFrameFound = false;
MemcacheMessage m = (MemcacheMessage) msg;
if (!m.getDecoderResult().isSuccess()) {
this.currentMessage = null;
out.add(ReferenceCountUtil.retain(m));
return;
}
if (msg instanceof BinaryMemcacheRequest) {
BinaryMemcacheRequest request = (BinaryMemcacheRequest) msg;
this.currentMessage = new DefaultFullBinaryMemcacheRequest(request.getHeader(), request.getKey(),
request.getExtras(), Unpooled.compositeBuffer(getMaxCumulationBufferComponents()));
} else if (msg instanceof BinaryMemcacheResponse) {
BinaryMemcacheResponse response = (BinaryMemcacheResponse) msg;
this.currentMessage = new DefaultFullBinaryMemcacheResponse(response.getHeader(), response.getKey(),
response.getExtras(), Unpooled.compositeBuffer(getMaxCumulationBufferComponents()));
} else {
throw new Error();
}
} else if (msg instanceof MemcacheContent) {
if (tooLongFrameFound) {
if (msg instanceof LastMemcacheContent) {
this.currentMessage = null;
}
return;
}
MemcacheContent chunk = (MemcacheContent) msg;
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
if (content.readableBytes() > getMaxContentLength() - chunk.content().readableBytes()) {
tooLongFrameFound = true;
currentMessage.release();
this.currentMessage = null;
throw new TooLongFrameException("Memcache content length exceeded " + getMaxContentLength()
+ " bytes.");
}
if (chunk.content().isReadable()) {
chunk.retain();
content.addComponent(chunk.content());
content.writerIndex(content.writerIndex() + chunk.content().readableBytes());
}
final boolean last;
if (!chunk.getDecoderResult().isSuccess()) {
currentMessage.setDecoderResult(
DecoderResult.failure(chunk.getDecoderResult().cause()));
last = true;
} else {
last = chunk instanceof LastMemcacheContent;
}
if (last) {
this.currentMessage = null;
out.add(currentMessage);
}
} else {
throw new Error();
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* Represents a full {@link BinaryMemcacheRequest}, which contains the header and optional key and extras.
*/
public interface BinaryMemcacheRequest extends BinaryMemcacheMessage<BinaryMemcacheRequestHeader> {
/**
* Returns the {@link BinaryMemcacheRequestHeader} which contains the full required request header.
*
* @return the required request header.
*/
BinaryMemcacheRequestHeader getHeader();
}

View File

@ -0,0 +1,54 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
/**
* The decoder part which takes care of decoding the request-specific headers.
*/
public class BinaryMemcacheRequestDecoder
extends BinaryMemcacheDecoder<BinaryMemcacheRequest, BinaryMemcacheRequestHeader> {
public BinaryMemcacheRequestDecoder() {
this(DEFAULT_MAX_CHUNK_SIZE);
}
public BinaryMemcacheRequestDecoder(int chunkSize) {
super(chunkSize);
}
@Override
protected BinaryMemcacheRequestHeader decodeHeader(ByteBuf in) {
BinaryMemcacheRequestHeader header = new DefaultBinaryMemcacheRequestHeader();
header.setMagic(in.readByte());
header.setOpcode(in.readByte());
header.setKeyLength(in.readShort());
header.setExtrasLength(in.readByte());
header.setDataType(in.readByte());
header.setReserved(in.readShort());
header.setTotalBodyLength(in.readInt());
header.setOpaque(in.readInt());
header.setCAS(in.readLong());
return header;
}
@Override
protected BinaryMemcacheRequest buildMessage(BinaryMemcacheRequestHeader header, ByteBuf extras, String key) {
return new DefaultBinaryMemcacheRequest(header, key, extras);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
/**
* The encoder part which takes care of encoding the request headers.
*/
public class BinaryMemcacheRequestEncoder
extends BinaryMemcacheEncoder<BinaryMemcacheRequest, BinaryMemcacheRequestHeader> {
@Override
protected void encodeHeader(ByteBuf buf, BinaryMemcacheRequestHeader header) {
buf.writeByte(header.getMagic());
buf.writeByte(header.getOpcode());
buf.writeShort(header.getKeyLength());
buf.writeByte(header.getExtrasLength());
buf.writeByte(header.getDataType());
buf.writeShort(header.getReserved());
buf.writeInt(header.getTotalBodyLength());
buf.writeInt(header.getOpaque());
buf.writeLong(header.getCAS());
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* Extends the common {@link BinaryMemcacheMessageHeader} header fields with hose who can only show up in
* {@link BinaryMemcacheRequest} messages.
* <p/>
* <p>Note that while the additional field in the request is called "reserved", it can still be used for a custom
* memcached implementation. It will not be mirrored back like the
* {@link io.netty.handler.codec.memcache.binary.BinaryMemcacheRequestHeader#getOpaque()} field, because in the
* {@link BinaryMemcacheResponseHeader}, the status field is set there instead.</p>
*/
public interface BinaryMemcacheRequestHeader extends BinaryMemcacheMessageHeader {
/**
* Returns the reserved field value.
*
* @return the reserved field value.
*/
short getReserved();
/**
* Sets the reserved field value.
*
* @param reserved the reserved field value.
*/
BinaryMemcacheRequestHeader setReserved(short reserved);
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* Represents a full {@link BinaryMemcacheResponse}, which contains the header and optional key and extras.
*/
public interface BinaryMemcacheResponse extends BinaryMemcacheMessage<BinaryMemcacheResponseHeader> {
/**
* Returns the {@link BinaryMemcacheResponseHeader} which contains the full required response header.
*
* @return the required response header.
*/
BinaryMemcacheResponseHeader getHeader();
}

View File

@ -0,0 +1,54 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
/**
* The decoder which takes care of decoding the response headers.
*/
public class BinaryMemcacheResponseDecoder
extends BinaryMemcacheDecoder<BinaryMemcacheResponse, BinaryMemcacheResponseHeader> {
public BinaryMemcacheResponseDecoder() {
this(DEFAULT_MAX_CHUNK_SIZE);
}
public BinaryMemcacheResponseDecoder(int chunkSize) {
super(chunkSize);
}
@Override
protected BinaryMemcacheResponseHeader decodeHeader(ByteBuf in) {
BinaryMemcacheResponseHeader header = new DefaultBinaryMemcacheResponseHeader();
header.setMagic(in.readByte());
header.setOpcode(in.readByte());
header.setKeyLength(in.readShort());
header.setExtrasLength(in.readByte());
header.setDataType(in.readByte());
header.setStatus(in.readShort());
header.setTotalBodyLength(in.readInt());
header.setOpaque(in.readInt());
header.setCAS(in.readLong());
return header;
}
@Override
protected BinaryMemcacheResponse buildMessage(BinaryMemcacheResponseHeader header, ByteBuf extras, String key) {
return new DefaultBinaryMemcacheResponse(header, key, extras);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
/**
* The encoder which takes care of encoding the response headers.
*/
public class BinaryMemcacheResponseEncoder
extends BinaryMemcacheEncoder<BinaryMemcacheResponse, BinaryMemcacheResponseHeader> {
@Override
protected void encodeHeader(ByteBuf buf, BinaryMemcacheResponseHeader header) {
buf.writeByte(header.getMagic());
buf.writeByte(header.getOpcode());
buf.writeShort(header.getKeyLength());
buf.writeByte(header.getExtrasLength());
buf.writeByte(header.getDataType());
buf.writeShort(header.getStatus());
buf.writeInt(header.getTotalBodyLength());
buf.writeInt(header.getOpaque());
buf.writeLong(header.getCAS());
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* Extends the common {@link BinaryMemcacheMessageHeader} header fields with hose who can only show up in
* {@link BinaryMemcacheResponse} messages.
*
* @see io.netty.handler.codec.memcache.binary.util.BinaryMemcacheResponseStatus
*/
public interface BinaryMemcacheResponseHeader extends BinaryMemcacheMessageHeader {
/**
* Returns the status of the response.
*
* @return the status of the response.
*/
short getStatus();
/**
* Sets the status of the response.
*
* @param status the status to set.
*/
BinaryMemcacheResponseHeader setStatus(short status);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.channel.CombinedChannelDuplexHandler;
/**
* The full server codec that combines the correct encoder and decoder.
* <p/>
* Use this codec if you need to implement a server that speaks the memache binary protocol.
* Internally, it combines the {@link BinaryMemcacheRequestDecoder} and the
* {@link BinaryMemcacheResponseEncoder} to request decoding and response encoding.
*/
public class BinaryMemcacheServerCodec
extends CombinedChannelDuplexHandler<BinaryMemcacheRequestDecoder, BinaryMemcacheResponseEncoder> {
public BinaryMemcacheServerCodec() {
this(BinaryMemcacheRequestDecoder.DEFAULT_MAX_CHUNK_SIZE);
}
public BinaryMemcacheServerCodec(int decodeChunkSize) {
init(new BinaryMemcacheRequestDecoder(decodeChunkSize), new BinaryMemcacheResponseEncoder());
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.memcache.DefaultMemcacheObject;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
/**
* Default implementation of a {@link BinaryMemcacheMessage}.
*/
public abstract class DefaultBinaryMemcacheMessage<H extends BinaryMemcacheMessageHeader>
extends DefaultMemcacheObject
implements BinaryMemcacheMessage<H> {
/**
* Contains the message header.
*/
private final H header;
/**
* Contains the optional key.
*/
private final String key;
/**
* Contains the optional extras.
*/
private final ByteBuf extras;
/**
* Create a new instance with all properties set.
*
* @param header the message header.
* @param key the message key.
* @param extras the message extras.
*/
public DefaultBinaryMemcacheMessage(H header, String key, ByteBuf extras) {
this.header = header;
this.key = key;
this.extras = extras;
}
@Override
public H getHeader() {
return header;
}
@Override
public String getKey() {
return key;
}
@Override
public ByteBuf getExtras() {
return extras;
}
@Override
public int refCnt() {
if (extras != null) {
return extras.refCnt();
}
return 1;
}
@Override
public BinaryMemcacheMessage retain() {
if (extras != null) {
extras.retain();
}
return this;
}
@Override
public BinaryMemcacheMessage retain(int increment) {
if (extras != null) {
extras.retain(increment);
}
return this;
}
@Override
public boolean release() {
if (extras != null) {
return extras.release();
}
return false;
}
@Override
public boolean release(int decrement) {
if (extras != null) {
return extras.release(decrement);
}
return false;
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* The default implementation of a {@link BinaryMemcacheMessageHeader}.
*/
public abstract class DefaultBinaryMemcacheMessageHeader implements BinaryMemcacheMessageHeader {
private byte magic;
private byte opcode;
private short keyLength;
private byte extrasLength;
private byte dataType;
private int totalBodyLength;
private int opaque;
private long cas;
@Override
public byte getMagic() {
return magic;
}
@Override
public BinaryMemcacheMessageHeader setMagic(byte magic) {
this.magic = magic;
return this;
}
@Override
public long getCAS() {
return cas;
}
@Override
public BinaryMemcacheMessageHeader setCAS(long cas) {
this.cas = cas;
return this;
}
@Override
public int getOpaque() {
return opaque;
}
@Override
public BinaryMemcacheMessageHeader setOpaque(int opaque) {
this.opaque = opaque;
return this;
}
@Override
public int getTotalBodyLength() {
return totalBodyLength;
}
@Override
public BinaryMemcacheMessageHeader setTotalBodyLength(int totalBodyLength) {
this.totalBodyLength = totalBodyLength;
return this;
}
@Override
public byte getDataType() {
return dataType;
}
@Override
public BinaryMemcacheMessageHeader setDataType(byte dataType) {
this.dataType = dataType;
return this;
}
@Override
public byte getExtrasLength() {
return extrasLength;
}
@Override
public BinaryMemcacheMessageHeader setExtrasLength(byte extrasLength) {
this.extrasLength = extrasLength;
return this;
}
@Override
public short getKeyLength() {
return keyLength;
}
@Override
public BinaryMemcacheMessageHeader setKeyLength(short keyLength) {
this.keyLength = keyLength;
return this;
}
@Override
public byte getOpcode() {
return opcode;
}
@Override
public BinaryMemcacheMessageHeader setOpcode(byte opcode) {
this.opcode = opcode;
return this;
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* The default implementation of the {@link BinaryMemcacheRequest}.
*/
public class DefaultBinaryMemcacheRequest extends DefaultBinaryMemcacheMessage<BinaryMemcacheRequestHeader>
implements BinaryMemcacheRequest {
/**
* Create a new {@link DefaultBinaryMemcacheRequest} with the header only.
*
* @param header the header to use.
*/
public DefaultBinaryMemcacheRequest(BinaryMemcacheRequestHeader header) {
this(header, null, Unpooled.EMPTY_BUFFER);
}
/**
* Create a new {@link DefaultBinaryMemcacheRequest} with the header and key.
*
* @param header the header to use.
* @param key the key to use.
*/
public DefaultBinaryMemcacheRequest(BinaryMemcacheRequestHeader header, String key) {
this(header, key, Unpooled.EMPTY_BUFFER);
}
/**
* Create a new {@link DefaultBinaryMemcacheRequest} with the header and extras.
*
* @param header the header to use.
* @param extras the extras to use.
*/
public DefaultBinaryMemcacheRequest(BinaryMemcacheRequestHeader header, ByteBuf extras) {
this(header, null, extras);
}
/**
* Create a new {@link DefaultBinaryMemcacheRequest} with the header only.
*
* @param header the header to use.
* @param key the key to use.
* @param extras the extras to use.
*/
public DefaultBinaryMemcacheRequest(BinaryMemcacheRequestHeader header, String key, ByteBuf extras) {
super(header, key, extras);
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* The default implementation of a {@link BinaryMemcacheRequestHeader}.
*/
public class DefaultBinaryMemcacheRequestHeader extends DefaultBinaryMemcacheMessageHeader
implements BinaryMemcacheRequestHeader {
/**
* Default magic byte for a request.
*/
public static final byte REQUEST_MAGIC_BYTE = (byte) 0x80;
private short reserved;
/**
* Create a new {@link BinaryMemcacheRequestHeader} and apply default values.
*/
public DefaultBinaryMemcacheRequestHeader() {
setMagic(REQUEST_MAGIC_BYTE);
}
@Override
public short getReserved() {
return reserved;
}
@Override
public BinaryMemcacheRequestHeader setReserved(short reserved) {
this.reserved = reserved;
return this;
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* The default implementation of the {@link BinaryMemcacheResponse}.
*/
public class DefaultBinaryMemcacheResponse extends DefaultBinaryMemcacheMessage<BinaryMemcacheResponseHeader>
implements BinaryMemcacheResponse {
/**
* Create a new {@link DefaultBinaryMemcacheResponse} with the header only.
*
* @param header the header to use.
*/
public DefaultBinaryMemcacheResponse(BinaryMemcacheResponseHeader header) {
this(header, null, Unpooled.EMPTY_BUFFER);
}
/**
* Create a new {@link DefaultBinaryMemcacheResponse} with the header and key.
*
* @param header the header to use.
* @param key the key to use
*/
public DefaultBinaryMemcacheResponse(BinaryMemcacheResponseHeader header, String key) {
this(header, key, Unpooled.EMPTY_BUFFER);
}
/**
* Create a new {@link DefaultBinaryMemcacheResponse} with the header and extras.
*
* @param header the header to use.
* @param extras the extras to use.
*/
public DefaultBinaryMemcacheResponse(BinaryMemcacheResponseHeader header, ByteBuf extras) {
this(header, null, extras);
}
/**
* Create a new {@link DefaultBinaryMemcacheResponse} with the header, key and extras.
*
* @param header the header to use.
* @param key the key to use.
* @param extras the extras to use.
*/
public DefaultBinaryMemcacheResponse(BinaryMemcacheResponseHeader header, String key, ByteBuf extras) {
super(header, key, extras);
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
/**
* The default implementation of a {@link BinaryMemcacheResponseHeader}.
*/
public class DefaultBinaryMemcacheResponseHeader extends DefaultBinaryMemcacheMessageHeader
implements BinaryMemcacheResponseHeader {
/**
* Default magic byte for a request.
*/
public static final byte RESPONSE_MAGIC_BYTE = (byte) 0x81;
private short status;
/**
* Create a new {@link BinaryMemcacheRequestHeader} and apply default values.
*/
public DefaultBinaryMemcacheResponseHeader() {
setMagic(RESPONSE_MAGIC_BYTE);
}
@Override
public short getStatus() {
return status;
}
@Override
public BinaryMemcacheResponseHeader setStatus(short status) {
this.status = status;
return this;
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* The default implementation of a {@link FullBinaryMemcacheRequest}.
*/
public class DefaultFullBinaryMemcacheRequest extends DefaultBinaryMemcacheRequest
implements FullBinaryMemcacheRequest {
private final ByteBuf content;
/**
* Create a new {@link DefaultBinaryMemcacheRequest} with the header, key and extras.
*
* @param header the header to use.
* @param key the key to use.
* @param extras the extras to use.
*/
public DefaultFullBinaryMemcacheRequest(BinaryMemcacheRequestHeader header, String key, ByteBuf extras) {
this(header, key, extras, Unpooled.buffer(0));
}
/**
* Create a new {@link DefaultBinaryMemcacheRequest} with the header, key, extras and content.
*
* @param header the header to use.
* @param key the key to use.
* @param extras the extras to use.
* @param content the content of the full request.
*/
public DefaultFullBinaryMemcacheRequest(BinaryMemcacheRequestHeader header, String key, ByteBuf extras,
ByteBuf content) {
super(header, key, extras);
if (content == null) {
throw new NullPointerException("Supplied content is null.");
}
this.content = content;
}
@Override
public ByteBuf content() {
return content;
}
@Override
public int refCnt() {
return content.refCnt();
}
@Override
public FullBinaryMemcacheRequest retain() {
content.retain();
return this;
}
@Override
public FullBinaryMemcacheRequest retain(int increment) {
content.retain(increment);
return this;
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override
public FullBinaryMemcacheRequest copy() {
return new DefaultFullBinaryMemcacheRequest(getHeader(), getKey(), getExtras(), content().copy());
}
@Override
public FullBinaryMemcacheRequest duplicate() {
return new DefaultFullBinaryMemcacheRequest(getHeader(), getKey(), getExtras(), content().duplicate());
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* The default implementation of a {@link FullBinaryMemcacheResponse}.
*/
public class DefaultFullBinaryMemcacheResponse extends DefaultBinaryMemcacheResponse
implements FullBinaryMemcacheResponse {
private final ByteBuf content;
/**
* Create a new {@link DefaultFullBinaryMemcacheResponse} with the header, key and extras.
*
* @param header the header to use.
* @param key the key to use.
* @param extras the extras to use.
*/
public DefaultFullBinaryMemcacheResponse(BinaryMemcacheResponseHeader header, String key, ByteBuf extras) {
this(header, key, extras, Unpooled.buffer(0));
}
/**
* Create a new {@link DefaultFullBinaryMemcacheResponse} with the header, key, extras and content.
*
* @param header the header to use.
* @param key the key to use.
* @param extras the extras to use.
* @param content the content of the full request.
*/
public DefaultFullBinaryMemcacheResponse(BinaryMemcacheResponseHeader header, String key, ByteBuf extras,
ByteBuf content) {
super(header, key, extras);
if (content == null) {
throw new NullPointerException("Supplied content is null.");
}
this.content = content;
}
@Override
public ByteBuf content() {
return content;
}
@Override
public int refCnt() {
return content.refCnt();
}
@Override
public FullBinaryMemcacheResponse retain() {
content.retain();
return this;
}
@Override
public FullBinaryMemcacheResponse retain(int increment) {
content.retain(increment);
return this;
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override
public FullBinaryMemcacheResponse copy() {
return new DefaultFullBinaryMemcacheResponse(getHeader(), getKey(), getExtras(), content().copy());
}
@Override
public FullBinaryMemcacheResponse duplicate() {
return new DefaultFullBinaryMemcacheResponse(getHeader(), getKey(), getExtras(), content().duplicate());
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.handler.codec.memcache.FullMemcacheMessage;
/**
* A {@link BinaryMemcacheRequest} that also includes the content.
*/
public interface FullBinaryMemcacheRequest extends BinaryMemcacheRequest, FullMemcacheMessage {
@Override
FullBinaryMemcacheRequest copy();
@Override
FullBinaryMemcacheRequest retain(int increment);
@Override
FullBinaryMemcacheRequest retain();
@Override
FullBinaryMemcacheRequest duplicate();
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.handler.codec.memcache.FullMemcacheMessage;
/**
* A {@link BinaryMemcacheResponse} that also includes the content.
*/
public interface FullBinaryMemcacheResponse extends BinaryMemcacheResponse, FullMemcacheMessage {
@Override
FullBinaryMemcacheResponse copy();
@Override
FullBinaryMemcacheResponse retain(int increment);
@Override
FullBinaryMemcacheResponse retain();
@Override
FullBinaryMemcacheResponse duplicate();
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Implementations and Interfaces for the Memcache Binary protocol.
*/
package io.netty.handler.codec.memcache.binary;

View File

@ -0,0 +1,66 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary.util;
/**
* Represents all Opcodes that can occur in a {@link io.netty.handler.codec.memcache.binary.BinaryMemcacheMessage}.
* <p/>
* This class can be extended if a custom application needs to implement a superset of the normally supported
* operations by a vanilla memcached protocol.
*/
public final class BinaryMemcacheOpcodes {
private BinaryMemcacheOpcodes() {
// disallow construction
}
public static final byte GET = 0x00;
public static final byte SET = 0x01;
public static final byte ADD = 0x02;
public static final byte REPLACE = 0x03;
public static final byte DELETE = 0x04;
public static final byte INCREMENT = 0x05;
public static final byte DECREMENT = 0x06;
public static final byte QUIT = 0x07;
public static final byte FLUSH = 0x08;
public static final byte GETQ = 0x09;
public static final byte NOOP = 0x0a;
public static final byte VERSION = 0x0b;
public static final byte GETK = 0x0c;
public static final byte GETKQ = 0x0d;
public static final byte APPEND = 0x0e;
public static final byte PREPEND = 0x0f;
public static final byte STAT = 0x10;
public static final byte SETQ = 0x11;
public static final byte ADDQ = 0x12;
public static final byte REPLACEQ = 0x13;
public static final byte DELETEQ = 0x14;
public static final byte INCREMENTQ = 0x15;
public static final byte DECREMENTQ = 0x16;
public static final byte QUITQ = 0x17;
public static final byte FLUSHQ = 0x18;
public static final byte APPENDQ = 0x19;
public static final byte PREPENDQ = 0x1a;
public static final byte TOUCH = 0x1c;
public static final byte GAT = 0x1d;
public static final byte GATQ = 0x1e;
public static final byte GATK = 0x23;
public static final byte GATKQ = 0x24;
public static final byte SASL_LIST_MECHS = 0x20;
public static final byte SASL_AUTH = 0x21;
public static final byte SASL_STEP = 0x22;
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary.util;
/**
* Contains all possible status values a
* {@link io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseHeader} can return.
*/
public final class BinaryMemcacheResponseStatus {
private BinaryMemcacheResponseStatus() {
// disallow construction
}
public static final short SUCCESS = 0x00;
public static final short KEY_ENOENT = 0x01;
public static final short KEY_EEXISTS = 0x02;
public static final short E2BIG = 0x03;
public static final short EINVA = 0x04;
public static final short NOT_STORED = 0x05;
public static final short DELTA_BADVAL = 0x06;
public static final short AUTH_ERROR = 0x20;
public static final short AUTH_CONTINUE = 0x21;
public static final short UNKNOWN_COMMAND = 0x81;
public static final short ENOMEM = 0x82;
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Utility helpers for the binary protocol.
*/
package io.netty.handler.codec.memcache.binary.util;

View File

@ -0,0 +1,20 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Common superset of ascii and binary classes.
*/
package io.netty.handler.codec.memcache;

View File

@ -0,0 +1,181 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import io.netty.handler.codec.memcache.MemcacheContent;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
/**
* Verifies the correct functionality of the {@link BinaryMemcacheDecoder}.
* <p/>
* While technically there are both a {@link BinaryMemcacheRequestDecoder} and a {@link BinaryMemcacheResponseDecoder}
* they implement the same basics and just differ in the type of headers returned.
*/
public class BinaryMemcacheDecoderTest {
/**
* Represents a GET request header with a key size of three.
*/
private static final byte[] GET_REQUEST = new byte[]{
(byte) 0x80, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x66, 0x6f, 0x6f
};
private static final byte[] SET_REQUEST_WITH_CONTENT = new byte[]{
(byte) 0x80, 0x01, 0x00, 0x03,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x0B,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x66, 0x6f, 0x6f,
0x01, 0x02, 0x03, 0x04,
0x05, 0x06, 0x07, 0x08
};
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(new BinaryMemcacheRequestDecoder());
}
/**
* This tests a simple GET request with a key as the value.
*/
@Test
public void shouldDecodeRequestWithSimpleValue() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(GET_REQUEST);
channel.writeInbound(incoming);
BinaryMemcacheRequest request = (BinaryMemcacheRequest) channel.readInbound();
assertThat(request, notNullValue());
assertThat(request.getHeader(), notNullValue());
assertThat(request.getKey(), notNullValue());
assertThat(request.getExtras(), nullValue());
BinaryMemcacheRequestHeader header = request.getHeader();
assertThat(header.getKeyLength(), is((short) 3));
assertThat(header.getExtrasLength(), is((byte) 0));
assertThat(header.getTotalBodyLength(), is(3));
request.release();
assertThat(channel.readInbound(), instanceOf(LastMemcacheContent.class));
}
/**
* This test makes sure that large content is emitted in chunks.
*/
@Test
public void shouldDecodeRequestWithChunkedContent() {
int smallBatchSize = 2;
channel = new EmbeddedChannel(new BinaryMemcacheRequestDecoder(smallBatchSize));
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(SET_REQUEST_WITH_CONTENT);
channel.writeInbound(incoming);
BinaryMemcacheRequest request = (BinaryMemcacheRequest) channel.readInbound();
assertThat(request, notNullValue());
assertThat(request.getHeader(), notNullValue());
assertThat(request.getKey(), notNullValue());
assertThat(request.getExtras(), nullValue());
BinaryMemcacheRequestHeader header = request.getHeader();
assertThat(header.getKeyLength(), is((short) 3));
assertThat(header.getExtrasLength(), is((byte) 0));
assertThat(header.getTotalBodyLength(), is(11));
int expectedContentChunks = 4;
for (int i = 1; i <= expectedContentChunks; i++) {
MemcacheContent content = (MemcacheContent) channel.readInbound();
if (i < expectedContentChunks) {
assertThat(content, instanceOf(MemcacheContent.class));
} else {
assertThat(content, instanceOf(LastMemcacheContent.class));
}
assertThat(content.content().readableBytes(), is(2));
}
assertThat(channel.readInbound(), nullValue());
}
/**
* This test makes sure that even when the decoder is confronted with various chunk
* sizes in the middle of decoding, it can recover and decode all the time eventually.
*/
@Test
public void shouldHandleNonUniformNetworkBatches() {
ByteBuf incoming = Unpooled.copiedBuffer(SET_REQUEST_WITH_CONTENT);
while (incoming.isReadable()) {
channel.writeInbound(incoming.readBytes(5));
}
BinaryMemcacheRequest request = (BinaryMemcacheRequest) channel.readInbound();
assertThat(request, notNullValue());
assertThat(request.getHeader(), notNullValue());
assertThat(request.getKey(), notNullValue());
assertThat(request.getExtras(), nullValue());
MemcacheContent content1 = (MemcacheContent) channel.readInbound();
MemcacheContent content2 = (MemcacheContent) channel.readInbound();
assertThat(content1, instanceOf(MemcacheContent.class));
assertThat(content2, instanceOf(LastMemcacheContent.class));
assertThat(content1.content().readableBytes(), is(3));
assertThat(content2.content().readableBytes(), is(5));
}
/**
* This test makes sure that even when more requests arrive in the same batch, they
* get emitted as separate messages.
*/
@Test
public void shouldHandleTwoMessagesInOneBatch() {
channel.writeInbound(Unpooled.buffer().writeBytes(GET_REQUEST).writeBytes(GET_REQUEST));
BinaryMemcacheRequest request = (BinaryMemcacheRequest) channel.readInbound();
assertThat(request, instanceOf(BinaryMemcacheRequest.class));
assertThat(request, notNullValue());
assertThat(channel.readInbound(), instanceOf(LastMemcacheContent.class));
request = (BinaryMemcacheRequest) channel.readInbound();
assertThat(request, instanceOf(BinaryMemcacheRequest.class));
assertThat(request, notNullValue());
assertThat(channel.readInbound(), instanceOf(LastMemcacheContent.class));
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
import io.netty.handler.codec.memcache.DefaultMemcacheContent;
import io.netty.handler.codec.memcache.binary.util.BinaryMemcacheOpcodes;
import io.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThat;
/**
* Verifies the correct functionality of the {@link BinaryMemcacheEncoder}.
*/
public class BinaryMemcacheEncoderTest {
public static int DEFAULT_HEADER_SIZE = 24;
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(new BinaryMemcacheRequestEncoder());
}
@Test
public void shouldEncodeDefaultHeader() {
BinaryMemcacheRequestHeader header = new DefaultBinaryMemcacheRequestHeader();
BinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(header);
boolean result = channel.writeOutbound(request);
assertThat(result, is(true));
ByteBuf written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(DEFAULT_HEADER_SIZE));
assertThat(written.readByte(), is((byte) 0x80));
assertThat(written.readByte(), is((byte) 0x00));
}
@Test
public void shouldEncodeCustomHeader() {
BinaryMemcacheRequestHeader header = new DefaultBinaryMemcacheRequestHeader();
header.setMagic((byte) 0xAA);
header.setOpcode(BinaryMemcacheOpcodes.GET);
BinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(header);
boolean result = channel.writeOutbound(request);
assertThat(result, is(true));
ByteBuf written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(DEFAULT_HEADER_SIZE));
assertThat(written.readByte(), is((byte) 0xAA));
assertThat(written.readByte(), is(BinaryMemcacheOpcodes.GET));
}
@Test
public void shouldEncodeExtras() {
String extrasContent = "netty<3memcache";
ByteBuf extras = Unpooled.copiedBuffer(extrasContent, CharsetUtil.UTF_8);
int extrasLength = extras.readableBytes();
BinaryMemcacheRequestHeader header = new DefaultBinaryMemcacheRequestHeader();
header.setExtrasLength((byte) extrasLength);
BinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(header, extras);
boolean result = channel.writeOutbound(request);
assertThat(result, is(true));
ByteBuf written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(DEFAULT_HEADER_SIZE + extrasLength));
written.readBytes(DEFAULT_HEADER_SIZE);
assertThat(written.readBytes(extrasLength).toString(CharsetUtil.UTF_8), equalTo(extrasContent));
}
@Test
public void shouldEncodeKey() {
String key = "netty";
int keyLength = key.length();
BinaryMemcacheRequestHeader header = new DefaultBinaryMemcacheRequestHeader();
header.setKeyLength((byte) keyLength);
BinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(header, key);
boolean result = channel.writeOutbound(request);
assertThat(result, is(true));
ByteBuf written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(DEFAULT_HEADER_SIZE + keyLength));
written.readBytes(DEFAULT_HEADER_SIZE);
assertThat(written.readBytes(keyLength).toString(CharsetUtil.UTF_8), equalTo(key));
}
@Test
public void shouldEncodeContent() {
DefaultMemcacheContent content1 =
new DefaultMemcacheContent(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8));
DefaultLastMemcacheContent content2 =
new DefaultLastMemcacheContent(Unpooled.copiedBuffer(" Rocks!", CharsetUtil.UTF_8));
int totalBodyLength = content1.content().readableBytes() + content2.content().readableBytes();
BinaryMemcacheRequestHeader header = new DefaultBinaryMemcacheRequestHeader();
header.setTotalBodyLength(totalBodyLength);
BinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(header);
boolean result = channel.writeOutbound(request);
assertThat(result, is(true));
result = channel.writeOutbound(content1);
assertThat(result, is(true));
result = channel.writeOutbound(content2);
assertThat(result, is(true));
ByteBuf written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(DEFAULT_HEADER_SIZE));
written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(content1.content().readableBytes()));
assertThat(
written.readBytes(content1.content().readableBytes()).toString(CharsetUtil.UTF_8),
is("Netty")
);
written = (ByteBuf) channel.readOutbound();
assertThat(written.readableBytes(), is(content2.content().readableBytes()));
assertThat(
written.readBytes(content2.content().readableBytes()).toString(CharsetUtil.UTF_8),
is(" Rocks!")
);
}
@Test(expected = EncoderException.class)
public void shouldFailWithoutLastContent() {
channel.writeOutbound(new DefaultMemcacheContent(Unpooled.EMPTY_BUFFER));
channel.writeOutbound(
new DefaultBinaryMemcacheRequest(new DefaultBinaryMemcacheRequestHeader()));
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import io.netty.handler.codec.memcache.MemcacheContent;
import io.netty.util.CharsetUtil;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.Charset;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
/**
* Verifies the correct functionality of the {@link BinaryMemcacheObjectAggregator}.
*/
public class BinaryMemcacheObjectAggregatorTest {
private static final byte[] SET_REQUEST_WITH_CONTENT = new byte[]{
(byte) 0x80, 0x01, 0x00, 0x03,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x0B,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x66, 0x6f, 0x6f,
0x01, 0x02, 0x03, 0x04,
0x05, 0x06, 0x07, 0x08
};
public static final int MAX_CONTENT_SIZE = 2 << 10;
private EmbeddedChannel channel;
@Test
public void shouldAggregateChunksOnDecode() {
int smallBatchSize = 2;
channel = new EmbeddedChannel(
new BinaryMemcacheRequestDecoder(smallBatchSize),
new BinaryMemcacheObjectAggregator(MAX_CONTENT_SIZE));
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(SET_REQUEST_WITH_CONTENT);
channel.writeInbound(incoming);
FullBinaryMemcacheRequest request = (FullBinaryMemcacheRequest) channel.readInbound();
assertThat(request, instanceOf(FullBinaryMemcacheRequest.class));
assertThat(request, notNullValue());
assertThat(request.getHeader(), notNullValue());
assertThat(request.getKey(), notNullValue());
assertThat(request.getExtras(), nullValue());
assertThat(request.content().readableBytes(), is(8));
assertThat(request.content().readByte(), is((byte) 0x01));
assertThat(request.content().readByte(), is((byte) 0x02));
assertThat(channel.readInbound(), nullValue());
}
}

View File

@ -89,6 +89,7 @@
<module>buffer</module> <module>buffer</module>
<module>codec</module> <module>codec</module>
<module>codec-http</module> <module>codec-http</module>
<module>codec-memcache</module>
<module>codec-socks</module> <module>codec-socks</module>
<module>transport</module> <module>transport</module>
<module>transport-rxtx</module> <module>transport-rxtx</module>