Introduce MessageAggregator and DecoderResultProvider

Motivation:

We have different message aggregator implementations for different
protocols, but they are very similar with each other.  They all stems
from HttpObjectAggregator.  If we provide an abstract class that provide
generic message aggregation functionality, we will remove their code
duplication.

Modifications:

- Add MessageAggregator which provides generic message aggregation
- Reimplement all existing aggregators using MessageAggregator
- Add DecoderResultProvider interface and extend it wherever possible so
  that MessageAggregator respects the state of the decoded message

Result:

Less code duplication
This commit is contained in:
Trustin Lee 2014-06-04 18:34:57 +09:00
parent a8143eda27
commit 8b0a0f9a8f
28 changed files with 749 additions and 657 deletions

View File

@ -70,10 +70,15 @@ final class ComposedLastHttpContent implements LastHttpContent {
}
@Override
public DecoderResult getDecoderResult() {
public DecoderResult decoderResult() {
return result;
}
@Override
public DecoderResult getDecoderResult() {
return decoderResult();
}
@Override
public void setDecoderResult(DecoderResult result) {
this.result = result;

View File

@ -92,6 +92,6 @@ public class DefaultHttpContent extends DefaultHttpObject implements HttpContent
@Override
public String toString() {
return StringUtil.simpleClassName(this) +
"(data: " + content() + ", decoderResult: " + getDecoderResult() + ')';
"(data: " + content() + ", decoderResult: " + decoderResult() + ')';
}
}

View File

@ -26,10 +26,15 @@ public class DefaultHttpObject implements HttpObject {
}
@Override
public DecoderResult getDecoderResult() {
public DecoderResult decoderResult() {
return decoderResult;
}
@Override
public DecoderResult getDecoderResult() {
return decoderResult();
}
@Override
public void setDecoderResult(DecoderResult decoderResult) {
if (decoderResult == null) {

View File

@ -95,7 +95,7 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques
StringBuilder buf = new StringBuilder();
buf.append(StringUtil.simpleClassName(this));
buf.append("(decodeResult: ");
buf.append(getDecoderResult());
buf.append(decoderResult());
buf.append(')');
buf.append(StringUtil.NEWLINE);
buf.append(getMethod());

View File

@ -74,12 +74,12 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
StringBuilder buf = new StringBuilder();
buf.append(StringUtil.simpleClassName(this));
buf.append("(decodeResult: ");
buf.append(getDecoderResult());
buf.append(decoderResult());
buf.append(')');
buf.append(StringUtil.NEWLINE);
buf.append(getProtocolVersion().text());
buf.append(' ');
buf.append(getStatus().toString());
buf.append(getStatus());
buf.append(StringUtil.NEWLINE);
appendHeaders(buf);

View File

@ -35,7 +35,7 @@ import static io.netty.handler.codec.http.HttpConstants.*;
*/
public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>> {
private static final byte[] HEADER_SEPERATOR = { HttpConstants.COLON, HttpConstants.SP };
private static final byte[] HEADER_SEPERATOR = { COLON, SP };
private static final byte[] CRLF = { CR, LF };
private static final CharSequence CONTENT_LENGTH_ENTITY = newEntity(Names.CONTENT_LENGTH);
private static final CharSequence CONNECTION_ENTITY = newEntity(Names.CONNECTION);
@ -785,7 +785,7 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
} catch (NumberFormatException ignored) {
return defaultValue;
}
}
@ -882,7 +882,7 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
try {
return HttpHeaderDateFormat.get().parse(value);
} catch (ParseException e) {
} catch (ParseException ignored) {
return defaultValue;
}
}
@ -985,7 +985,7 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
if (contentLength != null) {
try {
return Long.parseLong(contentLength);
} catch (NumberFormatException e) {
} catch (NumberFormatException ignored) {
return defaultValue;
}
}
@ -1584,7 +1584,12 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
if (headers == null) {
throw new NullPointerException("headers");
}
clear();
if (headers.isEmpty()) {
return this;
}
for (Map.Entry<String, String> e: headers) {
add(e.getKey(), e.getValue());
}

View File

@ -16,16 +16,12 @@
package io.netty.handler.codec.http;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.DecoderResultProvider;
public interface HttpObject {
public interface HttpObject extends DecoderResultProvider {
/**
* Returns the result of decoding this message.
* @deprecated Use {@link #decoderResult()} instead.
*/
@Deprecated
DecoderResult getDecoderResult();
/**
* Updates the result of decoding this message. This method is supposed to be invoked by {@link HttpObjectDecoder}.
* Do not call this method unless you know what you are doing.
*/
void setDecoderResult(DecoderResult result);
}

View File

@ -15,24 +15,18 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageAggregator;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.List;
import static io.netty.handler.codec.http.HttpHeaders.*;
/**
* A {@link ChannelHandler} that aggregates an {@link HttpMessage}
* and its following {@link HttpContent}s into a single {@link FullHttpRequest}
@ -52,13 +46,8 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
* Be aware that you need to have the {@link HttpResponseEncoder} or {@link HttpRequestEncoder}
* before the {@link HttpObjectAggregator} in the {@link ChannelPipeline}.
*/
public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
/**
* @deprecated Will be removed in the next minor version bump.
*/
@Deprecated
public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; // TODO: Make it private in the next bump.
public class HttpObjectAggregator
extends MessageAggregator<HttpObject, HttpMessage, HttpContent, FullHttpMessage> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class);
@ -68,16 +57,9 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);
static {
TOO_LARGE.headers().set(Names.CONTENT_LENGTH, 0);
TOO_LARGE.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
}
private final int maxContentLength;
private FullHttpMessage currentMessage;
private boolean handlingOversizedMessage;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
private ChannelHandlerContext ctx;
/**
* Creates a new instance.
*
@ -88,195 +70,90 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
* will be called.
*/
public HttpObjectAggregator(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " +
maxContentLength);
}
this.maxContentLength = maxContentLength;
super(maxContentLength);
}
/**
* Returns the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
*/
public final int getMaxCumulationBufferComponents() {
return maxCumulationBufferComponents;
@Override
protected boolean isStartMessage(HttpObject msg) throws Exception {
return msg instanceof HttpMessage;
}
/**
* Sets the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
* and its minimum allowed value is {@code 2}.
*/
public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
if (maxCumulationBufferComponents < 2) {
throw new IllegalArgumentException(
"maxCumulationBufferComponents: " + maxCumulationBufferComponents +
" (expected: >= 2)");
}
@Override
protected boolean isContentMessage(HttpObject msg) throws Exception {
return msg instanceof HttpContent;
}
if (ctx == null) {
this.maxCumulationBufferComponents = maxCumulationBufferComponents;
@Override
protected boolean isLastContentMessage(HttpContent msg) throws Exception {
return msg instanceof LastHttpContent;
}
@Override
protected boolean isAggregated(HttpObject msg) throws Exception {
return msg instanceof FullHttpMessage;
}
@Override
protected boolean hasContentLength(HttpMessage start) throws Exception {
return HttpHeaders.isContentLengthSet(start);
}
@Override
protected long contentLength(HttpMessage start) throws Exception {
return HttpHeaders.getContentLength(start);
}
@Override
protected Object newContinueResponse(HttpMessage start) throws Exception {
if (HttpHeaders.is100ContinueExpected(start)) {
return CONTINUE;
} else {
throw new IllegalStateException(
"decoder properties cannot be changed once the decoder is added to a pipeline.");
return null;
}
}
@Override
protected void decode(final ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
FullHttpMessage currentMessage = this.currentMessage;
protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
assert !(start instanceof FullHttpMessage);
if (msg instanceof HttpMessage) {
handlingOversizedMessage = false;
assert currentMessage == null;
HttpHeaders.removeTransferEncodingChunked(start);
HttpMessage m = (HttpMessage) msg;
// if content length is set, preemptively close if it's too large
if (isContentLengthSet(m)) {
if (getContentLength(m) > maxContentLength) {
// handle oversized message
invokeHandleOversizedMessage(ctx, m);
return;
}
}
// Handle the 'Expect: 100-continue' header if necessary.
if (is100ContinueExpected(m)) {
ctx.writeAndFlush(CONTINUE).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.fireExceptionCaught(future.cause());
}
}
});
}
if (!m.getDecoderResult().isSuccess()) {
removeTransferEncodingChunked(m);
out.add(toFullMessage(m));
this.currentMessage = null;
return;
}
if (msg instanceof HttpRequest) {
HttpRequest header = (HttpRequest) msg;
this.currentMessage = currentMessage = new DefaultFullHttpRequest(header.getProtocolVersion(),
header.getMethod(), header.getUri(), Unpooled.compositeBuffer(maxCumulationBufferComponents));
} else if (msg instanceof HttpResponse) {
HttpResponse header = (HttpResponse) msg;
this.currentMessage = currentMessage = new DefaultFullHttpResponse(
header.getProtocolVersion(), header.getStatus(),
Unpooled.compositeBuffer(maxCumulationBufferComponents));
} else {
throw new Error();
}
currentMessage.headers().set(m.headers());
// A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
removeTransferEncodingChunked(currentMessage);
} else if (msg instanceof HttpContent) {
if (handlingOversizedMessage) {
if (msg instanceof LastHttpContent) {
this.currentMessage = null;
}
// already detect the too long frame so just discard the content
return;
}
assert currentMessage != null;
// Merge the received chunk into the content of the current message.
HttpContent chunk = (HttpContent) msg;
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
if (content.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
// handle oversized message
invokeHandleOversizedMessage(ctx, currentMessage);
return;
}
// Append the content of the chunk
if (chunk.content().isReadable()) {
chunk.retain();
content.addComponent(chunk.content());
content.writerIndex(content.writerIndex() + chunk.content().readableBytes());
}
final boolean last;
if (!chunk.getDecoderResult().isSuccess()) {
currentMessage.setDecoderResult(
DecoderResult.failure(chunk.getDecoderResult().cause()));
last = true;
} else {
last = chunk instanceof LastHttpContent;
}
if (last) {
// Merge trailing headers into the message.
if (chunk instanceof LastHttpContent) {
LastHttpContent trailer = (LastHttpContent) chunk;
currentMessage.headers().add(trailer.trailingHeaders());
}
// Set the 'Content-Length' header.
currentMessage.headers().set(
HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(content.readableBytes()));
// All done
out.add(currentMessage);
this.currentMessage = null;
}
FullHttpMessage ret;
if (start instanceof HttpRequest) {
HttpRequest req = (HttpRequest) start;
ret = new DefaultFullHttpRequest(req.getProtocolVersion(),
req.getMethod(), req.getUri(), content);
} else if (start instanceof HttpResponse) {
HttpResponse res = (HttpResponse) start;
ret = new DefaultFullHttpResponse(
res.getProtocolVersion(), res.getStatus(), content);
} else {
throw new Error();
}
ret.headers().set(start.headers());
return ret;
}
private void invokeHandleOversizedMessage(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {
handlingOversizedMessage = true;
currentMessage = null;
try {
handleOversizedMessage(ctx, msg);
} finally {
// Release the message in case it is a full one.
ReferenceCountUtil.release(msg);
if (msg instanceof HttpRequest) {
// If an oversized request was handled properly and the connection is still alive
// (i.e. rejected 100-continue). the decoder should prepare to handle a new message.
HttpObjectDecoder decoder = ctx.pipeline().get(HttpObjectDecoder.class);
if (decoder != null) {
decoder.reset();
}
}
@Override
protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
if (content instanceof LastHttpContent) {
// Merge trailing headers into the message.
aggregated.headers().add(((LastHttpContent) content).trailingHeaders());
}
}
/**
* Invoked when an incoming request exceeds the maximum content length.
*
* The default behavior is:
* <ul>
* <li>Oversized request: Send a {@link HttpResponseStatus#REQUEST_ENTITY_TOO_LARGE} and close the connection
* if keep-alive is not enabled.</li>
* <li>Oversized response: Close the connection and raise {@link TooLongFrameException}.</li>
* </ul>
* Sub-classes may override this method to change the default behavior. The specified {@code msg} is released
* once this method returns.
*
* @param ctx the {@link ChannelHandlerContext}
* @param msg the accumulated HTTP message up to this point
*/
@SuppressWarnings("UnusedParameters")
protected void handleOversizedMessage(final ChannelHandlerContext ctx, HttpMessage msg) throws Exception {
if (msg instanceof HttpRequest) {
@Override
protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
// Set the 'Content-Length' header.
aggregated.headers().set(
HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(aggregated.content().readableBytes()));
}
@Override
protected void handleOversizedMessage(final ChannelHandlerContext ctx, HttpMessage oversized) throws Exception {
if (oversized instanceof HttpRequest) {
// send back a 413 and close the connection
ChannelFuture future = ctx.writeAndFlush(TOO_LARGE).addListener(new ChannelFutureListener() {
@Override
@ -291,63 +168,22 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
// If the client started to send data already, close because it's impossible to recover.
// If 'Expect: 100-continue' is missing, close becuase it's impossible to recover.
// If keep-alive is off, no need to leave the connection open.
if (msg instanceof FullHttpMessage || !is100ContinueExpected(msg) || !isKeepAlive(msg)) {
if (oversized instanceof FullHttpMessage ||
!HttpHeaders.is100ContinueExpected(oversized) || !HttpHeaders.isKeepAlive(oversized)) {
future.addListener(ChannelFutureListener.CLOSE);
}
} else if (msg instanceof HttpResponse) {
// If an oversized request was handled properly and the connection is still alive
// (i.e. rejected 100-continue). the decoder should prepare to handle a new message.
HttpObjectDecoder decoder = ctx.pipeline().get(HttpObjectDecoder.class);
if (decoder != null) {
decoder.reset();
}
} else if (oversized instanceof HttpResponse) {
ctx.close();
throw new TooLongFrameException("Response entity too large: " + msg);
throw new TooLongFrameException("Response entity too large: " + oversized);
} else {
throw new IllegalStateException();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// release current message if it is not null as it may be a left-over
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
super.channelInactive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
// release current message if it is not null as it may be a left-over as there is not much more we can do in
// this case
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
}
private static FullHttpMessage toFullMessage(HttpMessage msg) {
if (msg instanceof FullHttpMessage) {
return ((FullHttpMessage) msg).retain();
}
FullHttpMessage fullMsg;
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
fullMsg = new DefaultFullHttpRequest(
req.getProtocolVersion(), req.getMethod(), req.getUri(), Unpooled.EMPTY_BUFFER, false);
} else if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse) msg;
fullMsg = new DefaultFullHttpResponse(
res.getProtocolVersion(), res.getStatus(), Unpooled.EMPTY_BUFFER, false);
} else {
throw new IllegalStateException();
}
return fullMsg;
}
}

View File

@ -50,10 +50,15 @@ public interface LastHttpContent extends HttpContent {
}
@Override
public DecoderResult getDecoderResult() {
public DecoderResult decoderResult() {
return DecoderResult.SUCCESS;
}
@Override
public DecoderResult getDecoderResult() {
return decoderResult();
}
@Override
public void setDecoderResult(DecoderResult result) {
throw new UnsupportedOperationException("read only");

View File

@ -1149,6 +1149,12 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
}
@Override
public DecoderResult decoderResult() {
return request.decoderResult();
}
@Override
@Deprecated
public DecoderResult getDecoderResult() {
return request.getDecoderResult();
}

View File

@ -16,110 +16,78 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageAggregator;
import io.netty.handler.codec.TooLongFrameException;
import java.util.List;
/**
* Handler that aggregate fragmented WebSocketFrame's.
*
* Be aware if PING/PONG/CLOSE frames are send in the middle of a fragmented {@link WebSocketFrame} they will
* just get forwarded to the next handler in the pipeline.
*/
public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketFrame> {
private final int maxFrameSize;
private WebSocketFrame currentFrame;
private boolean tooLongFrameFound;
public class WebSocketFrameAggregator
extends MessageAggregator<WebSocketFrame, WebSocketFrame, ContinuationWebSocketFrame, WebSocketFrame> {
/**
* Construct a new instance
* Creates a new instance
*
* @param maxFrameSize If the size of the aggregated frame exceeds this value,
* a {@link TooLongFrameException} is thrown.
* @param maxContentLength If the size of the aggregated frame exceeds this value,
* a {@link TooLongFrameException} is thrown.
*/
public WebSocketFrameAggregator(int maxFrameSize) {
if (maxFrameSize < 1) {
throw new IllegalArgumentException("maxFrameSize must be > 0");
}
this.maxFrameSize = maxFrameSize;
public WebSocketFrameAggregator(int maxContentLength) {
super(maxContentLength);
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
if (currentFrame == null) {
tooLongFrameFound = false;
if (msg.isFinalFragment()) {
out.add(msg.retain());
return;
}
ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.content().retain());
buf.writerIndex(buf.writerIndex() + msg.content().readableBytes());
if (msg instanceof TextWebSocketFrame) {
currentFrame = new TextWebSocketFrame(true, msg.rsv(), buf);
} else if (msg instanceof BinaryWebSocketFrame) {
currentFrame = new BinaryWebSocketFrame(true, msg.rsv(), buf);
} else {
buf.release();
throw new IllegalStateException(
"WebSocket frame was not of type TextWebSocketFrame or BinaryWebSocketFrame");
}
return;
}
if (msg instanceof ContinuationWebSocketFrame) {
if (tooLongFrameFound) {
if (msg.isFinalFragment()) {
currentFrame = null;
}
return;
}
CompositeByteBuf content = (CompositeByteBuf) currentFrame.content();
if (content.readableBytes() > maxFrameSize - msg.content().readableBytes()) {
// release the current frame
currentFrame.release();
tooLongFrameFound = true;
throw new TooLongFrameException(
"WebSocketFrame length exceeded " + content +
" bytes.");
}
content.addComponent(msg.content().retain());
content.writerIndex(content.writerIndex() + msg.content().readableBytes());
if (msg.isFinalFragment()) {
WebSocketFrame currentFrame = this.currentFrame;
this.currentFrame = null;
out.add(currentFrame);
return;
} else {
return;
}
}
// It is possible to receive CLOSE/PING/PONG frames during fragmented frames so just pass them to the next
// handler in the chain
out.add(msg.retain());
protected boolean isStartMessage(WebSocketFrame msg) throws Exception {
return msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// release current frame if it is not null as it may be a left-over
if (currentFrame != null) {
currentFrame.release();
currentFrame = null;
}
protected boolean isContentMessage(WebSocketFrame msg) throws Exception {
return msg instanceof ContinuationWebSocketFrame;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
// release current frame if it is not null as it may be a left-over as there is not much more we can do in
// this case
if (currentFrame != null) {
currentFrame.release();
currentFrame = null;
protected boolean isLastContentMessage(ContinuationWebSocketFrame msg) throws Exception {
return isContentMessage(msg) && msg.isFinalFragment();
}
@Override
protected boolean isAggregated(WebSocketFrame msg) throws Exception {
if (msg.isFinalFragment()) {
return !isContentMessage(msg);
}
return !isStartMessage(msg) && !isContentMessage(msg);
}
@Override
protected boolean hasContentLength(WebSocketFrame start) throws Exception {
return false;
}
@Override
protected long contentLength(WebSocketFrame start) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object newContinueResponse(WebSocketFrame start) throws Exception {
return null;
}
@Override
protected WebSocketFrame beginAggregation(WebSocketFrame start, ByteBuf content) throws Exception {
if (start instanceof TextWebSocketFrame) {
return new TextWebSocketFrame(true, start.rsv(), content);
}
if (start instanceof BinaryWebSocketFrame) {
return new BinaryWebSocketFrame(true, start.rsv(), content);
}
// Should not reach here.
throw new Error();
}
}

View File

@ -35,7 +35,7 @@ public class HttpInvalidMessageTest {
EmbeddedChannel ch = new EmbeddedChannel(new HttpRequestDecoder());
ch.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0 with extra\r\n", CharsetUtil.UTF_8));
HttpRequest req = ch.readInbound();
DecoderResult dr = req.getDecoderResult();
DecoderResult dr = req.decoderResult();
assertFalse(dr.isSuccess());
assertTrue(dr.isFailure());
ensureInboundTrafficDiscarded(ch);
@ -49,7 +49,7 @@ public class HttpInvalidMessageTest {
ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8));
HttpRequest req = ch.readInbound();
DecoderResult dr = req.getDecoderResult();
DecoderResult dr = req.decoderResult();
assertFalse(dr.isSuccess());
assertTrue(dr.isFailure());
assertEquals("Good Value", req.headers().get("Good_Name"));
@ -62,7 +62,7 @@ public class HttpInvalidMessageTest {
EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder());
ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 BAD_CODE Bad Server\r\n", CharsetUtil.UTF_8));
HttpResponse res = ch.readInbound();
DecoderResult dr = res.getDecoderResult();
DecoderResult dr = res.decoderResult();
assertFalse(dr.isSuccess());
assertTrue(dr.isFailure());
ensureInboundTrafficDiscarded(ch);
@ -76,7 +76,7 @@ public class HttpInvalidMessageTest {
ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8));
ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8));
HttpResponse res = ch.readInbound();
DecoderResult dr = res.getDecoderResult();
DecoderResult dr = res.decoderResult();
assertFalse(dr.isSuccess());
assertTrue(dr.isFailure());
assertEquals("Maybe OK", res.getStatus().reasonPhrase());
@ -92,10 +92,10 @@ public class HttpInvalidMessageTest {
ch.writeInbound(Unpooled.copiedBuffer("BAD_LENGTH\r\n", CharsetUtil.UTF_8));
HttpRequest req = ch.readInbound();
assertTrue(req.getDecoderResult().isSuccess());
assertTrue(req.decoderResult().isSuccess());
LastHttpContent chunk = ch.readInbound();
DecoderResult dr = chunk.getDecoderResult();
DecoderResult dr = chunk.decoderResult();
assertFalse(dr.isSuccess());
assertTrue(dr.isFailure());
ensureInboundTrafficDiscarded(ch);

View File

@ -524,8 +524,8 @@ public class HttpResponseDecoderTest {
HttpResponse res = ch.readInbound();
assertThat(res.getProtocolVersion(), sameInstance(HttpVersion.HTTP_1_0));
assertThat(res.getStatus().code(), is(999));
assertThat(res.getDecoderResult().isFailure(), is(true));
assertThat(res.getDecoderResult().isFinished(), is(true));
assertThat(res.decoderResult().isFailure(), is(true));
assertThat(res.decoderResult().isFinished(), is(true));
assertThat(ch.readInbound(), is(nullValue()));
// More garbage should not generate anything (i.e. the decoder discards anything beyond this point.)
@ -554,7 +554,7 @@ public class HttpResponseDecoderTest {
// Ensure that the decoder generates the last chunk with correct decoder result.
LastHttpContent invalidChunk = channel.readInbound();
assertThat(invalidChunk.getDecoderResult().isFailure(), is(true));
assertThat(invalidChunk.decoderResult().isFailure(), is(true));
invalidChunk.release();
// And no more messages should be produced by the decoder.

View File

@ -16,9 +16,8 @@
package io.netty.handler.codec.memcache;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageAggregator;
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequestDecoder;
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseEncoder;
@ -42,91 +41,40 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseEncoder;
* p.addLast("handler", new YourMemcacheRequestHandler());
* </pre>
*/
public abstract class AbstractMemcacheObjectAggregator extends MessageToMessageDecoder<MemcacheObject> {
/**
* Contains the current message that gets aggregated.
*/
protected FullMemcacheMessage currentMessage;
/**
* Holds the current channel handler context if set.
*/
protected ChannelHandlerContext ctx;
public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
private final int maxContentLength;
public abstract class AbstractMemcacheObjectAggregator<H extends MemcacheMessage> extends
MessageAggregator<MemcacheObject, H, MemcacheContent, FullMemcacheMessage> {
protected AbstractMemcacheObjectAggregator(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
}
this.maxContentLength = maxContentLength;
}
/**
* Returns the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
*/
public final int getMaxCumulationBufferComponents() {
return maxCumulationBufferComponents;
}
/**
* Sets the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
* and its minimum allowed value is {@code 2}.
*/
public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
if (maxCumulationBufferComponents < 2) {
throw new IllegalArgumentException(
"maxCumulationBufferComponents: " + maxCumulationBufferComponents +
" (expected: >= 2)");
}
if (ctx == null) {
this.maxCumulationBufferComponents = maxCumulationBufferComponents;
} else {
throw new IllegalStateException(
"decoder properties cannot be changed once the decoder is added to a pipeline.");
}
}
public int getMaxContentLength() {
return maxContentLength;
super(maxContentLength);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
protected boolean isContentMessage(MemcacheObject msg) throws Exception {
return msg instanceof MemcacheContent;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
protected boolean isLastContentMessage(MemcacheContent msg) throws Exception {
return msg instanceof LastMemcacheContent;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
protected boolean isAggregated(MemcacheObject msg) throws Exception {
return msg instanceof FullMemcacheMessage;
}
@Override
protected boolean hasContentLength(H start) throws Exception {
return false;
}
@Override
protected long contentLength(H start) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object newContinueResponse(H start) throws Exception {
return null;
}
}

View File

@ -16,20 +16,10 @@
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.memcache.AbstractMemcacheObjectAggregator;
import io.netty.handler.codec.memcache.FullMemcacheMessage;
import io.netty.handler.codec.memcache.LastMemcacheContent;
import io.netty.handler.codec.memcache.MemcacheContent;
import io.netty.handler.codec.memcache.MemcacheMessage;
import io.netty.handler.codec.memcache.MemcacheObject;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
/**
* An object aggregator for the memcache binary protocol.
@ -37,111 +27,34 @@ import java.util.List;
* It aggregates {@link BinaryMemcacheMessage}s and {@link MemcacheContent} into {@link FullBinaryMemcacheRequest}s
* or {@link FullBinaryMemcacheResponse}s.
*/
public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator {
private boolean tooLongFrameFound;
public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator<BinaryMemcacheMessage> {
public BinaryMemcacheObjectAggregator(int maxContentLength) {
super(maxContentLength);
}
@Override
protected void decode(ChannelHandlerContext ctx, MemcacheObject msg, List<Object> out) throws Exception {
FullMemcacheMessage currentMessage = this.currentMessage;
if (msg instanceof MemcacheMessage) {
tooLongFrameFound = false;
MemcacheMessage m = (MemcacheMessage) msg;
if (!m.getDecoderResult().isSuccess()) {
out.add(toFullMessage(m));
this.currentMessage = null;
return;
}
if (msg instanceof BinaryMemcacheRequest) {
this.currentMessage = toFullRequest((BinaryMemcacheRequest) msg,
Unpooled.compositeBuffer(getMaxCumulationBufferComponents()));
} else if (msg instanceof BinaryMemcacheResponse) {
this.currentMessage = toFullResponse((BinaryMemcacheResponse) msg,
Unpooled.compositeBuffer(getMaxCumulationBufferComponents()));
} else {
throw new Error();
}
} else if (msg instanceof MemcacheContent) {
if (tooLongFrameFound) {
if (msg instanceof LastMemcacheContent) {
this.currentMessage = null;
}
return;
}
MemcacheContent chunk = (MemcacheContent) msg;
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
if (content.readableBytes() > getMaxContentLength() - chunk.content().readableBytes()) {
tooLongFrameFound = true;
currentMessage.release();
this.currentMessage = null;
throw new TooLongFrameException("Memcache content length exceeded " + getMaxContentLength()
+ " bytes.");
}
if (chunk.content().isReadable()) {
chunk.retain();
content.addComponent(chunk.content());
content.writerIndex(content.writerIndex() + chunk.content().readableBytes());
}
final boolean last;
if (!chunk.getDecoderResult().isSuccess()) {
currentMessage.setDecoderResult(
DecoderResult.failure(chunk.getDecoderResult().cause()));
last = true;
} else {
last = chunk instanceof LastMemcacheContent;
}
if (last) {
this.currentMessage = null;
out.add(currentMessage);
}
} else {
throw new Error();
}
protected boolean isStartMessage(MemcacheObject msg) throws Exception {
return msg instanceof BinaryMemcacheMessage;
}
/**
* Convert a invalid message into a full message.
*
* This method makes sure that upstream handlers always get a full message returned, even
* when invalid chunks are failing.
*
* @param msg the message to transform.
* @return a full message containing parts of the original message.
*/
private static FullMemcacheMessage toFullMessage(final MemcacheMessage msg) {
if (msg instanceof FullMemcacheMessage) {
return ((FullMemcacheMessage) msg).retain();
@Override
protected FullMemcacheMessage beginAggregation(BinaryMemcacheMessage start, ByteBuf content) throws Exception {
if (start instanceof BinaryMemcacheRequest) {
return toFullRequest((BinaryMemcacheRequest) start, content);
}
FullMemcacheMessage fullMsg;
if (msg instanceof BinaryMemcacheRequest) {
fullMsg = toFullRequest((BinaryMemcacheRequest) msg, Unpooled.EMPTY_BUFFER);
} else if (msg instanceof BinaryMemcacheResponse) {
fullMsg = toFullResponse((BinaryMemcacheResponse) msg, Unpooled.EMPTY_BUFFER);
} else {
throw new IllegalStateException();
if (start instanceof BinaryMemcacheResponse) {
return toFullResponse((BinaryMemcacheResponse) start, content);
}
return fullMsg;
// Should not reach here.
throw new Error();
}
private static FullBinaryMemcacheRequest toFullRequest(BinaryMemcacheRequest request, ByteBuf content) {
FullBinaryMemcacheRequest fullRequest = new DefaultFullBinaryMemcacheRequest(request.getKey(),
request.getExtras(), content);
FullBinaryMemcacheRequest fullRequest =
new DefaultFullBinaryMemcacheRequest(request.getKey(), request.getExtras(), content);
fullRequest.setMagic(request.getMagic());
fullRequest.setOpcode(request.getOpcode());
@ -157,8 +70,8 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg
}
private static FullBinaryMemcacheResponse toFullResponse(BinaryMemcacheResponse response, ByteBuf content) {
FullBinaryMemcacheResponse fullResponse = new DefaultFullBinaryMemcacheResponse(response.getKey(),
response.getExtras(), content);
FullBinaryMemcacheResponse fullResponse =
new DefaultFullBinaryMemcacheResponse(response.getKey(), response.getExtras(), content);
fullResponse.setMagic(response.getMagic());
fullResponse.setOpcode(response.getOpcode());
@ -172,5 +85,4 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg
return fullResponse;
}
}

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>5.0.0.Alpha2-SNAPSHOT</version>
<version>4.1.0.Alpha1-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-stomp</artifactId>

View File

@ -23,7 +23,7 @@ import io.netty.handler.codec.DecoderResult;
* The default {@link StompContentSubframe} implementation.
*/
public class DefaultStompContentSubframe implements StompContentSubframe {
private DecoderResult decoderResult;
private DecoderResult decoderResult = DecoderResult.SUCCESS;
private final ByteBuf content;
public DefaultStompContentSubframe(ByteBuf content) {

View File

@ -23,7 +23,7 @@ import io.netty.handler.codec.DecoderResult;
public class DefaultStompHeadersSubframe implements StompHeadersSubframe {
protected final StompCommand command;
protected DecoderResult decoderResult;
protected DecoderResult decoderResult = DecoderResult.SUCCESS;
protected final StompHeaders headers = new StompHeaders();
public DefaultStompHeadersSubframe(StompCommand command) {

View File

@ -15,20 +15,9 @@
*/
package io.netty.handler.codec.stomp;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.DecoderResultProvider;
/**
* Defines a common interface for all {@link StompSubframe} implementations.
*/
public interface StompSubframe {
/**
* Returns the result of decoding this object.
*/
DecoderResult decoderResult();
/**
* Updates the result of decoding this object. This method is supposed to be invoked by
* {@link StompSubframeDecoder}. Do not call this method unless you know what you are doing.
*/
void setDecoderResult(DecoderResult result);
}
public interface StompSubframe extends DecoderResultProvider { }

View File

@ -15,32 +15,20 @@
*/
package io.netty.handler.codec.stomp;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageAggregator;
import io.netty.handler.codec.TooLongFrameException;
import java.util.List;
/**
* A {@link ChannelHandler} that aggregates an {@link StompHeadersSubframe}
* and its following {@link StompContentSubframe}s into a single {@link StompFrame}.
* It is useful when you don't want to take care of STOMP frames whose content is 'chunked'. Insert this
* handler after {@link StompSubframeDecoder} in the {@link ChannelPipeline}:
*/
public class StompSubframeAggregator extends MessageToMessageDecoder<StompSubframe> {
private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
private final int maxContentLength;
private StompFrame currentFrame;
private boolean tooLongFrameFound;
private volatile boolean handlerAdded;
public class StompSubframeAggregator
extends MessageAggregator<StompSubframe, StompHeadersSubframe, StompContentSubframe, StompFrame> {
/**
* Creates a new instance.
@ -51,97 +39,48 @@ public class StompSubframeAggregator extends MessageToMessageDecoder<StompSubfra
* a {@link TooLongFrameException} will be raised.
*/
public StompSubframeAggregator(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " +
maxContentLength);
}
this.maxContentLength = maxContentLength;
}
/**
* Sets the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
* and its minimum allowed value is {@code 2}.
*/
public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
if (maxCumulationBufferComponents < 2) {
throw new IllegalArgumentException(
"maxCumulationBufferComponents: " + maxCumulationBufferComponents +
" (expected: >= 2)");
}
if (!handlerAdded) {
this.maxCumulationBufferComponents = maxCumulationBufferComponents;
} else {
throw new IllegalStateException(
"decoder properties cannot be changed once the decoder is added to a pipeline.");
}
super(maxContentLength);
}
@Override
protected void decode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
StompFrame currentFrame = this.currentFrame;
if (msg instanceof StompHeadersSubframe) {
assert currentFrame == null;
StompHeadersSubframe frame = (StompHeadersSubframe) msg;
this.currentFrame = currentFrame = new DefaultStompFrame(frame.command(),
Unpooled.compositeBuffer(maxCumulationBufferComponents));
currentFrame.headers().set(frame.headers());
} else if (msg instanceof StompContentSubframe) {
if (tooLongFrameFound) {
if (msg instanceof LastStompContentSubframe) {
this.currentFrame = null;
}
return;
}
assert currentFrame != null;
StompContentSubframe chunk = (StompContentSubframe) msg;
CompositeByteBuf contentBuf = (CompositeByteBuf) currentFrame.content();
if (contentBuf.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
tooLongFrameFound = true;
currentFrame.release();
this.currentFrame = null;
throw new TooLongFrameException(
"STOMP content length exceeded " + maxContentLength +
" bytes.");
}
contentBuf.addComponent(chunk.retain().content());
contentBuf.writerIndex(contentBuf.writerIndex() + chunk.content().readableBytes());
if (chunk instanceof LastStompContentSubframe) {
out.add(currentFrame);
this.currentFrame = null;
}
} else {
throw new IllegalArgumentException("received unsupported object type " + msg);
}
protected boolean isStartMessage(StompSubframe msg) throws Exception {
return msg instanceof StompHeadersSubframe;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
handlerAdded = true;
protected boolean isContentMessage(StompSubframe msg) throws Exception {
return msg instanceof StompContentSubframe;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (currentFrame != null) {
currentFrame.release();
currentFrame = null;
}
protected boolean isLastContentMessage(StompContentSubframe msg) throws Exception {
return msg instanceof LastStompContentSubframe;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
handlerAdded = false;
if (currentFrame != null) {
currentFrame.release();
currentFrame = null;
}
protected boolean isAggregated(StompSubframe msg) throws Exception {
return msg instanceof StompFrame;
}
@Override
protected boolean hasContentLength(StompHeadersSubframe start) throws Exception {
return start.headers().has(StompHeaders.CONTENT_LENGTH);
}
@Override
protected long contentLength(StompHeadersSubframe start) throws Exception {
return Long.parseLong(start.headers().get(StompHeaders.CONTENT_LENGTH));
}
@Override
protected Object newContinueResponse(StompHeadersSubframe start) throws Exception {
return null;
}
@Override
protected StompFrame beginAggregation(StompHeadersSubframe start, ByteBuf content) throws Exception {
StompFrame ret = new DefaultStompFrame(start.command(), content);
ret.headers().set(start.headers());
return ret;
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
/**
* Provides the accessor methods for the {@link DecoderResult} property of a decoded message.
*/
public interface DecoderResultProvider {
/**
* Returns the result of decoding this object.
*/
DecoderResult decoderResult();
/**
* Updates the result of decoding this object. This method is supposed to be invoked by a decoder.
* Do not call this method unless you know what you are doing.
*/
void setDecoderResult(DecoderResult result);
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
/**
* Raised by {@link MessageAggregator} when aggregation fails due to an unexpected message sequence.
*/
public class MessageAggregationException extends IllegalStateException {
private static final long serialVersionUID = -1995826182950310255L;
public MessageAggregationException() { }
public MessageAggregationException(String s) {
super(s);
}
public MessageAggregationException(String message, Throwable cause) {
super(message, cause);
}
public MessageAggregationException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,408 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
/**
* An abstract {@link ChannelHandler} that aggregates a series of message objects into a single aggregated message.
* <p>
* 'A series of messages' is composed of the following:
* <ul>
* <li>a single start message which optionally contains the first part of the content, and</li>
* <li>1 or more content messages.</li>
* </ul>
* The content of the aggregated message will be the merged content of the start message and its following content
* messages. If this aggregator encounters a content message where {@link #isLastContentMessage(ByteBufHolder)}
* return {@code true} for, the aggregator will finish the aggregation and produce the aggregated message and expect
* another start message.
* </p>
*
* @param <I> the type that covers both start message and content message
* @param <S> the type of the start message
* @param <C> the type of the content message (must be a subtype of {@link ByteBufHolder})
* @param <O> the type of the aggregated message (must be a subtype of {@code S} and {@link ByteBufHolder})
*/
public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends ByteBufHolder>
extends MessageToMessageDecoder<I> {
private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
private final int maxContentLength;
private O currentMessage;
private boolean handlingOversizedMessage;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
private ChannelHandlerContext ctx;
private ChannelFutureListener continueResponseWriteListener;
/**
* Creates a new instance.
*
* @param maxContentLength
* the maximum length of the aggregated content.
* If the length of the aggregated content exceeds this value,
* {@link #handleOversizedMessage(ChannelHandlerContext, Object)} will be called.
*/
protected MessageAggregator(int maxContentLength) {
validateMaxContentLength(maxContentLength);
this.maxContentLength = maxContentLength;
}
protected MessageAggregator(int maxContentLength, Class<? extends I> inboundMessageType) {
super(inboundMessageType);
validateMaxContentLength(maxContentLength);
this.maxContentLength = maxContentLength;
}
private static void validateMaxContentLength(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
}
}
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
// No need to match last and full types because they are subset of first and middle types.
if (!super.acceptInboundMessage(msg)) {
return false;
}
@SuppressWarnings("unchecked")
I in = (I) msg;
return (isContentMessage(in) || isStartMessage(in)) && !isAggregated(in);
}
/**
* Returns {@code true} if and only if the specified message is a start message. Typically, this method is
* implemented as a single {@code return} statement with {@code instanceof}:
* <pre>
* return msg instanceof MyStartMessage;
* </pre>
*/
protected abstract boolean isStartMessage(I msg) throws Exception;
/**
* Returns {@code true} if and only if the specified message is a content message. Typically, this method is
* implemented as a single {@code return} statement with {@code instanceof}:
* <pre>
* return msg instanceof MyContentMessage;
* </pre>
*/
protected abstract boolean isContentMessage(I msg) throws Exception;
/**
* Returns {@code true} if and only if the specified message is the last content message. Typically, this method is
* implemented as a single {@code return} statement with {@code instanceof}:
* <pre>
* return msg instanceof MyLastContentMessage;
* </pre>
* or with {@code instanceof} and boolean field check:
* <pre>
* return msg instanceof MyContentMessage && msg.isLastFragment();
* </pre>
*/
protected abstract boolean isLastContentMessage(C msg) throws Exception;
/**
* Returns {@code true} if and only if the specified message is already aggregated. If this method returns
* {@code true}, this handler will simply forward the message to the next handler as-is.
*/
protected abstract boolean isAggregated(I msg) throws Exception;
/**
* Returns the maximum allowed length of the aggregated message.
*/
public final int maxContentLength() {
return maxContentLength;
}
/**
* Returns the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
*/
public final int maxCumulationBufferComponents() {
return maxCumulationBufferComponents;
}
/**
* Sets the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
* and its minimum allowed value is {@code 2}.
*/
public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
if (maxCumulationBufferComponents < 2) {
throw new IllegalArgumentException(
"maxCumulationBufferComponents: " + maxCumulationBufferComponents +
" (expected: >= 2)");
}
if (ctx == null) {
this.maxCumulationBufferComponents = maxCumulationBufferComponents;
} else {
throw new IllegalStateException(
"decoder properties cannot be changed once the decoder is added to a pipeline.");
}
}
public final boolean isHandlingOversizedMessage() {
return handlingOversizedMessage;
}
protected final ChannelHandlerContext ctx() {
if (ctx == null) {
throw new IllegalStateException("not added to a pipeline yet");
}
return ctx;
}
@Override
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
O currentMessage = this.currentMessage;
if (isStartMessage(msg)) {
handlingOversizedMessage = false;
if (currentMessage != null) {
throw new MessageAggregationException();
}
@SuppressWarnings("unchecked")
S m = (S) msg;
// if content length is set, preemptively close if it's too large
if (hasContentLength(m)) {
if (contentLength(m) > maxContentLength) {
// handle oversized message
invokeHandleOversizedMessage(ctx, m);
return;
}
}
// Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
Object continueResponse = newContinueResponse(m);
if (continueResponse != null) {
// Cache the write listener for reuse.
ChannelFutureListener listener = continueResponseWriteListener;
if (listener == null) {
continueResponseWriteListener = listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.fireExceptionCaught(future.cause());
}
}
};
}
ctx.writeAndFlush(continueResponse).addListener(listener);
}
if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
O aggregated;
if (m instanceof ByteBufHolder && ((ByteBufHolder) m).content().isReadable()) {
aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
} else {
aggregated = beginAggregation(m, Unpooled.EMPTY_BUFFER);
}
finishAggregation(aggregated);
out.add(aggregated);
this.currentMessage = null;
return;
}
// A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
CompositeByteBuf content = Unpooled.compositeBuffer(maxCumulationBufferComponents);
if (m instanceof ByteBufHolder) {
appendPartialContent(content, ((ByteBufHolder) m).content());
}
this.currentMessage = beginAggregation(m, content);
} else if (isContentMessage(msg)) {
@SuppressWarnings("unchecked")
final C m = (C) msg;
final ByteBuf partialContent = ((ByteBufHolder) msg).content();
final boolean isLastContentMessage = isLastContentMessage(m);
if (handlingOversizedMessage) {
if (isLastContentMessage) {
this.currentMessage = null;
}
// already detect the too long frame so just discard the content
return;
}
if (currentMessage == null) {
throw new MessageAggregationException();
}
// Merge the received chunk into the content of the current message.
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
// Handle oversized message.
if (content.readableBytes() > maxContentLength - partialContent.readableBytes()) {
// By convention, full message type extends first message type.
@SuppressWarnings("unchecked")
S s = (S) currentMessage;
invokeHandleOversizedMessage(ctx, s);
return;
}
// Append the content of the chunk.
appendPartialContent(content, partialContent);
// Give the subtypes a chance to merge additional information such as trailing headers.
aggregate(currentMessage, m);
final boolean last;
if (m instanceof DecoderResultProvider) {
DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
if (!decoderResult.isSuccess()) {
if (currentMessage instanceof DecoderResultProvider) {
((DecoderResultProvider) currentMessage).setDecoderResult(
DecoderResult.failure(decoderResult.cause()));
}
last = true;
} else {
last = isLastContentMessage;
}
} else {
last = isLastContentMessage;
}
if (last) {
finishAggregation(currentMessage);
// All done
out.add(currentMessage);
this.currentMessage = null;
}
} else {
throw new MessageAggregationException();
}
}
private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
if (partialContent.isReadable()) {
partialContent.retain();
content.addComponent(partialContent);
content.writerIndex(content.writerIndex() + partialContent.readableBytes());
}
}
/**
* Returns {@code true} if and only if the specified start message already contains the information about the
* length of the whole content.
*/
protected abstract boolean hasContentLength(S start) throws Exception;
/**
* Retrieves the length of the whole content from the specified start message. This method is invoked only when
* {@link #hasContentLength(Object)} returned {@code true}.
*/
protected abstract long contentLength(S start) throws Exception;
/**
* Returns the 'continue response' for the specified start message if necessary. For example, this method is
* useful to handle an HTTP 100-continue header.
*
* @return the 'continue response', or {@code null} if there's no message to send
*/
protected abstract Object newContinueResponse(S start) throws Exception;
/**
* Creates a new aggregated message from the specified start message and the specified content. If the start
* message implements {@link ByteBufHolder}, its content is appended to the specified {@code content}.
* This aggregator will continue to append the received content to the specified {@code content}.
*/
protected abstract O beginAggregation(S start, ByteBuf content) throws Exception;
/**
* Transfers the information provided by the specified content message to the specified aggregated message.
* Note that the content of the specified content message has been appended to the content of the specified
* aggregated message already, so that you don't need to. Use this method to transfer the additional information
* that the content message provides to {@code aggregated}.
*/
protected void aggregate(O aggregated, C content) throws Exception { }
/**
* Invoked when the specified {@code aggregated} message is about to be passed to the next handler in the pipeline.
*/
protected void finishAggregation(O aggregated) throws Exception { }
private void invokeHandleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception {
handlingOversizedMessage = true;
currentMessage = null;
try {
handleOversizedMessage(ctx, oversized);
} finally {
// Release the message in case it is a full one.
ReferenceCountUtil.release(oversized);
}
}
/**
* Invoked when an incoming request exceeds the maximum content length. The default behvaior is to trigger an
* {@code exceptionCaught()} event with a {@link TooLongFrameException}.
*
* @param ctx the {@link ChannelHandlerContext}
* @param oversized the accumulated message up to this point, whose type is {@code S} or {@code O}
*/
protected void handleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception {
ctx.fireExceptionCaught(
new TooLongFrameException("content length exceeded " + maxContentLength() + " bytes."));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// release current message if it is not null as it may be a left-over
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
super.channelInactive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
// release current message if it is not null as it may be a left-over as there is not much more we can do in
// this case
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
}
}
}

View File

@ -111,7 +111,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (!request.getDecoderResult().isSuccess()) {
if (!request.decoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}

View File

@ -133,7 +133,7 @@ public class HttpSnoopServerHandler extends SimpleChannelInboundHandler<Object>
}
private static void appendDecoderResult(StringBuilder buf, HttpObject o) {
DecoderResult result = o.getDecoderResult();
DecoderResult result = o.decoderResult();
if (result.isSuccess()) {
return;
}
@ -148,7 +148,7 @@ public class HttpSnoopServerHandler extends SimpleChannelInboundHandler<Object>
boolean keepAlive = isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, currentObj.getDecoderResult().isSuccess()? OK : BAD_REQUEST,
HTTP_1_1, currentObj.decoderResult().isSuccess()? OK : BAD_REQUEST,
Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");

View File

@ -64,7 +64,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// Handle a bad request.
if (!req.getDecoderResult().isSuccess()) {
if (!req.decoderResult().isSuccess()) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
}

View File

@ -48,7 +48,7 @@ public class StompClientHandler extends SimpleChannelInboundHandler<StompFrame>
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, StompFrame frame) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, StompFrame frame) throws Exception {
String subscrReceiptId = "001";
String disconReceiptId = "002";
switch (frame.command()) {

View File

@ -71,7 +71,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
throws Exception {
// Handle a bad request.
if (!req.getDecoderResult().isSuccess()) {
if (!req.decoderResult().isSuccess()) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
req.release();
return;
@ -106,11 +106,9 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
} else if (frame instanceof TextWebSocketFrame) {
ctx.write(frame);
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.write(frame);
} else if (frame instanceof ContinuationWebSocketFrame) {
} else if (frame instanceof TextWebSocketFrame ||
frame instanceof BinaryWebSocketFrame ||
frame instanceof ContinuationWebSocketFrame) {
ctx.write(frame);
} else if (frame instanceof PongWebSocketFrame) {
frame.release();