From 2e98566916fae22c9dd78eb8ce5f04d87076d691 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 4 Jun 2014 18:34:57 +0900 Subject: [PATCH] Introduce MessageAggregator and DecoderResultProvider Motivation: We have different message aggregator implementations for different protocols, but they are very similar with each other. They all stems from HttpObjectAggregator. If we provide an abstract class that provide generic message aggregation functionality, we will remove their code duplication. Modifications: - Add MessageAggregator which provides generic message aggregation - Reimplement all existing aggregators using MessageAggregator - Add DecoderResultProvider interface and extend it wherever possible so that MessageAggregator respects the state of the decoded message Result: Less code duplication --- .../codec/http/ComposedLastHttpContent.java | 2 +- .../codec/http/DefaultHttpContent.java | 2 +- .../handler/codec/http/DefaultHttpObject.java | 2 +- .../codec/http/DefaultHttpRequest.java | 2 +- .../codec/http/DefaultHttpResponse.java | 4 +- .../netty/handler/codec/http/HttpHeaders.java | 13 +- .../netty/handler/codec/http/HttpObject.java | 15 +- .../codec/http/HttpObjectAggregator.java | 320 ++++---------- .../codec/http/HttpServerUpgradeHandler.java | 41 +- .../handler/codec/http/LastHttpContent.java | 2 +- .../multipart/HttpPostRequestEncoder.java | 4 +- .../websocketx/WebSocketFrameAggregator.java | 134 +++--- .../codec/http/HttpInvalidMessageTest.java | 12 +- .../codec/http/HttpResponseDecoderTest.java | 6 +- .../AbstractMemcacheObjectAggregator.java | 100 ++--- .../BinaryMemcacheObjectAggregator.java | 118 +---- .../stomp/DefaultStompContentSubframe.java | 2 +- .../stomp/DefaultStompHeadersSubframe.java | 2 +- .../handler/codec/stomp/StompSubframe.java | 15 +- .../codec/stomp/StompSubframeAggregator.java | 131 ++---- .../handler/codec/DecoderResultProvider.java | 33 ++ .../codec/MessageAggregationException.java | 39 ++ .../handler/codec/MessageAggregator.java | 408 ++++++++++++++++++ .../file/HttpStaticFileServerHandler.java | 2 +- .../http/snoop/HttpSnoopServerHandler.java | 4 +- .../server/WebSocketServerHandler.java | 2 +- .../autobahn/AutobahnServerHandler.java | 10 +- 27 files changed, 745 insertions(+), 680 deletions(-) create mode 100644 codec/src/main/java/io/netty/handler/codec/DecoderResultProvider.java create mode 100644 codec/src/main/java/io/netty/handler/codec/MessageAggregationException.java create mode 100644 codec/src/main/java/io/netty/handler/codec/MessageAggregator.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/ComposedLastHttpContent.java b/codec-http/src/main/java/io/netty/handler/codec/http/ComposedLastHttpContent.java index f38687efd5..162805246a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/ComposedLastHttpContent.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/ComposedLastHttpContent.java @@ -70,7 +70,7 @@ final class ComposedLastHttpContent implements LastHttpContent { } @Override - public DecoderResult getDecoderResult() { + public DecoderResult decoderResult() { return result; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpContent.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpContent.java index b366cb43a8..18f6a6b794 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpContent.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpContent.java @@ -92,6 +92,6 @@ public class DefaultHttpContent extends DefaultHttpObject implements HttpContent @Override public String toString() { return StringUtil.simpleClassName(this) + - "(data: " + content() + ", decoderResult: " + getDecoderResult() + ')'; + "(data: " + content() + ", decoderResult: " + decoderResult() + ')'; } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java index 11f3c23c55..24eade9be7 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpObject.java @@ -26,7 +26,7 @@ public class DefaultHttpObject implements HttpObject { } @Override - public DecoderResult getDecoderResult() { + public DecoderResult decoderResult() { return decoderResult; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java index 1c4078246c..8ca421f596 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java @@ -95,7 +95,7 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques StringBuilder buf = new StringBuilder(); buf.append(StringUtil.simpleClassName(this)); buf.append("(decodeResult: "); - buf.append(getDecoderResult()); + buf.append(decoderResult()); buf.append(')'); buf.append(StringUtil.NEWLINE); buf.append(getMethod()); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java index 90f9a64dbc..fb5dd67250 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpResponse.java @@ -74,12 +74,12 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo StringBuilder buf = new StringBuilder(); buf.append(StringUtil.simpleClassName(this)); buf.append("(decodeResult: "); - buf.append(getDecoderResult()); + buf.append(decoderResult()); buf.append(')'); buf.append(StringUtil.NEWLINE); buf.append(getProtocolVersion().text()); buf.append(' '); - buf.append(getStatus().toString()); + buf.append(getStatus()); buf.append(StringUtil.NEWLINE); appendHeaders(buf); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java index b27f7e772a..f9b6a5fd9f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaders.java @@ -36,7 +36,7 @@ import static io.netty.handler.codec.http.HttpConstants.*; */ public abstract class HttpHeaders implements Iterable> { - private static final byte[] HEADER_SEPERATOR = { HttpConstants.COLON, HttpConstants.SP }; + private static final byte[] HEADER_SEPERATOR = { COLON, SP }; private static final byte[] CRLF = { CR, LF }; public static final HttpHeaders EMPTY_HEADERS = new HttpHeaders() { @@ -722,7 +722,7 @@ public abstract class HttpHeaders implements Iterable> try { return Integer.parseInt(value); - } catch (NumberFormatException e) { + } catch (NumberFormatException ignored) { return defaultValue; } } @@ -783,7 +783,7 @@ public abstract class HttpHeaders implements Iterable> try { return HttpHeaderDateFormat.get().parse(value); - } catch (ParseException e) { + } catch (ParseException ignored) { return defaultValue; } } @@ -865,7 +865,7 @@ public abstract class HttpHeaders implements Iterable> if (contentLength != null) { try { return Long.parseLong(contentLength); - } catch (NumberFormatException e) { + } catch (NumberFormatException ignored) { return defaultValue; } } @@ -1408,7 +1408,12 @@ public abstract class HttpHeaders implements Iterable> if (headers == null) { throw new NullPointerException("headers"); } + clear(); + if (headers.isEmpty()) { + return this; + } + for (Map.Entry e: headers) { add(e.getKey(), e.getValue()); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java index f28a785cb8..9168d4a6c8 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObject.java @@ -15,17 +15,6 @@ */ package io.netty.handler.codec.http; -import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.DecoderResultProvider; -public interface HttpObject { - /** - * Returns the result of decoding this message. - */ - DecoderResult getDecoderResult(); - - /** - * Updates the result of decoding this message. This method is supposed to be invoked by {@link HttpObjectDecoder}. - * Do not call this method unless you know what you are doing. - */ - void setDecoderResult(DecoderResult result); -} +public interface HttpObject extends DecoderResultProvider { } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java index f07e880e98..4343ab76f1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectAggregator.java @@ -15,24 +15,18 @@ */ package io.netty.handler.codec.http; -import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageAggregator; import io.netty.handler.codec.TooLongFrameException; -import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.List; - -import static io.netty.handler.codec.http.HttpHeaders.*; - /** * A {@link ChannelHandler} that aggregates an {@link HttpMessage} * and its following {@link HttpContent}s into a single {@link FullHttpRequest} @@ -52,9 +46,8 @@ import static io.netty.handler.codec.http.HttpHeaders.*; * Be aware that you need to have the {@link HttpResponseEncoder} or {@link HttpRequestEncoder} * before the {@link HttpObjectAggregator} in the {@link ChannelPipeline}. */ -public class HttpObjectAggregator extends MessageToMessageDecoder { - - private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; +public class HttpObjectAggregator + extends MessageAggregator { private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class); @@ -64,16 +57,9 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER); static { - TOO_LARGE.headers().set(Names.CONTENT_LENGTH, 0); + TOO_LARGE.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0); } - private final int maxContentLength; - private FullHttpMessage currentMessage; - private boolean handlingOversizedMessage; - - private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; - private ChannelHandlerContext ctx; - /** * Creates a new instance. * @@ -84,195 +70,90 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { * will be called. */ public HttpObjectAggregator(int maxContentLength) { - if (maxContentLength <= 0) { - throw new IllegalArgumentException( - "maxContentLength must be a positive integer: " + - maxContentLength); - } - this.maxContentLength = maxContentLength; + super(maxContentLength); } - /** - * Returns the maximum number of components in the cumulation buffer. If the number of - * the components in the cumulation buffer exceeds this value, the components of the - * cumulation buffer are consolidated into a single component, involving memory copies. - * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}. - */ - public final int getMaxCumulationBufferComponents() { - return maxCumulationBufferComponents; + @Override + protected boolean isStartMessage(HttpObject msg) throws Exception { + return msg instanceof HttpMessage; } - /** - * Sets the maximum number of components in the cumulation buffer. If the number of - * the components in the cumulation buffer exceeds this value, the components of the - * cumulation buffer are consolidated into a single component, involving memory copies. - * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS} - * and its minimum allowed value is {@code 2}. - */ - public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) { - if (maxCumulationBufferComponents < 2) { - throw new IllegalArgumentException( - "maxCumulationBufferComponents: " + maxCumulationBufferComponents + - " (expected: >= 2)"); - } + @Override + protected boolean isContentMessage(HttpObject msg) throws Exception { + return msg instanceof HttpContent; + } - if (ctx == null) { - this.maxCumulationBufferComponents = maxCumulationBufferComponents; + @Override + protected boolean isLastContentMessage(HttpContent msg) throws Exception { + return msg instanceof LastHttpContent; + } + + @Override + protected boolean isAggregated(HttpObject msg) throws Exception { + return msg instanceof FullHttpMessage; + } + + @Override + protected boolean hasContentLength(HttpMessage start) throws Exception { + return HttpHeaders.isContentLengthSet(start); + } + + @Override + protected long contentLength(HttpMessage start) throws Exception { + return HttpHeaders.getContentLength(start); + } + + @Override + protected Object newContinueResponse(HttpMessage start) throws Exception { + if (HttpHeaders.is100ContinueExpected(start)) { + return CONTINUE; } else { - throw new IllegalStateException( - "decoder properties cannot be changed once the decoder is added to a pipeline."); + return null; } } @Override - protected void decode(final ChannelHandlerContext ctx, HttpObject msg, List out) throws Exception { - FullHttpMessage currentMessage = this.currentMessage; + protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception { + assert !(start instanceof FullHttpMessage); - if (msg instanceof HttpMessage) { - handlingOversizedMessage = false; - assert currentMessage == null; + HttpHeaders.removeTransferEncodingChunked(start); - HttpMessage m = (HttpMessage) msg; - - // if content length is set, preemptively close if it's too large - if (isContentLengthSet(m)) { - if (getContentLength(m) > maxContentLength) { - // handle oversized message - invokeHandleOversizedMessage(ctx, m); - return; - } - } - - // Handle the 'Expect: 100-continue' header if necessary. - if (is100ContinueExpected(m)) { - ctx.writeAndFlush(CONTINUE).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - ctx.fireExceptionCaught(future.cause()); - } - } - }); - } - - if (!m.getDecoderResult().isSuccess()) { - removeTransferEncodingChunked(m); - out.add(toFullMessage(m)); - this.currentMessage = null; - return; - } - if (msg instanceof HttpRequest) { - HttpRequest header = (HttpRequest) msg; - this.currentMessage = currentMessage = new DefaultFullHttpRequest(header.getProtocolVersion(), - header.getMethod(), header.getUri(), Unpooled.compositeBuffer(maxCumulationBufferComponents)); - } else if (msg instanceof HttpResponse) { - HttpResponse header = (HttpResponse) msg; - this.currentMessage = currentMessage = new DefaultFullHttpResponse( - header.getProtocolVersion(), header.getStatus(), - Unpooled.compositeBuffer(maxCumulationBufferComponents)); - } else { - throw new Error(); - } - - currentMessage.headers().set(m.headers()); - - // A streamed message - initialize the cumulative buffer, and wait for incoming chunks. - removeTransferEncodingChunked(currentMessage); - } else if (msg instanceof HttpContent) { - if (handlingOversizedMessage) { - if (msg instanceof LastHttpContent) { - this.currentMessage = null; - } - // already detect the too long frame so just discard the content - return; - } - assert currentMessage != null; - - // Merge the received chunk into the content of the current message. - HttpContent chunk = (HttpContent) msg; - CompositeByteBuf content = (CompositeByteBuf) currentMessage.content(); - - if (content.readableBytes() > maxContentLength - chunk.content().readableBytes()) { - // handle oversized message - invokeHandleOversizedMessage(ctx, currentMessage); - return; - } - - // Append the content of the chunk - if (chunk.content().isReadable()) { - chunk.retain(); - content.addComponent(chunk.content()); - content.writerIndex(content.writerIndex() + chunk.content().readableBytes()); - } - - final boolean last; - if (!chunk.getDecoderResult().isSuccess()) { - currentMessage.setDecoderResult( - DecoderResult.failure(chunk.getDecoderResult().cause())); - last = true; - } else { - last = chunk instanceof LastHttpContent; - } - - if (last) { - // Merge trailing headers into the message. - if (chunk instanceof LastHttpContent) { - LastHttpContent trailer = (LastHttpContent) chunk; - currentMessage.headers().add(trailer.trailingHeaders()); - } - - // Set the 'Content-Length' header. - currentMessage.headers().set( - HttpHeaders.Names.CONTENT_LENGTH, - String.valueOf(content.readableBytes())); - - // All done - out.add(currentMessage); - this.currentMessage = null; - } + FullHttpMessage ret; + if (start instanceof HttpRequest) { + HttpRequest req = (HttpRequest) start; + ret = new DefaultFullHttpRequest(req.getProtocolVersion(), + req.getMethod(), req.getUri(), content); + } else if (start instanceof HttpResponse) { + HttpResponse res = (HttpResponse) start; + ret = new DefaultFullHttpResponse( + res.getProtocolVersion(), res.getStatus(), content); } else { throw new Error(); } + + ret.headers().set(start.headers()); + return ret; } - private void invokeHandleOversizedMessage(ChannelHandlerContext ctx, HttpMessage msg) throws Exception { - handlingOversizedMessage = true; - currentMessage = null; - try { - handleOversizedMessage(ctx, msg); - } finally { - // Release the message in case it is a full one. - ReferenceCountUtil.release(msg); - - if (msg instanceof HttpRequest) { - // If an oversized request was handled properly and the connection is still alive - // (i.e. rejected 100-continue). the decoder should prepare to handle a new message. - HttpObjectDecoder decoder = ctx.pipeline().get(HttpObjectDecoder.class); - if (decoder != null) { - decoder.reset(); - } - } + @Override + protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception { + if (content instanceof LastHttpContent) { + // Merge trailing headers into the message. + aggregated.headers().add(((LastHttpContent) content).trailingHeaders()); } } - /** - * Invoked when an incoming request exceeds the maximum content length. - * - * The default behavior is: - *
    - *
  • Oversized request: Send a {@link HttpResponseStatus#REQUEST_ENTITY_TOO_LARGE} and close the connection - * if keep-alive is not enabled.
  • - *
  • Oversized response: Close the connection and raise {@link TooLongFrameException}.
  • - *
- * Sub-classes may override this method to change the default behavior. The specified {@code msg} is released - * once this method returns. - * - * @param ctx the {@link ChannelHandlerContext} - * @param msg the accumulated HTTP message up to this point - */ - @SuppressWarnings("UnusedParameters") - protected void handleOversizedMessage(final ChannelHandlerContext ctx, HttpMessage msg) throws Exception { - if (msg instanceof HttpRequest) { + @Override + protected void finishAggregation(FullHttpMessage aggregated) throws Exception { + // Set the 'Content-Length' header. + aggregated.headers().set( + HttpHeaders.Names.CONTENT_LENGTH, + String.valueOf(aggregated.content().readableBytes())); + } + + @Override + protected void handleOversizedMessage(final ChannelHandlerContext ctx, HttpMessage oversized) throws Exception { + if (oversized instanceof HttpRequest) { // send back a 413 and close the connection ChannelFuture future = ctx.writeAndFlush(TOO_LARGE).addListener(new ChannelFutureListener() { @Override @@ -287,63 +168,22 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { // If the client started to send data already, close because it's impossible to recover. // If 'Expect: 100-continue' is missing, close becuase it's impossible to recover. // If keep-alive is off, no need to leave the connection open. - if (msg instanceof FullHttpMessage || !is100ContinueExpected(msg) || !isKeepAlive(msg)) { + if (oversized instanceof FullHttpMessage || + !HttpHeaders.is100ContinueExpected(oversized) || !HttpHeaders.isKeepAlive(oversized)) { future.addListener(ChannelFutureListener.CLOSE); } - } else if (msg instanceof HttpResponse) { + + // If an oversized request was handled properly and the connection is still alive + // (i.e. rejected 100-continue). the decoder should prepare to handle a new message. + HttpObjectDecoder decoder = ctx.pipeline().get(HttpObjectDecoder.class); + if (decoder != null) { + decoder.reset(); + } + } else if (oversized instanceof HttpResponse) { ctx.close(); - throw new TooLongFrameException("Response entity too large: " + msg); + throw new TooLongFrameException("Response entity too large: " + oversized); } else { throw new IllegalStateException(); } } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // release current message if it is not null as it may be a left-over - if (currentMessage != null) { - currentMessage.release(); - currentMessage = null; - } - - super.channelInactive(ctx); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - super.handlerRemoved(ctx); - - // release current message if it is not null as it may be a left-over as there is not much more we can do in - // this case - if (currentMessage != null) { - currentMessage.release(); - currentMessage = null; - } - } - - private static FullHttpMessage toFullMessage(HttpMessage msg) { - if (msg instanceof FullHttpMessage) { - return ((FullHttpMessage) msg).retain(); - } - - FullHttpMessage fullMsg; - if (msg instanceof HttpRequest) { - HttpRequest req = (HttpRequest) msg; - fullMsg = new DefaultFullHttpRequest( - req.getProtocolVersion(), req.getMethod(), req.getUri(), Unpooled.EMPTY_BUFFER, false); - } else if (msg instanceof HttpResponse) { - HttpResponse res = (HttpResponse) msg; - fullMsg = new DefaultFullHttpResponse( - res.getProtocolVersion(), res.getStatus(), Unpooled.EMPTY_BUFFER, false); - } else { - throw new IllegalStateException(); - } - - return fullMsg; - } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java index ef1b0e8f32..0e93a76f02 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java @@ -14,12 +14,6 @@ */ package io.netty.handler.codec.http; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE; -import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static java.lang.String.CASE_INSENSITIVE_ORDER; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -35,6 +29,11 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import static io.netty.handler.codec.http.HttpHeaders.Names.*; +import static io.netty.handler.codec.http.HttpResponseStatus.*; +import static io.netty.handler.codec.http.HttpVersion.*; +import static java.lang.String.*; + /** * A server-side handler that receives HTTP requests and optionally performs a protocol switch if * the requested protocol is supported. Once an upgrade is performed, this handler removes itself @@ -57,13 +56,14 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { */ public interface UpgradeCodec { /** - * Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header. + * Returns the name of the protocol supported by this codec, as indicated by the + * {@link HttpHeaders.Names#UPGRADE} header. */ String protocol(); /** * Gets all protocol-specific headers required by this protocol for a successful upgrade. - * Any supplied header will be required to appear in the {@link CONNECTION} header as well. + * Any supplied header will be required to appear in the {@link HttpHeaders.Names#CONNECTION} header as well. */ Collection requiredUpgradeHeaders(); @@ -81,13 +81,11 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { * @param upgradeRequest the request that triggered the upgrade to this protocol. The * upgraded protocol is responsible for sending the response. * @param upgradeResponse a 101 Switching Protocols response that is populated with the - * {@link CONNECTION} and {@link UPGRADE} headers. The protocol is required to - * send this before sending any other frames back to the client. The headers may - * be augmented as necessary by the protocol before sending. - * @return the future for the writing of the upgrade response. + * {@link HttpHeaders.Names#CONNECTION} and {@link HttpHeaders.Names#UPGRADE} headers. + * The protocol is required to send this before sending any other frames back to the client. + * The headers may be augmented as necessary by the protocol before sending. */ - void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, - FullHttpResponse upgradeResponse); + void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, FullHttpResponse upgradeResponse); } /** @@ -159,8 +157,7 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { @Override public String toString() { - return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest - + "]"; + return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + ']'; } } @@ -197,7 +194,7 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { protected void decode(ChannelHandlerContext ctx, HttpObject msg, List out) throws Exception { // Determine if we're already handling an upgrade request or just starting a new one. - handlingUpgrade = handlingUpgrade || isUpgradeRequest(msg); + handlingUpgrade |= isUpgradeRequest(msg); if (!handlingUpgrade) { // Not handling an upgrade request, just pass it to the next handler. ReferenceCountUtil.retain(msg); @@ -205,7 +202,7 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { return; } - FullHttpRequest fullRequest = null; + FullHttpRequest fullRequest; if (msg instanceof FullHttpRequest) { fullRequest = (FullHttpRequest) msg; ReferenceCountUtil.retain(msg); @@ -238,12 +235,12 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { /** * Determines whether or not the message is an HTTP upgrade request. */ - private boolean isUpgradeRequest(HttpObject msg) { - return (msg instanceof HttpRequest) && ((HttpRequest) msg).headers().get(UPGRADE) != null; + private static boolean isUpgradeRequest(HttpObject msg) { + return msg instanceof HttpRequest && ((HttpRequest) msg).headers().get(UPGRADE) != null; } /** - * Attempts to upgrade to the protocol(s) identified by the {@link UPGRADE} header (if provided + * Attempts to upgrade to the protocol(s) identified by the {@link HttpHeaders.Names#UPGRADE} header (if provided * in the request). * * @param ctx the context for this handler. @@ -336,7 +333,7 @@ public class HttpServerUpgradeHandler extends HttpObjectAggregator { /** * Creates the 101 Switching Protocols response message. */ - private FullHttpResponse createUpgradeResponse(UpgradeCodec upgradeCodec) { + private static FullHttpResponse createUpgradeResponse(UpgradeCodec upgradeCodec) { DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, SWITCHING_PROTOCOLS); res.headers().add(CONNECTION, UPGRADE); res.headers().add(UPGRADE, upgradeCodec.protocol()); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/LastHttpContent.java b/codec-http/src/main/java/io/netty/handler/codec/http/LastHttpContent.java index 2f68e10c16..33f0cd7fbe 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/LastHttpContent.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/LastHttpContent.java @@ -50,7 +50,7 @@ public interface LastHttpContent extends HttpContent { } @Override - public DecoderResult getDecoderResult() { + public DecoderResult decoderResult() { return DecoderResult.SUCCESS; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java index 30c94342bc..40c24755fb 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java @@ -1149,8 +1149,8 @@ public class HttpPostRequestEncoder implements ChunkedInput { } @Override - public DecoderResult getDecoderResult() { - return request.getDecoderResult(); + public DecoderResult decoderResult() { + return request.decoderResult(); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketFrameAggregator.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketFrameAggregator.java index c8cb395b61..5df7ea8557 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketFrameAggregator.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketFrameAggregator.java @@ -16,110 +16,78 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageAggregator; import io.netty.handler.codec.TooLongFrameException; -import java.util.List; - /** * Handler that aggregate fragmented WebSocketFrame's. * * Be aware if PING/PONG/CLOSE frames are send in the middle of a fragmented {@link WebSocketFrame} they will * just get forwarded to the next handler in the pipeline. */ -public class WebSocketFrameAggregator extends MessageToMessageDecoder { - private final int maxFrameSize; - private WebSocketFrame currentFrame; - private boolean tooLongFrameFound; +public class WebSocketFrameAggregator + extends MessageAggregator { /** - * Construct a new instance + * Creates a new instance * - * @param maxFrameSize If the size of the aggregated frame exceeds this value, - * a {@link TooLongFrameException} is thrown. + * @param maxContentLength If the size of the aggregated frame exceeds this value, + * a {@link TooLongFrameException} is thrown. */ - public WebSocketFrameAggregator(int maxFrameSize) { - if (maxFrameSize < 1) { - throw new IllegalArgumentException("maxFrameSize must be > 0"); - } - this.maxFrameSize = maxFrameSize; + public WebSocketFrameAggregator(int maxContentLength) { + super(maxContentLength); } @Override - protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List out) throws Exception { - if (currentFrame == null) { - tooLongFrameFound = false; - if (msg.isFinalFragment()) { - out.add(msg.retain()); - return; - } - ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.content().retain()); - buf.writerIndex(buf.writerIndex() + msg.content().readableBytes()); - - if (msg instanceof TextWebSocketFrame) { - currentFrame = new TextWebSocketFrame(true, msg.rsv(), buf); - } else if (msg instanceof BinaryWebSocketFrame) { - currentFrame = new BinaryWebSocketFrame(true, msg.rsv(), buf); - } else { - buf.release(); - throw new IllegalStateException( - "WebSocket frame was not of type TextWebSocketFrame or BinaryWebSocketFrame"); - } - return; - } - if (msg instanceof ContinuationWebSocketFrame) { - if (tooLongFrameFound) { - if (msg.isFinalFragment()) { - currentFrame = null; - } - return; - } - CompositeByteBuf content = (CompositeByteBuf) currentFrame.content(); - if (content.readableBytes() > maxFrameSize - msg.content().readableBytes()) { - // release the current frame - currentFrame.release(); - tooLongFrameFound = true; - throw new TooLongFrameException( - "WebSocketFrame length exceeded " + content + - " bytes."); - } - content.addComponent(msg.content().retain()); - content.writerIndex(content.writerIndex() + msg.content().readableBytes()); - - if (msg.isFinalFragment()) { - WebSocketFrame currentFrame = this.currentFrame; - this.currentFrame = null; - out.add(currentFrame); - return; - } else { - return; - } - } - // It is possible to receive CLOSE/PING/PONG frames during fragmented frames so just pass them to the next - // handler in the chain - out.add(msg.retain()); + protected boolean isStartMessage(WebSocketFrame msg) throws Exception { + return msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame; } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - // release current frame if it is not null as it may be a left-over - if (currentFrame != null) { - currentFrame.release(); - currentFrame = null; - } + protected boolean isContentMessage(WebSocketFrame msg) throws Exception { + return msg instanceof ContinuationWebSocketFrame; } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - super.handlerRemoved(ctx); - // release current frame if it is not null as it may be a left-over as there is not much more we can do in - // this case - if (currentFrame != null) { - currentFrame.release(); - currentFrame = null; + protected boolean isLastContentMessage(ContinuationWebSocketFrame msg) throws Exception { + return isContentMessage(msg) && msg.isFinalFragment(); + } + + @Override + protected boolean isAggregated(WebSocketFrame msg) throws Exception { + if (msg.isFinalFragment()) { + return !isContentMessage(msg); } + + return !isStartMessage(msg) && !isContentMessage(msg); + } + + @Override + protected boolean hasContentLength(WebSocketFrame start) throws Exception { + return false; + } + + @Override + protected long contentLength(WebSocketFrame start) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Object newContinueResponse(WebSocketFrame start) throws Exception { + return null; + } + + @Override + protected WebSocketFrame beginAggregation(WebSocketFrame start, ByteBuf content) throws Exception { + if (start instanceof TextWebSocketFrame) { + return new TextWebSocketFrame(true, start.rsv(), content); + } + + if (start instanceof BinaryWebSocketFrame) { + return new BinaryWebSocketFrame(true, start.rsv(), content); + } + + // Should not reach here. + throw new Error(); } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java index 54596cc673..3baff2cf91 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpInvalidMessageTest.java @@ -35,7 +35,7 @@ public class HttpInvalidMessageTest { EmbeddedChannel ch = new EmbeddedChannel(new HttpRequestDecoder()); ch.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0 with extra\r\n", CharsetUtil.UTF_8)); HttpRequest req = ch.readInbound(); - DecoderResult dr = req.getDecoderResult(); + DecoderResult dr = req.decoderResult(); assertFalse(dr.isSuccess()); assertTrue(dr.isFailure()); ensureInboundTrafficDiscarded(ch); @@ -49,7 +49,7 @@ public class HttpInvalidMessageTest { ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8)); ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8)); HttpRequest req = ch.readInbound(); - DecoderResult dr = req.getDecoderResult(); + DecoderResult dr = req.decoderResult(); assertFalse(dr.isSuccess()); assertTrue(dr.isFailure()); assertEquals("Good Value", req.headers().get("Good_Name")); @@ -62,7 +62,7 @@ public class HttpInvalidMessageTest { EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder()); ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 BAD_CODE Bad Server\r\n", CharsetUtil.UTF_8)); HttpResponse res = ch.readInbound(); - DecoderResult dr = res.getDecoderResult(); + DecoderResult dr = res.decoderResult(); assertFalse(dr.isSuccess()); assertTrue(dr.isFailure()); ensureInboundTrafficDiscarded(ch); @@ -76,7 +76,7 @@ public class HttpInvalidMessageTest { ch.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8)); ch.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8)); HttpResponse res = ch.readInbound(); - DecoderResult dr = res.getDecoderResult(); + DecoderResult dr = res.decoderResult(); assertFalse(dr.isSuccess()); assertTrue(dr.isFailure()); assertEquals("Maybe OK", res.getStatus().reasonPhrase()); @@ -92,10 +92,10 @@ public class HttpInvalidMessageTest { ch.writeInbound(Unpooled.copiedBuffer("BAD_LENGTH\r\n", CharsetUtil.UTF_8)); HttpRequest req = ch.readInbound(); - assertTrue(req.getDecoderResult().isSuccess()); + assertTrue(req.decoderResult().isSuccess()); LastHttpContent chunk = ch.readInbound(); - DecoderResult dr = chunk.getDecoderResult(); + DecoderResult dr = chunk.decoderResult(); assertFalse(dr.isSuccess()); assertTrue(dr.isFailure()); ensureInboundTrafficDiscarded(ch); diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpResponseDecoderTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpResponseDecoderTest.java index 2f5188ae9b..c6777082a5 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/HttpResponseDecoderTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpResponseDecoderTest.java @@ -524,8 +524,8 @@ public class HttpResponseDecoderTest { HttpResponse res = ch.readInbound(); assertThat(res.getProtocolVersion(), sameInstance(HttpVersion.HTTP_1_0)); assertThat(res.getStatus().code(), is(999)); - assertThat(res.getDecoderResult().isFailure(), is(true)); - assertThat(res.getDecoderResult().isFinished(), is(true)); + assertThat(res.decoderResult().isFailure(), is(true)); + assertThat(res.decoderResult().isFinished(), is(true)); assertThat(ch.readInbound(), is(nullValue())); // More garbage should not generate anything (i.e. the decoder discards anything beyond this point.) @@ -554,7 +554,7 @@ public class HttpResponseDecoderTest { // Ensure that the decoder generates the last chunk with correct decoder result. LastHttpContent invalidChunk = channel.readInbound(); - assertThat(invalidChunk.getDecoderResult().isFailure(), is(true)); + assertThat(invalidChunk.decoderResult().isFailure(), is(true)); invalidChunk.release(); // And no more messages should be produced by the decoder. diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectAggregator.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectAggregator.java index 15e5042cc7..89850599cf 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectAggregator.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectAggregator.java @@ -16,9 +16,8 @@ package io.netty.handler.codec.memcache; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageAggregator; import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequestDecoder; import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseEncoder; @@ -42,91 +41,40 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseEncoder; * p.addLast("handler", new YourMemcacheRequestHandler()); * */ -public abstract class AbstractMemcacheObjectAggregator extends MessageToMessageDecoder { - - /** - * Contains the current message that gets aggregated. - */ - protected FullMemcacheMessage currentMessage; - - /** - * Holds the current channel handler context if set. - */ - protected ChannelHandlerContext ctx; - - public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; - - private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; - - private final int maxContentLength; +public abstract class AbstractMemcacheObjectAggregator extends + MessageAggregator { protected AbstractMemcacheObjectAggregator(int maxContentLength) { - if (maxContentLength <= 0) { - throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength); - } - - this.maxContentLength = maxContentLength; - } - - /** - * Returns the maximum number of components in the cumulation buffer. If the number of - * the components in the cumulation buffer exceeds this value, the components of the - * cumulation buffer are consolidated into a single component, involving memory copies. - * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}. - */ - public final int getMaxCumulationBufferComponents() { - return maxCumulationBufferComponents; - } - - /** - * Sets the maximum number of components in the cumulation buffer. If the number of - * the components in the cumulation buffer exceeds this value, the components of the - * cumulation buffer are consolidated into a single component, involving memory copies. - * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS} - * and its minimum allowed value is {@code 2}. - */ - public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) { - if (maxCumulationBufferComponents < 2) { - throw new IllegalArgumentException( - "maxCumulationBufferComponents: " + maxCumulationBufferComponents + - " (expected: >= 2)"); - } - - if (ctx == null) { - this.maxCumulationBufferComponents = maxCumulationBufferComponents; - } else { - throw new IllegalStateException( - "decoder properties cannot be changed once the decoder is added to a pipeline."); - } - } - - public int getMaxContentLength() { - return maxContentLength; + super(maxContentLength); } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - - if (currentMessage != null) { - currentMessage.release(); - currentMessage = null; - } + protected boolean isContentMessage(MemcacheObject msg) throws Exception { + return msg instanceof MemcacheContent; } @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; + protected boolean isLastContentMessage(MemcacheContent msg) throws Exception { + return msg instanceof LastMemcacheContent; } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - super.handlerRemoved(ctx); - - if (currentMessage != null) { - currentMessage.release(); - currentMessage = null; - } + protected boolean isAggregated(MemcacheObject msg) throws Exception { + return msg instanceof FullMemcacheMessage; } + @Override + protected boolean hasContentLength(H start) throws Exception { + return false; + } + + @Override + protected long contentLength(H start) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Object newContinueResponse(H start) throws Exception { + return null; + } } diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java index 7c5ddbad80..30aee7d4ed 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheObjectAggregator.java @@ -16,20 +16,10 @@ package io.netty.handler.codec.memcache.binary; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.memcache.AbstractMemcacheObjectAggregator; import io.netty.handler.codec.memcache.FullMemcacheMessage; -import io.netty.handler.codec.memcache.LastMemcacheContent; import io.netty.handler.codec.memcache.MemcacheContent; -import io.netty.handler.codec.memcache.MemcacheMessage; import io.netty.handler.codec.memcache.MemcacheObject; -import io.netty.util.ReferenceCountUtil; - -import java.util.List; /** * An object aggregator for the memcache binary protocol. @@ -37,111 +27,34 @@ import java.util.List; * It aggregates {@link BinaryMemcacheMessage}s and {@link MemcacheContent} into {@link FullBinaryMemcacheRequest}s * or {@link FullBinaryMemcacheResponse}s. */ -public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator { - - private boolean tooLongFrameFound; +public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggregator { public BinaryMemcacheObjectAggregator(int maxContentLength) { super(maxContentLength); } @Override - protected void decode(ChannelHandlerContext ctx, MemcacheObject msg, List out) throws Exception { - FullMemcacheMessage currentMessage = this.currentMessage; - - if (msg instanceof MemcacheMessage) { - tooLongFrameFound = false; - MemcacheMessage m = (MemcacheMessage) msg; - - if (!m.getDecoderResult().isSuccess()) { - out.add(toFullMessage(m)); - this.currentMessage = null; - return; - } - - if (msg instanceof BinaryMemcacheRequest) { - this.currentMessage = toFullRequest((BinaryMemcacheRequest) msg, - Unpooled.compositeBuffer(getMaxCumulationBufferComponents())); - } else if (msg instanceof BinaryMemcacheResponse) { - this.currentMessage = toFullResponse((BinaryMemcacheResponse) msg, - Unpooled.compositeBuffer(getMaxCumulationBufferComponents())); - } else { - throw new Error(); - } - } else if (msg instanceof MemcacheContent) { - if (tooLongFrameFound) { - if (msg instanceof LastMemcacheContent) { - this.currentMessage = null; - } - return; - } - - MemcacheContent chunk = (MemcacheContent) msg; - CompositeByteBuf content = (CompositeByteBuf) currentMessage.content(); - - if (content.readableBytes() > getMaxContentLength() - chunk.content().readableBytes()) { - tooLongFrameFound = true; - - currentMessage.release(); - this.currentMessage = null; - - throw new TooLongFrameException("Memcache content length exceeded " + getMaxContentLength() - + " bytes."); - } - - if (chunk.content().isReadable()) { - chunk.retain(); - content.addComponent(chunk.content()); - content.writerIndex(content.writerIndex() + chunk.content().readableBytes()); - } - - final boolean last; - if (!chunk.getDecoderResult().isSuccess()) { - currentMessage.setDecoderResult( - DecoderResult.failure(chunk.getDecoderResult().cause())); - last = true; - } else { - last = chunk instanceof LastMemcacheContent; - } - - if (last) { - this.currentMessage = null; - out.add(currentMessage); - } - } else { - throw new Error(); - } + protected boolean isStartMessage(MemcacheObject msg) throws Exception { + return msg instanceof BinaryMemcacheMessage; } - /** - * Convert a invalid message into a full message. - * - * This method makes sure that upstream handlers always get a full message returned, even - * when invalid chunks are failing. - * - * @param msg the message to transform. - * @return a full message containing parts of the original message. - */ - private static FullMemcacheMessage toFullMessage(final MemcacheMessage msg) { - if (msg instanceof FullMemcacheMessage) { - return ((FullMemcacheMessage) msg).retain(); + @Override + protected FullMemcacheMessage beginAggregation(BinaryMemcacheMessage start, ByteBuf content) throws Exception { + if (start instanceof BinaryMemcacheRequest) { + return toFullRequest((BinaryMemcacheRequest) start, content); } - FullMemcacheMessage fullMsg; - if (msg instanceof BinaryMemcacheRequest) { - fullMsg = toFullRequest((BinaryMemcacheRequest) msg, Unpooled.EMPTY_BUFFER); - } else if (msg instanceof BinaryMemcacheResponse) { - fullMsg = toFullResponse((BinaryMemcacheResponse) msg, Unpooled.EMPTY_BUFFER); - } else { - throw new IllegalStateException(); + if (start instanceof BinaryMemcacheResponse) { + return toFullResponse((BinaryMemcacheResponse) start, content); } - return fullMsg; + // Should not reach here. + throw new Error(); } private static FullBinaryMemcacheRequest toFullRequest(BinaryMemcacheRequest request, ByteBuf content) { - FullBinaryMemcacheRequest fullRequest = new DefaultFullBinaryMemcacheRequest(request.getKey(), - request.getExtras(), content); + FullBinaryMemcacheRequest fullRequest = + new DefaultFullBinaryMemcacheRequest(request.getKey(), request.getExtras(), content); fullRequest.setMagic(request.getMagic()); fullRequest.setOpcode(request.getOpcode()); @@ -157,8 +70,8 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg } private static FullBinaryMemcacheResponse toFullResponse(BinaryMemcacheResponse response, ByteBuf content) { - FullBinaryMemcacheResponse fullResponse = new DefaultFullBinaryMemcacheResponse(response.getKey(), - response.getExtras(), content); + FullBinaryMemcacheResponse fullResponse = + new DefaultFullBinaryMemcacheResponse(response.getKey(), response.getExtras(), content); fullResponse.setMagic(response.getMagic()); fullResponse.setOpcode(response.getOpcode()); @@ -172,5 +85,4 @@ public class BinaryMemcacheObjectAggregator extends AbstractMemcacheObjectAggreg return fullResponse; } - } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java index 2c212634ce..d38566c7b1 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java @@ -23,7 +23,7 @@ import io.netty.handler.codec.DecoderResult; * The default {@link StompContentSubframe} implementation. */ public class DefaultStompContentSubframe implements StompContentSubframe { - private DecoderResult decoderResult; + private DecoderResult decoderResult = DecoderResult.SUCCESS; private final ByteBuf content; public DefaultStompContentSubframe(ByteBuf content) { diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java index 236fcddd56..2f4dc990cb 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java @@ -23,7 +23,7 @@ import io.netty.handler.codec.DecoderResult; public class DefaultStompHeadersSubframe implements StompHeadersSubframe { protected final StompCommand command; - protected DecoderResult decoderResult; + protected DecoderResult decoderResult = DecoderResult.SUCCESS; protected final StompHeaders headers = new StompHeaders(); public DefaultStompHeadersSubframe(StompCommand command) { diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java index 22f676a335..bedba18046 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java @@ -15,20 +15,9 @@ */ package io.netty.handler.codec.stomp; -import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.DecoderResultProvider; /** * Defines a common interface for all {@link StompSubframe} implementations. */ -public interface StompSubframe { - /** - * Returns the result of decoding this object. - */ - DecoderResult decoderResult(); - - /** - * Updates the result of decoding this object. This method is supposed to be invoked by - * {@link StompSubframeDecoder}. Do not call this method unless you know what you are doing. - */ - void setDecoderResult(DecoderResult result); -} +public interface StompSubframe extends DecoderResultProvider { } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java index 093e3f84fd..0c93342a27 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java @@ -15,32 +15,20 @@ */ package io.netty.handler.codec.stomp; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageAggregator; import io.netty.handler.codec.TooLongFrameException; -import java.util.List; - /** * A {@link ChannelHandler} that aggregates an {@link StompHeadersSubframe} * and its following {@link StompContentSubframe}s into a single {@link StompFrame}. * It is useful when you don't want to take care of STOMP frames whose content is 'chunked'. Insert this * handler after {@link StompSubframeDecoder} in the {@link ChannelPipeline}: */ -public class StompSubframeAggregator extends MessageToMessageDecoder { - - private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; - - private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; - - private final int maxContentLength; - private StompFrame currentFrame; - private boolean tooLongFrameFound; - private volatile boolean handlerAdded; +public class StompSubframeAggregator + extends MessageAggregator { /** * Creates a new instance. @@ -51,97 +39,48 @@ public class StompSubframeAggregator extends MessageToMessageDecoder= 2)"); - } - - if (!handlerAdded) { - this.maxCumulationBufferComponents = maxCumulationBufferComponents; - } else { - throw new IllegalStateException( - "decoder properties cannot be changed once the decoder is added to a pipeline."); - } + super(maxContentLength); } @Override - protected void decode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { - StompFrame currentFrame = this.currentFrame; - if (msg instanceof StompHeadersSubframe) { - assert currentFrame == null; - StompHeadersSubframe frame = (StompHeadersSubframe) msg; - this.currentFrame = currentFrame = new DefaultStompFrame(frame.command(), - Unpooled.compositeBuffer(maxCumulationBufferComponents)); - currentFrame.headers().set(frame.headers()); - } else if (msg instanceof StompContentSubframe) { - if (tooLongFrameFound) { - if (msg instanceof LastStompContentSubframe) { - this.currentFrame = null; - } - return; - } - assert currentFrame != null; - StompContentSubframe chunk = (StompContentSubframe) msg; - CompositeByteBuf contentBuf = (CompositeByteBuf) currentFrame.content(); - if (contentBuf.readableBytes() > maxContentLength - chunk.content().readableBytes()) { - tooLongFrameFound = true; - currentFrame.release(); - this.currentFrame = null; - throw new TooLongFrameException( - "STOMP content length exceeded " + maxContentLength + - " bytes."); - } - - contentBuf.addComponent(chunk.retain().content()); - contentBuf.writerIndex(contentBuf.writerIndex() + chunk.content().readableBytes()); - if (chunk instanceof LastStompContentSubframe) { - out.add(currentFrame); - this.currentFrame = null; - } - } else { - throw new IllegalArgumentException("received unsupported object type " + msg); - } + protected boolean isStartMessage(StompSubframe msg) throws Exception { + return msg instanceof StompHeadersSubframe; } @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - super.handlerAdded(ctx); - handlerAdded = true; + protected boolean isContentMessage(StompSubframe msg) throws Exception { + return msg instanceof StompContentSubframe; } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - if (currentFrame != null) { - currentFrame.release(); - currentFrame = null; - } + protected boolean isLastContentMessage(StompContentSubframe msg) throws Exception { + return msg instanceof LastStompContentSubframe; } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - super.handlerRemoved(ctx); - handlerAdded = false; - if (currentFrame != null) { - currentFrame.release(); - currentFrame = null; - } + protected boolean isAggregated(StompSubframe msg) throws Exception { + return msg instanceof StompFrame; + } + + @Override + protected boolean hasContentLength(StompHeadersSubframe start) throws Exception { + return start.headers().has(StompHeaders.CONTENT_LENGTH); + } + + @Override + protected long contentLength(StompHeadersSubframe start) throws Exception { + return Long.parseLong(start.headers().get(StompHeaders.CONTENT_LENGTH)); + } + + @Override + protected Object newContinueResponse(StompHeadersSubframe start) throws Exception { + return null; + } + + @Override + protected StompFrame beginAggregation(StompHeadersSubframe start, ByteBuf content) throws Exception { + StompFrame ret = new DefaultStompFrame(start.command(), content); + ret.headers().set(start.headers()); + return ret; } } diff --git a/codec/src/main/java/io/netty/handler/codec/DecoderResultProvider.java b/codec/src/main/java/io/netty/handler/codec/DecoderResultProvider.java new file mode 100644 index 0000000000..082d4bbad2 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/DecoderResultProvider.java @@ -0,0 +1,33 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec; + +/** + * Provides the accessor methods for the {@link DecoderResult} property of a decoded message. + */ +public interface DecoderResultProvider { + /** + * Returns the result of decoding this object. + */ + DecoderResult decoderResult(); + + /** + * Updates the result of decoding this object. This method is supposed to be invoked by a decoder. + * Do not call this method unless you know what you are doing. + */ + void setDecoderResult(DecoderResult result); +} diff --git a/codec/src/main/java/io/netty/handler/codec/MessageAggregationException.java b/codec/src/main/java/io/netty/handler/codec/MessageAggregationException.java new file mode 100644 index 0000000000..9fd1f45bd7 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/MessageAggregationException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec; + +/** + * Raised by {@link MessageAggregator} when aggregation fails due to an unexpected message sequence. + */ +public class MessageAggregationException extends IllegalStateException { + + private static final long serialVersionUID = -1995826182950310255L; + + public MessageAggregationException() { } + + public MessageAggregationException(String s) { + super(s); + } + + public MessageAggregationException(String message, Throwable cause) { + super(message, cause); + } + + public MessageAggregationException(Throwable cause) { + super(cause); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java b/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java new file mode 100644 index 0000000000..6a84ca4f1e --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java @@ -0,0 +1,408 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.ReferenceCountUtil; + +import java.util.List; + +/** + * An abstract {@link ChannelHandler} that aggregates a series of message objects into a single aggregated message. + *

+ * 'A series of messages' is composed of the following: + *

    + *
  • a single start message which optionally contains the first part of the content, and
  • + *
  • 1 or more content messages.
  • + *
+ * The content of the aggregated message will be the merged content of the start message and its following content + * messages. If this aggregator encounters a content message where {@link #isLastContentMessage(ByteBufHolder)} + * return {@code true} for, the aggregator will finish the aggregation and produce the aggregated message and expect + * another start message. + *

+ * + * @param the type that covers both start message and content message + * @param the type of the start message + * @param the type of the content message (must be a subtype of {@link ByteBufHolder}) + * @param the type of the aggregated message (must be a subtype of {@code S} and {@link ByteBufHolder}) + */ +public abstract class MessageAggregator + extends MessageToMessageDecoder { + + private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; + + private final int maxContentLength; + private O currentMessage; + private boolean handlingOversizedMessage; + + private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; + private ChannelHandlerContext ctx; + private ChannelFutureListener continueResponseWriteListener; + + /** + * Creates a new instance. + * + * @param maxContentLength + * the maximum length of the aggregated content. + * If the length of the aggregated content exceeds this value, + * {@link #handleOversizedMessage(ChannelHandlerContext, Object)} will be called. + */ + protected MessageAggregator(int maxContentLength) { + validateMaxContentLength(maxContentLength); + this.maxContentLength = maxContentLength; + } + + protected MessageAggregator(int maxContentLength, Class inboundMessageType) { + super(inboundMessageType); + validateMaxContentLength(maxContentLength); + this.maxContentLength = maxContentLength; + } + + private static void validateMaxContentLength(int maxContentLength) { + if (maxContentLength <= 0) { + throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength); + } + } + + @Override + public boolean acceptInboundMessage(Object msg) throws Exception { + // No need to match last and full types because they are subset of first and middle types. + if (!super.acceptInboundMessage(msg)) { + return false; + } + + @SuppressWarnings("unchecked") + I in = (I) msg; + + return (isContentMessage(in) || isStartMessage(in)) && !isAggregated(in); + } + + /** + * Returns {@code true} if and only if the specified message is a start message. Typically, this method is + * implemented as a single {@code return} statement with {@code instanceof}: + *
+     * return msg instanceof MyStartMessage;
+     * 
+ */ + protected abstract boolean isStartMessage(I msg) throws Exception; + + /** + * Returns {@code true} if and only if the specified message is a content message. Typically, this method is + * implemented as a single {@code return} statement with {@code instanceof}: + *
+     * return msg instanceof MyContentMessage;
+     * 
+ */ + protected abstract boolean isContentMessage(I msg) throws Exception; + + /** + * Returns {@code true} if and only if the specified message is the last content message. Typically, this method is + * implemented as a single {@code return} statement with {@code instanceof}: + *
+     * return msg instanceof MyLastContentMessage;
+     * 
+ * or with {@code instanceof} and boolean field check: + *
+     * return msg instanceof MyContentMessage && msg.isLastFragment();
+     * 
+ */ + protected abstract boolean isLastContentMessage(C msg) throws Exception; + + /** + * Returns {@code true} if and only if the specified message is already aggregated. If this method returns + * {@code true}, this handler will simply forward the message to the next handler as-is. + */ + protected abstract boolean isAggregated(I msg) throws Exception; + + /** + * Returns the maximum allowed length of the aggregated message. + */ + public final int maxContentLength() { + return maxContentLength; + } + + /** + * Returns the maximum number of components in the cumulation buffer. If the number of + * the components in the cumulation buffer exceeds this value, the components of the + * cumulation buffer are consolidated into a single component, involving memory copies. + * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}. + */ + public final int maxCumulationBufferComponents() { + return maxCumulationBufferComponents; + } + + /** + * Sets the maximum number of components in the cumulation buffer. If the number of + * the components in the cumulation buffer exceeds this value, the components of the + * cumulation buffer are consolidated into a single component, involving memory copies. + * The default value of this property is {@value #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS} + * and its minimum allowed value is {@code 2}. + */ + public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) { + if (maxCumulationBufferComponents < 2) { + throw new IllegalArgumentException( + "maxCumulationBufferComponents: " + maxCumulationBufferComponents + + " (expected: >= 2)"); + } + + if (ctx == null) { + this.maxCumulationBufferComponents = maxCumulationBufferComponents; + } else { + throw new IllegalStateException( + "decoder properties cannot be changed once the decoder is added to a pipeline."); + } + } + + public final boolean isHandlingOversizedMessage() { + return handlingOversizedMessage; + } + + protected final ChannelHandlerContext ctx() { + if (ctx == null) { + throw new IllegalStateException("not added to a pipeline yet"); + } + return ctx; + } + + @Override + protected void decode(final ChannelHandlerContext ctx, I msg, List out) throws Exception { + O currentMessage = this.currentMessage; + + if (isStartMessage(msg)) { + handlingOversizedMessage = false; + if (currentMessage != null) { + throw new MessageAggregationException(); + } + + @SuppressWarnings("unchecked") + S m = (S) msg; + + // if content length is set, preemptively close if it's too large + if (hasContentLength(m)) { + if (contentLength(m) > maxContentLength) { + // handle oversized message + invokeHandleOversizedMessage(ctx, m); + return; + } + } + + // Send the continue response if necessary (e.g. 'Expect: 100-continue' header) + Object continueResponse = newContinueResponse(m); + if (continueResponse != null) { + // Cache the write listener for reuse. + ChannelFutureListener listener = continueResponseWriteListener; + if (listener == null) { + continueResponseWriteListener = listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + ctx.fireExceptionCaught(future.cause()); + } + } + }; + } + ctx.writeAndFlush(continueResponse).addListener(listener); + } + + if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) { + O aggregated; + if (m instanceof ByteBufHolder && ((ByteBufHolder) m).content().isReadable()) { + aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain()); + } else { + aggregated = beginAggregation(m, Unpooled.EMPTY_BUFFER); + } + finishAggregation(aggregated); + out.add(aggregated); + this.currentMessage = null; + return; + } + + // A streamed message - initialize the cumulative buffer, and wait for incoming chunks. + CompositeByteBuf content = Unpooled.compositeBuffer(maxCumulationBufferComponents); + if (m instanceof ByteBufHolder) { + appendPartialContent(content, ((ByteBufHolder) m).content()); + } + this.currentMessage = beginAggregation(m, content); + + } else if (isContentMessage(msg)) { + @SuppressWarnings("unchecked") + final C m = (C) msg; + final ByteBuf partialContent = ((ByteBufHolder) msg).content(); + final boolean isLastContentMessage = isLastContentMessage(m); + if (handlingOversizedMessage) { + if (isLastContentMessage) { + this.currentMessage = null; + } + // already detect the too long frame so just discard the content + return; + } + + if (currentMessage == null) { + throw new MessageAggregationException(); + } + + // Merge the received chunk into the content of the current message. + CompositeByteBuf content = (CompositeByteBuf) currentMessage.content(); + + // Handle oversized message. + if (content.readableBytes() > maxContentLength - partialContent.readableBytes()) { + // By convention, full message type extends first message type. + @SuppressWarnings("unchecked") + S s = (S) currentMessage; + invokeHandleOversizedMessage(ctx, s); + return; + } + + // Append the content of the chunk. + appendPartialContent(content, partialContent); + + // Give the subtypes a chance to merge additional information such as trailing headers. + aggregate(currentMessage, m); + + final boolean last; + if (m instanceof DecoderResultProvider) { + DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult(); + if (!decoderResult.isSuccess()) { + if (currentMessage instanceof DecoderResultProvider) { + ((DecoderResultProvider) currentMessage).setDecoderResult( + DecoderResult.failure(decoderResult.cause())); + } + last = true; + } else { + last = isLastContentMessage; + } + } else { + last = isLastContentMessage; + } + + if (last) { + finishAggregation(currentMessage); + + // All done + out.add(currentMessage); + this.currentMessage = null; + } + } else { + throw new MessageAggregationException(); + } + } + + private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) { + if (partialContent.isReadable()) { + partialContent.retain(); + content.addComponent(partialContent); + content.writerIndex(content.writerIndex() + partialContent.readableBytes()); + } + } + + /** + * Returns {@code true} if and only if the specified start message already contains the information about the + * length of the whole content. + */ + protected abstract boolean hasContentLength(S start) throws Exception; + + /** + * Retrieves the length of the whole content from the specified start message. This method is invoked only when + * {@link #hasContentLength(Object)} returned {@code true}. + */ + protected abstract long contentLength(S start) throws Exception; + + /** + * Returns the 'continue response' for the specified start message if necessary. For example, this method is + * useful to handle an HTTP 100-continue header. + * + * @return the 'continue response', or {@code null} if there's no message to send + */ + protected abstract Object newContinueResponse(S start) throws Exception; + + /** + * Creates a new aggregated message from the specified start message and the specified content. If the start + * message implements {@link ByteBufHolder}, its content is appended to the specified {@code content}. + * This aggregator will continue to append the received content to the specified {@code content}. + */ + protected abstract O beginAggregation(S start, ByteBuf content) throws Exception; + + /** + * Transfers the information provided by the specified content message to the specified aggregated message. + * Note that the content of the specified content message has been appended to the content of the specified + * aggregated message already, so that you don't need to. Use this method to transfer the additional information + * that the content message provides to {@code aggregated}. + */ + protected void aggregate(O aggregated, C content) throws Exception { } + + /** + * Invoked when the specified {@code aggregated} message is about to be passed to the next handler in the pipeline. + */ + protected void finishAggregation(O aggregated) throws Exception { } + + private void invokeHandleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception { + handlingOversizedMessage = true; + currentMessage = null; + try { + handleOversizedMessage(ctx, oversized); + } finally { + // Release the message in case it is a full one. + ReferenceCountUtil.release(oversized); + } + } + + /** + * Invoked when an incoming request exceeds the maximum content length. The default behvaior is to trigger an + * {@code exceptionCaught()} event with a {@link TooLongFrameException}. + * + * @param ctx the {@link ChannelHandlerContext} + * @param oversized the accumulated message up to this point, whose type is {@code S} or {@code O} + */ + protected void handleOversizedMessage(ChannelHandlerContext ctx, S oversized) throws Exception { + ctx.fireExceptionCaught( + new TooLongFrameException("content length exceeded " + maxContentLength() + " bytes.")); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // release current message if it is not null as it may be a left-over + if (currentMessage != null) { + currentMessage.release(); + currentMessage = null; + } + + super.channelInactive(ctx); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + super.handlerRemoved(ctx); + + // release current message if it is not null as it may be a left-over as there is not much more we can do in + // this case + if (currentMessage != null) { + currentMessage.release(); + currentMessage = null; + } + } +} diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java index 3bd35fc8a7..c38b3dbdd3 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java @@ -111,7 +111,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler } private static void appendDecoderResult(StringBuilder buf, HttpObject o) { - DecoderResult result = o.getDecoderResult(); + DecoderResult result = o.decoderResult(); if (result.isSuccess()) { return; } @@ -148,7 +148,7 @@ public class HttpSnoopServerHandler extends SimpleChannelInboundHandler boolean keepAlive = isKeepAlive(request); // Build the response object. FullHttpResponse response = new DefaultFullHttpResponse( - HTTP_1_1, currentObj.getDecoderResult().isSuccess()? OK : BAD_REQUEST, + HTTP_1_1, currentObj.decoderResult().isSuccess()? OK : BAD_REQUEST, Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); diff --git a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java index bdbcfe2837..b4a9eb99fa 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java @@ -64,7 +64,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { // Handle a bad request. - if (!req.getDecoderResult().isSuccess()) { + if (!req.decoderResult().isSuccess()) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } diff --git a/testsuite/src/test/java/io/netty/testsuite/websockets/autobahn/AutobahnServerHandler.java b/testsuite/src/test/java/io/netty/testsuite/websockets/autobahn/AutobahnServerHandler.java index 5a2205f286..ef0a410322 100644 --- a/testsuite/src/test/java/io/netty/testsuite/websockets/autobahn/AutobahnServerHandler.java +++ b/testsuite/src/test/java/io/netty/testsuite/websockets/autobahn/AutobahnServerHandler.java @@ -71,7 +71,7 @@ public class AutobahnServerHandler extends ChannelHandlerAdapter { private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // Handle a bad request. - if (!req.getDecoderResult().isSuccess()) { + if (!req.decoderResult().isSuccess()) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); req.release(); return; @@ -106,11 +106,9 @@ public class AutobahnServerHandler extends ChannelHandlerAdapter { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); } else if (frame instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content())); - } else if (frame instanceof TextWebSocketFrame) { - ctx.write(frame); - } else if (frame instanceof BinaryWebSocketFrame) { - ctx.write(frame); - } else if (frame instanceof ContinuationWebSocketFrame) { + } else if (frame instanceof TextWebSocketFrame || + frame instanceof BinaryWebSocketFrame || + frame instanceof ContinuationWebSocketFrame) { ctx.write(frame); } else if (frame instanceof PongWebSocketFrame) { frame.release();