Rewrite HTTP encoder to use gathering writes
This commit is contained in:
parent
78d8f05c21
commit
a403da3042
@ -86,7 +86,7 @@ public final class HttpClientCodec
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void encode(
|
protected void encode(
|
||||||
ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception {
|
ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
|
||||||
if (msg instanceof HttpRequest && !done) {
|
if (msg instanceof HttpRequest && !done) {
|
||||||
queue.offer(((HttpRequest) msg).getMethod());
|
queue.offer(((HttpRequest) msg).getMethod());
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,8 @@ package io.netty.handler.codec.http;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.MessageToByteEncoder;
|
import io.netty.channel.MessageList;
|
||||||
|
import io.netty.handler.codec.MessageToMessageEncoder;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -38,10 +39,12 @@ import static io.netty.handler.codec.http.HttpConstants.*;
|
|||||||
* To implement the encoder of such a derived protocol, extend this class and
|
* To implement the encoder of such a derived protocol, extend this class and
|
||||||
* implement all abstract methods properly.
|
* implement all abstract methods properly.
|
||||||
*/
|
*/
|
||||||
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToByteEncoder<HttpObject> {
|
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<HttpObject> {
|
||||||
private static final byte[] CRLF = { CR, LF };
|
private static final byte[] CRLF = { CR, LF };
|
||||||
private static final byte[] ZERO_CRLF = { '0', CR, LF };
|
private static final byte[] ZERO_CRLF = { '0', CR, LF };
|
||||||
private static final byte[] HEADER_SEPARATOR = { COLON , SP };
|
private static final ByteBuf ZERO_CRLF_CRLF_BUF =
|
||||||
|
unreleasableBuffer(directBuffer(5, 5).writeBytes(ZERO_CRLF).writeBytes(CRLF));
|
||||||
|
private static final byte[] HEADER_SEPARATOR = { COLON, SP};
|
||||||
private static final int ST_INIT = 0;
|
private static final int ST_INIT = 0;
|
||||||
private static final int ST_CONTENT_NON_CHUNK = 1;
|
private static final int ST_CONTENT_NON_CHUNK = 1;
|
||||||
private static final int ST_CONTENT_CHUNK = 2;
|
private static final int ST_CONTENT_CHUNK = 2;
|
||||||
@ -50,7 +53,7 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
|
|||||||
private int state = ST_INIT;
|
private int state = ST_INIT;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void encode(ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception {
|
protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
|
||||||
if (msg instanceof HttpMessage) {
|
if (msg instanceof HttpMessage) {
|
||||||
if (state != ST_INIT) {
|
if (state != ST_INIT) {
|
||||||
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
|
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
|
||||||
@ -59,14 +62,14 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
|
|||||||
@SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
|
@SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
|
||||||
H m = (H) msg;
|
H m = (H) msg;
|
||||||
|
|
||||||
|
ByteBuf buf = ctx.alloc().buffer();
|
||||||
// Encode the message.
|
// Encode the message.
|
||||||
encodeInitialLine(out, m);
|
encodeInitialLine(buf, m);
|
||||||
encodeHeaders(out, m);
|
encodeHeaders(buf, m.headers());
|
||||||
out.writeBytes(CRLF);
|
buf.writeBytes(CRLF);
|
||||||
|
out.add(buf);
|
||||||
state = HttpHeaders.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
|
state = HttpHeaders.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg instanceof HttpContent) {
|
if (msg instanceof HttpContent) {
|
||||||
if (state == ST_INIT) {
|
if (state == ST_INIT) {
|
||||||
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
|
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
|
||||||
@ -78,7 +81,7 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
|
|||||||
|
|
||||||
if (state == ST_CONTENT_NON_CHUNK) {
|
if (state == ST_CONTENT_NON_CHUNK) {
|
||||||
if (contentLength > 0) {
|
if (contentLength > 0) {
|
||||||
out.writeBytes(content, content.readerIndex(), content.readableBytes());
|
out.add(content.retain());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chunk instanceof LastHttpContent) {
|
if (chunk instanceof LastHttpContent) {
|
||||||
@ -86,16 +89,26 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
|
|||||||
}
|
}
|
||||||
} else if (state == ST_CONTENT_CHUNK) {
|
} else if (state == ST_CONTENT_CHUNK) {
|
||||||
if (contentLength > 0) {
|
if (contentLength > 0) {
|
||||||
out.writeBytes(copiedBuffer(Integer.toHexString(contentLength), CharsetUtil.US_ASCII));
|
byte[] length = Integer.toHexString(contentLength).getBytes(CharsetUtil.US_ASCII);
|
||||||
out.writeBytes(CRLF);
|
ByteBuf buf = ctx.alloc().buffer(length.length + 2);
|
||||||
out.writeBytes(content, content.readerIndex(), contentLength);
|
buf.writeBytes(length);
|
||||||
out.writeBytes(CRLF);
|
buf.writeBytes(CRLF);
|
||||||
|
out.add(buf);
|
||||||
|
out.add(content.retain());
|
||||||
|
out.add(wrappedBuffer(CRLF));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chunk instanceof LastHttpContent) {
|
if (chunk instanceof LastHttpContent) {
|
||||||
out.writeBytes(ZERO_CRLF);
|
HttpHeaders headers = ((LastHttpContent) chunk).trailingHeaders();
|
||||||
encodeTrailingHeaders(out, (LastHttpContent) chunk);
|
if (headers.isEmpty()) {
|
||||||
out.writeBytes(CRLF);
|
out.add(ZERO_CRLF_CRLF_BUF.duplicate());
|
||||||
|
} else {
|
||||||
|
ByteBuf buf = ctx.alloc().buffer();
|
||||||
|
buf.writeBytes(ZERO_CRLF);
|
||||||
|
encodeHeaders(buf, headers);
|
||||||
|
buf.writeBytes(CRLF);
|
||||||
|
out.add(buf);
|
||||||
|
}
|
||||||
|
|
||||||
state = ST_INIT;
|
state = ST_INIT;
|
||||||
}
|
}
|
||||||
@ -105,14 +118,8 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void encodeHeaders(ByteBuf buf, HttpMessage message) {
|
private static void encodeHeaders(ByteBuf buf, HttpHeaders headers) {
|
||||||
for (Map.Entry<String, String> h: message.headers()) {
|
for (Map.Entry<String, String> h: headers) {
|
||||||
encodeHeader(buf, h.getKey(), h.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void encodeTrailingHeaders(ByteBuf buf, LastHttpContent trailer) {
|
|
||||||
for (Map.Entry<String, String> h: trailer.trailingHeaders()) {
|
|
||||||
encodeHeader(buf, h.getKey(), h.getValue());
|
encodeHeader(buf, h.getKey(), h.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.handler.codec.http.websocketx;
|
package io.netty.handler.codec.http.websocketx;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
@ -56,10 +55,9 @@ public class WebSocketServerHandshaker00Test {
|
|||||||
new WebSocketServerHandshaker00(
|
new WebSocketServerHandshaker00(
|
||||||
"ws://example.com/chat", "chat", Integer.MAX_VALUE).handshake(ch, req);
|
"ws://example.com/chat", "chat", Integer.MAX_VALUE).handshake(ch, req);
|
||||||
|
|
||||||
ByteBuf resBuf = (ByteBuf) ch.readOutbound();
|
|
||||||
|
|
||||||
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
|
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
|
||||||
ch2.writeInbound(resBuf);
|
ch2.writeInbound(ch.readOutbound());
|
||||||
|
ch2.writeInbound(ch.readOutbound());
|
||||||
HttpResponse res = (HttpResponse) ch2.readInbound();
|
HttpResponse res = (HttpResponse) ch2.readInbound();
|
||||||
|
|
||||||
Assert.assertEquals("ws://example.com/chat", res.headers().get(Names.SEC_WEBSOCKET_LOCATION));
|
Assert.assertEquals("ws://example.com/chat", res.headers().get(Names.SEC_WEBSOCKET_LOCATION));
|
||||||
|
Loading…
Reference in New Issue
Block a user