Ported the HTTP snoop example to the new API

- Replaced pipeline factories with initializers
- Ported essential parts related with HTTP to the new API
- Replaced ChannelHandlerAdapter.combine() with CombinedChannelHandler
- Fixed a bug where ReplayingDecoder does not notify the next handler
- Fixed a bug where ReplayingDecoder calls wrong callDecode() method
- Added a destination buffer as an argument to AbstractChannel.doRead()
  for easier implementation
- Fixed a bug where NioSocketChannel did not try to increase the inbound
  buffer size (moved the logic to AbstractChannel)
This commit is contained in:
Trustin Lee 2012-05-20 14:19:11 +09:00
parent e846505ceb
commit af37ec4f23
18 changed files with 515 additions and 432 deletions

View File

@ -15,23 +15,19 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import static io.netty.channel.Channels.*;
import static io.netty.handler.codec.http.HttpHeaders.*; import static io.netty.handler.codec.http.HttpHeaders.*;
import java.util.List;
import java.util.Map.Entry;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channels; import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import java.util.List;
import java.util.Map.Entry;
/** /**
* A {@link ChannelHandler} that aggregates an {@link HttpMessage} * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
* and its following {@link HttpChunk}s into a single {@link HttpMessage} with * and its following {@link HttpChunk}s into a single {@link HttpMessage} with
@ -50,7 +46,7 @@ import io.netty.util.CharsetUtil;
* @apiviz.landmark * @apiviz.landmark
* @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - filters out * @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - filters out
*/ */
public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMessage> {
private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer( private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
"HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII); "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
@ -75,11 +71,9 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler {
this.maxContentLength = maxContentLength; this.maxContentLength = maxContentLength;
} }
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Object msg = e.getMessage(); @Override
public HttpMessage decode(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
HttpMessage currentMessage = this.currentMessage; HttpMessage currentMessage = this.currentMessage;
if (msg instanceof HttpMessage) { if (msg instanceof HttpMessage) {
@ -91,7 +85,7 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler {
// No need to notify the upstream handlers - just log. // No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception. // If decoding a response, just throw an exception.
if (is100ContinueExpected(m)) { if (is100ContinueExpected(m)) {
write(ctx, succeededFuture(ctx.channel()), CONTINUE.duplicate()); ctx.write(CONTINUE.duplicate());
} }
if (m.isChunked()) { if (m.isChunked()) {
@ -103,12 +97,13 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler {
m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING); m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
} }
m.setChunked(false); m.setChunked(false);
m.setContent(ChannelBuffers.dynamicBuffer(e.channel().getConfig().getBufferFactory())); m.setContent(ChannelBuffers.dynamicBuffer());
this.currentMessage = m; this.currentMessage = m;
return null;
} else { } else {
// Not a chunked message - pass through. // Not a chunked message - pass through.
this.currentMessage = null; this.currentMessage = null;
ctx.sendUpstream(e); return m;
} }
} else if (msg instanceof HttpChunk) { } else if (msg instanceof HttpChunk) {
// Sanity check // Sanity check
@ -149,12 +144,15 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler {
HttpHeaders.Names.CONTENT_LENGTH, HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(content.readableBytes())); String.valueOf(content.readableBytes()));
// All done - generate the event. // All done
Channels.fireMessageReceived(ctx, currentMessage, e.getRemoteAddress()); return currentMessage;
} else {
return null;
} }
} else { } else {
// Neither HttpMessage or HttpChunk throw new IllegalStateException(
ctx.sendUpstream(e); "Only " + HttpMessage.class.getSimpleName() + " and " +
HttpChunk.class.getSimpleName() + " are accepted: " + msg.getClass().getName());
} }
} }
} }

View File

@ -15,16 +15,14 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import java.util.Queue;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelDownstreamHandler; import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelEvent; import io.netty.channel.CombinedChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
import java.util.Queue;
/** /**
* A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder} * A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder}
* which enables easier client side HTTP implementation. {@link HttpClientCodec} * which enables easier client side HTTP implementation. {@link HttpClientCodec}
@ -38,8 +36,7 @@ import io.netty.util.internal.QueueFactory;
* @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder
* @apiviz.has io.netty.handler.codec.http.HttpRequestEncoder * @apiviz.has io.netty.handler.codec.http.HttpRequestEncoder
*/ */
public class HttpClientCodec implements ChannelUpstreamHandler, public class HttpClientCodec extends CombinedChannelHandler {
ChannelDownstreamHandler {
/** A queue that is used for correlating a request and a response. */ /** A queue that is used for correlating a request and a response. */
final Queue<HttpMethod> queue = QueueFactory.createQueue(HttpMethod.class); final Queue<HttpMethod> queue = QueueFactory.createQueue(HttpMethod.class);
@ -47,9 +44,6 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
/** If true, decoding stops (i.e. pass-through) */ /** If true, decoding stops (i.e. pass-through) */
volatile boolean done; volatile boolean done;
private final HttpRequestEncoder encoder = new Encoder();
private final HttpResponseDecoder decoder;
/** /**
* Creates a new instance with the default decoder options * Creates a new instance with the default decoder options
* ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and * ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
@ -64,33 +58,19 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
*/ */
public HttpClientCodec( public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
decoder = new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize); init(
} new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize),
new Encoder());
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
decoder.handleUpstream(ctx, e);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
encoder.handleDownstream(ctx, e);
} }
private final class Encoder extends HttpRequestEncoder { private final class Encoder extends HttpRequestEncoder {
Encoder() {
}
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, public void encode(
Object msg) throws Exception { ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
if (msg instanceof HttpRequest && !done) { if (msg instanceof HttpRequest && !done) {
queue.offer(((HttpRequest) msg).getMethod()); queue.offer(((HttpRequest) msg).getMethod());
} }
return super.encode(ctx, channel, msg); super.encode(ctx, msg, out);
} }
} }
@ -101,12 +81,12 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
} }
@Override @Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, public Object decode(
ChannelBuffer buffer, State state) throws Exception { ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws Exception {
if (done) { if (done) {
return buffer.readBytes(actualReadableBytes()); return buffer.readBytes(actualReadableBytes());
} else { } else {
return super.decode(ctx, channel, buffer, state); return super.decode(ctx, buffer);
} }
} }

View File

@ -15,16 +15,15 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import java.util.List;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import java.util.List;
/** /**
* Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and * Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and
* {@link HttpChunk}s. * {@link HttpChunk}s.
@ -98,7 +97,7 @@ import io.netty.handler.codec.TooLongFrameException;
* implement all abstract methods properly. * implement all abstract methods properly.
* @apiviz.landmark * @apiviz.landmark
*/ */
public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDecoder.State> { public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMessageDecoder.State> {
private final int maxInitialLineLength; private final int maxInitialLineLength;
private final int maxHeaderSize; private final int maxHeaderSize;
@ -143,7 +142,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
protected HttpMessageDecoder( protected HttpMessageDecoder(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
super(State.SKIP_CONTROL_CHARS, true); super(State.SKIP_CONTROL_CHARS);
if (maxInitialLineLength <= 0) { if (maxInitialLineLength <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -166,8 +165,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
} }
@Override @Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, State state) throws Exception { public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws Exception {
switch (state) { switch (state()) {
case SKIP_CONTROL_CHARS: { case SKIP_CONTROL_CHARS: {
try { try {
skipControlCharacters(buffer); skipControlCharacters(buffer);
@ -237,7 +236,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
} }
case READ_VARIABLE_LENGTH_CONTENT: { case READ_VARIABLE_LENGTH_CONTENT: {
if (content == null) { if (content == null) {
content = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory()); content = ChannelBuffers.dynamicBuffer();
} }
//this will cause a replay error until the channel is closed where this will read what's left in the buffer //this will cause a replay error until the channel is closed where this will read what's left in the buffer
content.writeBytes(buffer.readBytes(buffer.readableBytes())); content.writeBytes(buffer.readBytes(buffer.readableBytes()));
@ -368,10 +367,10 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
if (msg instanceof HttpResponse) { if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse) msg; HttpResponse res = (HttpResponse) msg;
int code = res.getStatus().getCode(); int code = res.getStatus().getCode();
// Correctly handle return codes of 1xx. // Correctly handle return codes of 1xx.
// //
// See: // See:
// - http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html Section 4.4 // - http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html Section 4.4
// - https://github.com/netty/netty/issues/222 // - https://github.com/netty/netty/issues/222
if (code >= 100 && code < 200) { if (code >= 100 && code < 200) {
@ -404,7 +403,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return message; return message;
} }
private void skipControlCharacters(ChannelBuffer buffer) { private static void skipControlCharacters(ChannelBuffer buffer) {
for (;;) { for (;;) {
char c = (char) buffer.readUnsignedByte(); char c = (char) buffer.readUnsignedByte();
if (!Character.isISOControl(c) && if (!Character.isISOControl(c) &&
@ -556,7 +555,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
protected abstract boolean isDecodingRequest(); protected abstract boolean isDecodingRequest();
protected abstract HttpMessage createMessage(String[] initialLine) throws Exception; protected abstract HttpMessage createMessage(String[] initialLine) throws Exception;
private int getChunkSize(String hex) { private static int getChunkSize(String hex) {
hex = hex.trim(); hex = hex.trim();
for (int i = 0; i < hex.length(); i ++) { for (int i = 0; i < hex.length(); i ++) {
char c = hex.charAt(i); char c = hex.charAt(i);
@ -569,7 +568,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return Integer.parseInt(hex, 16); return Integer.parseInt(hex, 16);
} }
private String readLine(ChannelBuffer buffer, int maxLineLength) throws TooLongFrameException { private static String readLine(ChannelBuffer buffer, int maxLineLength) throws TooLongFrameException {
StringBuilder sb = new StringBuilder(64); StringBuilder sb = new StringBuilder(64);
int lineLength = 0; int lineLength = 0;
while (true) { while (true) {
@ -597,7 +596,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
} }
} }
private String[] splitInitialLine(String sb) { private static String[] splitInitialLine(String sb) {
int aStart; int aStart;
int aEnd; int aEnd;
int bStart; int bStart;
@ -620,7 +619,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
cStart < cEnd? sb.substring(cStart, cEnd) : "" }; cStart < cEnd? sb.substring(cStart, cEnd) : "" };
} }
private String[] splitHeader(String sb) { private static String[] splitHeader(String sb) {
final int length = sb.length(); final int length = sb.length();
int nameStart; int nameStart;
int nameEnd; int nameEnd;
@ -658,7 +657,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
}; };
} }
private int findNonWhitespace(String sb, int offset) { private static int findNonWhitespace(String sb, int offset) {
int result; int result;
for (result = offset; result < sb.length(); result ++) { for (result = offset; result < sb.length(); result ++) {
if (!Character.isWhitespace(sb.charAt(result))) { if (!Character.isWhitespace(sb.charAt(result))) {
@ -668,7 +667,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return result; return result;
} }
private int findWhitespace(String sb, int offset) { private static int findWhitespace(String sb, int offset) {
int result; int result;
for (result = offset; result < sb.length(); result ++) { for (result = offset; result < sb.length(); result ++) {
if (Character.isWhitespace(sb.charAt(result))) { if (Character.isWhitespace(sb.charAt(result))) {
@ -678,7 +677,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return result; return result;
} }
private int findEndOfString(String sb) { private static int findEndOfString(String sb) {
int result; int result;
for (result = sb.length(); result > 0; result --) { for (result = sb.length(); result > 0; result --) {
if (!Character.isWhitespace(sb.charAt(result - 1))) { if (!Character.isWhitespace(sb.charAt(result - 1))) {

View File

@ -17,19 +17,17 @@ package io.netty.handler.codec.http;
import static io.netty.buffer.ChannelBuffers.*; import static io.netty.buffer.ChannelBuffers.*;
import static io.netty.handler.codec.http.HttpCodecUtil.*; import static io.netty.handler.codec.http.HttpCodecUtil.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToStreamEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.util.CharsetUtil;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.Map; import java.util.Map;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.util.CharsetUtil;
/** /**
* Encodes an {@link HttpMessage} or an {@link HttpChunk} into * Encodes an {@link HttpMessage} or an {@link HttpChunk} into
* a {@link ChannelBuffer}. * a {@link ChannelBuffer}.
@ -44,7 +42,7 @@ import io.netty.util.CharsetUtil;
* implement all abstract methods properly. * implement all abstract methods properly.
* @apiviz.landmark * @apiviz.landmark
*/ */
public abstract class HttpMessageEncoder extends OneToOneEncoder { public abstract class HttpMessageEncoder extends MessageToStreamEncoder<Object> {
private static final ChannelBuffer LAST_CHUNK = private static final ChannelBuffer LAST_CHUNK =
copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII); copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII);
@ -58,7 +56,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder {
} }
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
if (msg instanceof HttpMessage) { if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg; HttpMessage m = (HttpMessage) msg;
boolean chunked; boolean chunked;
@ -72,70 +70,59 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder {
} else { } else {
chunked = this.chunked = HttpCodecUtil.isTransferEncodingChunked(m); chunked = this.chunked = HttpCodecUtil.isTransferEncodingChunked(m);
} }
ChannelBuffer header = ChannelBuffers.dynamicBuffer( out.markWriterIndex();
channel.getConfig().getBufferFactory()); encodeInitialLine(out, m);
encodeInitialLine(header, m); encodeHeaders(out, m);
encodeHeaders(header, m); out.writeByte(CR);
header.writeByte(CR); out.writeByte(LF);
header.writeByte(LF);
ChannelBuffer content = m.getContent(); ChannelBuffer content = m.getContent();
if (!content.readable()) { if (content.readable()) {
return header; // no content if (chunked) {
} else if (chunked) { out.resetWriterIndex();
throw new IllegalArgumentException( throw new IllegalArgumentException(
"HttpMessage.content must be empty " + "HttpMessage.content must be empty " +
"if Transfer-Encoding is chunked."); "if Transfer-Encoding is chunked.");
} else { } else {
return wrappedBuffer(header, content); out.writeBytes(content, content.readerIndex(), content.readableBytes());
}
} }
} } else if (msg instanceof HttpChunk) {
if (msg instanceof HttpChunk) {
HttpChunk chunk = (HttpChunk) msg; HttpChunk chunk = (HttpChunk) msg;
if (chunked) { if (chunked) {
if (chunk.isLast()) { if (chunk.isLast()) {
chunked = false; chunked = false;
if (chunk instanceof HttpChunkTrailer) { if (chunk instanceof HttpChunkTrailer) {
ChannelBuffer trailer = ChannelBuffers.dynamicBuffer( out.writeByte((byte) '0');
channel.getConfig().getBufferFactory()); out.writeByte(CR);
trailer.writeByte((byte) '0'); out.writeByte(LF);
trailer.writeByte(CR); encodeTrailingHeaders(out, (HttpChunkTrailer) chunk);
trailer.writeByte(LF); out.writeByte(CR);
encodeTrailingHeaders(trailer, (HttpChunkTrailer) chunk); out.writeByte(LF);
trailer.writeByte(CR);
trailer.writeByte(LF);
return trailer;
} else { } else {
return LAST_CHUNK.duplicate(); out.writeBytes(LAST_CHUNK, LAST_CHUNK.readerIndex(), LAST_CHUNK.readableBytes());
} }
} else { } else {
ChannelBuffer content = chunk.getContent(); ChannelBuffer content = chunk.getContent();
int contentLength = content.readableBytes(); int contentLength = content.readableBytes();
out.writeBytes(copiedBuffer(Integer.toHexString(contentLength), CharsetUtil.US_ASCII));
return wrappedBuffer( out.writeBytes(CRLF);
copiedBuffer( out.writeBytes(content, content.readerIndex(), contentLength);
Integer.toHexString(contentLength), out.writeBytes(CRLF);
CharsetUtil.US_ASCII),
wrappedBuffer(CRLF),
content.slice(content.readerIndex(), contentLength),
wrappedBuffer(CRLF));
} }
} else { } else {
if (chunk.isLast()) { if (!chunk.isLast()) {
return null; ChannelBuffer chunkContent = chunk.getContent();
} else { out.writeBytes(chunkContent, chunkContent.readerIndex(), chunkContent.readableBytes());
return chunk.getContent();
} }
} }
} else {
throw new UnsupportedMessageTypeException(msg, HttpMessage.class, HttpChunk.class);
} }
// Unknown message type.
return msg;
} }
private void encodeHeaders(ChannelBuffer buf, HttpMessage message) { private static void encodeHeaders(ChannelBuffer buf, HttpMessage message) {
try { try {
for (Map.Entry<String, String> h: message.getHeaders()) { for (Map.Entry<String, String> h: message.getHeaders()) {
encodeHeader(buf, h.getKey(), h.getValue()); encodeHeader(buf, h.getKey(), h.getValue());
@ -145,7 +132,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder {
} }
} }
private void encodeTrailingHeaders(ChannelBuffer buf, HttpChunkTrailer trailer) { private static void encodeTrailingHeaders(ChannelBuffer buf, HttpChunkTrailer trailer) {
try { try {
for (Map.Entry<String, String> h: trailer.getHeaders()) { for (Map.Entry<String, String> h: trailer.getHeaders()) {
encodeHeader(buf, h.getKey(), h.getValue()); encodeHeader(buf, h.getKey(), h.getValue());
@ -155,7 +142,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder {
} }
} }
private void encodeHeader(ChannelBuffer buf, String header, String value) private static void encodeHeader(ChannelBuffer buf, String header, String value)
throws UnsupportedEncodingException { throws UnsupportedEncodingException {
buf.writeBytes(header.getBytes("ASCII")); buf.writeBytes(header.getBytes("ASCII"));
buf.writeByte(COLON); buf.writeByte(COLON);

View File

@ -15,10 +15,7 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.channel.ChannelDownstreamHandler; import io.netty.channel.CombinedChannelHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelUpstreamHandler;
/** /**
* A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder} * A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder}
@ -28,11 +25,7 @@ import io.netty.channel.ChannelUpstreamHandler;
* @apiviz.has io.netty.handler.codec.http.HttpRequestDecoder * @apiviz.has io.netty.handler.codec.http.HttpRequestDecoder
* @apiviz.has io.netty.handler.codec.http.HttpResponseEncoder * @apiviz.has io.netty.handler.codec.http.HttpResponseEncoder
*/ */
public class HttpServerCodec implements ChannelUpstreamHandler, public class HttpServerCodec extends CombinedChannelHandler {
ChannelDownstreamHandler {
private final HttpRequestDecoder decoder;
private final HttpResponseEncoder encoder = new HttpResponseEncoder();
/** /**
* Creates a new instance with the default decoder options * Creates a new instance with the default decoder options
@ -48,18 +41,8 @@ public class HttpServerCodec implements ChannelUpstreamHandler,
*/ */
public HttpServerCodec( public HttpServerCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
decoder = new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize); super(
} new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize),
new HttpResponseEncoder());
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
decoder.handleUpstream(ctx, e);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
encoder.handleDownstream(ctx, e);
} }
} }

View File

@ -339,6 +339,25 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
return oldState; return oldState;
} }
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you muse use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected ChannelBuffer internalBuffer() {
return cumulation;
}
@Override @Override
public ChannelBufferHolder<Byte> newInboundBuffer( public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelInboundHandlerContext<Byte> ctx) throws Exception { ChannelInboundHandlerContext<Byte> ctx) throws Exception {
@ -360,8 +379,7 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
try { try {
if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable))) { if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable))) {
in.discardReadBytes(); fireInboundBufferUpdated(ctx, in);
ctx.fireInboundBufferUpdated();
} }
} catch (Signal replay) { } catch (Signal replay) {
// Ignore // Ignore
@ -377,8 +395,10 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
ctx.fireChannelInactive(); ctx.fireChannelInactive();
} }
private void callDecode(ChannelInboundHandlerContext<Byte> ctx) { @Override
protected void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = cumulation; ChannelBuffer in = cumulation;
boolean decoded = false;
while (in.readable()) { while (in.readable()) {
try { try {
int oldReaderIndex = checkpoint = in.readerIndex(); int oldReaderIndex = checkpoint = in.readerIndex();
@ -422,8 +442,15 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
} }
// A successful decode // A successful decode
MessageToMessageEncoder.unfoldAndAdd(ctx, ctx.nextIn(), result); if (unfoldAndAdd(ctx, ctx.nextIn(), result)) {
decoded = true;
}
} catch (Throwable t) { } catch (Throwable t) {
if (decoded) {
decoded = false;
fireInboundBufferUpdated(ctx, in);
}
if (t instanceof CodecException) { if (t instanceof CodecException) {
ctx.fireExceptionCaught(t); ctx.fireExceptionCaught(t);
} else { } else {
@ -431,5 +458,15 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends StreamToMes
} }
} }
} }
if (decoded) {
fireInboundBufferUpdated(ctx, in);
}
}
private void fireInboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) {
checkpoint -= in.readerIndex();
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
} }
} }

View File

@ -48,7 +48,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
ctx.fireChannelInactive(); ctx.fireChannelInactive();
} }
private void callDecode(ChannelInboundHandlerContext<Byte> ctx) { protected void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer(); ChannelBuffer in = ctx.in().byteBuffer();
boolean decoded = false; boolean decoded = false;
@ -75,6 +75,12 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
break; break;
} }
} catch (Throwable t) { } catch (Throwable t) {
if (decoded) {
decoded = false;
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
if (t instanceof CodecException) { if (t instanceof CodecException) {
ctx.fireExceptionCaught(t); ctx.fireExceptionCaught(t);
} else { } else {

View File

@ -15,14 +15,10 @@
*/ */
package io.netty.example.http.snoop; package io.netty.example.http.snoop;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelBootstrap;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.handler.codec.http.CookieEncoder; import io.netty.handler.codec.http.CookieEncoder;
import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
@ -30,6 +26,9 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import java.net.InetSocketAddress;
import java.net.URI;
/** /**
* A simple HTTP client that prints out the content of the HTTP response to * A simple HTTP client that prints out the content of the HTTP response to
* {@link System#out} to test {@link HttpSnoopServer}. * {@link System#out} to test {@link HttpSnoopServer}.
@ -42,7 +41,7 @@ public class HttpSnoopClient {
this.uri = uri; this.uri = uri;
} }
public void run() { public void run() throws Exception {
String scheme = uri.getScheme() == null? "http" : uri.getScheme(); String scheme = uri.getScheme() == null? "http" : uri.getScheme();
String host = uri.getHost() == null? "localhost" : uri.getHost(); String host = uri.getHost() == null? "localhost" : uri.getHost();
int port = uri.getPort(); int port = uri.getPort();
@ -62,45 +61,38 @@ public class HttpSnoopClient {
boolean ssl = scheme.equalsIgnoreCase("https"); boolean ssl = scheme.equalsIgnoreCase("https");
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ChannelBootstrap b = new ChannelBootstrap();
new NioClientSocketChannelFactory( try {
Executors.newCachedThreadPool())); b.eventLoop(new SelectorEventLoop())
.channel(new NioSocketChannel())
.initializer(new HttpSnoopClientInitializer(ssl))
.remoteAddress(new InetSocketAddress(host, port));
// Set up the event pipeline factory. // Make the connection attempt.
bootstrap.setPipelineFactory(new HttpSnoopClientPipelineFactory(ssl)); Channel ch = b.connect().sync().channel();
// Start the connection attempt. // Prepare the HTTP request.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); HttpRequest request = new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
// Wait until the connection attempt succeeds or fails. // Set some example cookies.
Channel channel = future.awaitUninterruptibly().channel(); CookieEncoder httpCookieEncoder = new CookieEncoder(false);
if (!future.isSuccess()) { httpCookieEncoder.addCookie("my-cookie", "foo");
future.cause().printStackTrace(); httpCookieEncoder.addCookie("another-cookie", "bar");
bootstrap.releaseExternalResources(); request.setHeader(HttpHeaders.Names.COOKIE, httpCookieEncoder.encode());
return;
// Send the HTTP request.
ch.write(request);
// Wait for the server to close the connection.
ch.closeFuture().sync();
} finally {
// Shut down executor threads to exit.
b.shutdown();
} }
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
// Set some example cookies.
CookieEncoder httpCookieEncoder = new CookieEncoder(false);
httpCookieEncoder.addCookie("my-cookie", "foo");
httpCookieEncoder.addCookie("another-cookie", "bar");
request.setHeader(HttpHeaders.Names.COOKIE, httpCookieEncoder.encode());
// Send the HTTP request.
channel.write(request);
// Wait for the server to close the connection.
channel.getCloseFuture().awaitUninterruptibly();
// Shut down executor threads to exit.
bootstrap.releaseExternalResources();
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -16,21 +16,44 @@
package io.netty.example.http.snoop; package io.netty.example.http.snoop;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.MessageEvent; import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.SimpleChannelUpstreamHandler; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.http.HttpChunk; import io.netty.handler.codec.http.HttpChunk;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
public class HttpSnoopClientHandler extends SimpleChannelUpstreamHandler { import java.util.Queue;
public class HttpSnoopClientHandler extends ChannelInboundHandlerAdapter<Object> {
private boolean readingChunks; private boolean readingChunks;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
Queue<Object> in = ctx.in().messageBuffer();
while (handleMessage(in.poll())) {
continue;
}
}
private boolean handleMessage(Object msg) throws Exception {
if (msg == null) {
return false;
}
if (!readingChunks) { if (!readingChunks) {
HttpResponse response = (HttpResponse) e.getMessage(); HttpResponse response = (HttpResponse) msg;
System.out.println("STATUS: " + response.getStatus()); System.out.println("STATUS: " + response.getStatus());
System.out.println("VERSION: " + response.getProtocolVersion()); System.out.println("VERSION: " + response.getProtocolVersion());
@ -57,7 +80,7 @@ public class HttpSnoopClientHandler extends SimpleChannelUpstreamHandler {
} }
} }
} else { } else {
HttpChunk chunk = (HttpChunk) e.getMessage(); HttpChunk chunk = (HttpChunk) msg;
if (chunk.isLast()) { if (chunk.isLast()) {
readingChunks = false; readingChunks = false;
System.out.println("} END OF CHUNKED CONTENT"); System.out.println("} END OF CHUNKED CONTENT");
@ -66,5 +89,14 @@ public class HttpSnoopClientHandler extends SimpleChannelUpstreamHandler {
System.out.flush(); System.out.flush();
} }
} }
return true;
}
@Override
public void exceptionCaught(
ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
} }
} }

View File

@ -15,51 +15,49 @@
*/ */
package io.netty.example.http.snoop; package io.netty.example.http.snoop;
import static io.netty.channel.Channels.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import io.netty.channel.ChannelPipeline; public class HttpSnoopClientInitializer extends ChannelInitializer {
import io.netty.channel.ChannelPipelineFactory;
import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.logging.InternalLogLevel;
public class HttpSnoopClientPipelineFactory implements ChannelPipelineFactory {
private final boolean ssl; private final boolean ssl;
public HttpSnoopClientPipelineFactory(boolean ssl) { public HttpSnoopClientInitializer(boolean ssl) {
this.ssl = ssl; this.ssl = ssl;
} }
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public void initChannel(Channel ch) throws Exception {
// Create a default pipeline implementation. // Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline(); ChannelPipeline p = ch.pipeline();
pipeline.addLast("log", new LoggingHandler(InternalLogLevel.INFO)); p.addLast("log", new LoggingHandler(LogLevel.INFO));
// Enable HTTPS if necessary. // Enable HTTPS if necessary.
if (ssl) { if (ssl) {
SSLEngine engine = SSLEngine engine =
SecureChatSslContextFactory.getClientContext().createSSLEngine(); SecureChatSslContextFactory.getClientContext().createSSLEngine();
engine.setUseClientMode(true); engine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(engine)); // FIXME: Port SslHandler to the new API
//p.addLast("ssl", new SslHandler(engine));
} }
pipeline.addLast("codec", new HttpClientCodec()); p.addLast("codec", new HttpClientCodec());
// Remove the following line if you don't want automatic content decompression. // Remove the following line if you don't want automatic content decompression.
pipeline.addLast("inflater", new HttpContentDecompressor()); // FIXME: Port HttpContentDecompressor to the new API
//p.addLast("inflater", new HttpContentDecompressor());
// Uncomment the following line if you don't want to handle HttpChunks. // Uncomment the following line if you don't want to handle HttpChunks.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("handler", new HttpSnoopClientHandler()); p.addLast("handler", new HttpSnoopClientHandler());
return pipeline;
} }
} }

View File

@ -15,11 +15,12 @@
*/ */
package io.netty.example.http.snoop; package io.netty.example.http.snoop;
import java.net.InetSocketAddress; import io.netty.channel.Channel;
import java.util.concurrent.Executors; import io.netty.channel.ServerChannelBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.bootstrap.ServerBootstrap; import java.net.InetSocketAddress;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
/** /**
* An HTTP server that sends back the content of the received HTTP request * An HTTP server that sends back the content of the received HTTP request
@ -33,20 +34,24 @@ public class HttpSnoopServer {
this.port = port; this.port = port;
} }
public void run() { public void run() throws Exception {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerChannelBootstrap b = new ServerChannelBootstrap();
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
// Set up the event pipeline factory. try {
bootstrap.setPipelineFactory(new HttpSnoopServerPipelineFactory()); b.eventLoop(new SelectorEventLoop(), new SelectorEventLoop())
.channel(new NioServerSocketChannel())
.childInitializer(new HttpSnoopServerInitializer())
.localAddress(new InetSocketAddress(port));
// Bind and start to accept incoming connections. Channel ch = b.bind().sync().channel();
bootstrap.bind(new InetSocketAddress(port)); ch.closeFuture().sync();
} finally {
b.shutdown();
}
} }
public static void main(String[] args) { public static void main(String[] args) throws Exception {
int port; int port;
if (args.length > 0) { if (args.length > 0) {
port = Integer.parseInt(args[0]); port = Integer.parseInt(args[0]);

View File

@ -19,20 +19,14 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpHeaders.Names.*; import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*; import static io.netty.handler.codec.http.HttpVersion.*;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ExceptionEvent; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.CookieDecoder;
import io.netty.handler.codec.http.CookieEncoder; import io.netty.handler.codec.http.CookieEncoder;
@ -45,7 +39,13 @@ import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter<Object> {
private HttpRequest request; private HttpRequest request;
private boolean readingChunks; private boolean readingChunks;
@ -53,12 +53,30 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler {
private final StringBuilder buf = new StringBuilder(); private final StringBuilder buf = new StringBuilder();
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
Queue<Object> in = ctx.in().messageBuffer();
while (handleMessage(ctx, in.poll())) {
continue;
}
}
private boolean handleMessage(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg == null) {
return false;
}
if (!readingChunks) { if (!readingChunks) {
HttpRequest request = this.request = (HttpRequest) e.getMessage(); HttpRequest request = this.request = (HttpRequest) msg;
if (is100ContinueExpected(request)) { if (is100ContinueExpected(request)) {
send100Continue(e); send100Continue(ctx);
} }
buf.setLength(0); buf.setLength(0);
@ -94,10 +112,10 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler {
if (content.readable()) { if (content.readable()) {
buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n"); buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n");
} }
writeResponse(e); writeResponse(ctx);
} }
} else { } else {
HttpChunk chunk = (HttpChunk) e.getMessage(); HttpChunk chunk = (HttpChunk) msg;
if (chunk.isLast()) { if (chunk.isLast()) {
readingChunks = false; readingChunks = false;
buf.append("END OF CONTENT\r\n"); buf.append("END OF CONTENT\r\n");
@ -113,14 +131,16 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler {
buf.append("\r\n"); buf.append("\r\n");
} }
writeResponse(e); writeResponse(ctx);
} else { } else {
buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n"); buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n");
} }
} }
return true;
} }
private void writeResponse(MessageEvent e) { private void writeResponse(ChannelInboundHandlerContext<Object> ctx) {
// Decide whether to close the connection or not. // Decide whether to close the connection or not.
boolean keepAlive = isKeepAlive(request); boolean keepAlive = isKeepAlive(request);
@ -152,7 +172,7 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler {
} }
// Write the response. // Write the response.
ChannelFuture future = e.channel().write(response); ChannelFuture future = ctx.write(response);
// Close the non-keep-alive connection after the write operation is done. // Close the non-keep-alive connection after the write operation is done.
if (!keepAlive) { if (!keepAlive) {
@ -160,15 +180,15 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler {
} }
} }
private void send100Continue(MessageEvent e) { private static void send100Continue(ChannelInboundHandlerContext<Object> ctx) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CONTINUE); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CONTINUE);
e.channel().write(response); ctx.write(response);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) public void exceptionCaught(
throws Exception { ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
e.cause().printStackTrace(); cause.printStackTrace();
e.channel().close(); ctx.close();
} }
} }

View File

@ -15,32 +15,30 @@
*/ */
package io.netty.example.http.snoop; package io.netty.example.http.snoop;
import static io.netty.channel.Channels.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseEncoder;
public class HttpSnoopServerPipelineFactory implements ChannelPipelineFactory { public class HttpSnoopServerInitializer extends ChannelInitializer {
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public void initChannel(Channel ch) throws Exception {
// Create a default pipeline implementation. // Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline(); ChannelPipeline p = ch.pipeline();
// Uncomment the following line if you want HTTPS // Uncomment the following line if you want HTTPS
//SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
//engine.setUseClientMode(false); //engine.setUseClientMode(false);
//pipeline.addLast("ssl", new SslHandler(engine)); //p.addLast("ssl", new SslHandler(engine));
pipeline.addLast("decoder", new HttpRequestDecoder()); p.addLast("decoder", new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks. // Uncomment the following line if you don't want to handle HttpChunks.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder()); p.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression. // Remove the following line if you don't want automatic content compression.
pipeline.addLast("deflater", new HttpContentCompressor()); // FIXME: Port HttpContentCompressor to the new API
pipeline.addLast("handler", new HttpSnoopServerHandler()); //p.addLast("deflater", new HttpContentCompressor());
return pipeline; p.addLast("handler", new HttpSnoopServerHandler());
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
@ -636,19 +637,23 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public void read() { public void read() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelBufferHolder<Object> buf = pipeline().nextIn();
final boolean hasByteBuffer = buf.hasByteBuffer();
long readAmount = 0; long readAmount = 0;
boolean closed = false; boolean closed = false;
try { try {
for (;;) { for (;;) {
int localReadAmount = doRead(); int localReadAmount = doRead(buf);
if (localReadAmount > 0) { if (localReadAmount > 0) {
readAmount += localReadAmount; readAmount += localReadAmount;
continue; expandReadBuffer(buf, hasByteBuffer);
} } else if (localReadAmount == 0) {
if (localReadAmount == 0) { if (!expandReadBuffer(buf, hasByteBuffer)) {
break; break;
} }
if (localReadAmount < 0) { } else if (localReadAmount < 0) {
closed = true; closed = true;
break; break;
} }
@ -851,7 +856,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract void doClose() throws Exception; protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception; protected abstract void doDeregister() throws Exception;
protected abstract int doRead() throws Exception; protected abstract int doRead(ChannelBufferHolder<Object> buf) throws Exception;
protected abstract int doFlush(boolean lastSpin) throws Exception; protected abstract int doFlush(boolean lastSpin) throws Exception;
protected abstract boolean inEventLoopDrivenFlush(); protected abstract boolean inEventLoopDrivenFlush();
private static boolean expandReadBuffer(ChannelBufferHolder<Object> buf, boolean hasByteBuffer) {
if (!hasByteBuffer) {
return false;
}
ChannelBuffer byteBuf = buf.byteBuffer();
if (!byteBuf.writable()) {
// FIXME: Use a sensible value.
byteBuf.ensureWritableBytes(128);
return true;
}
return false;
}
} }

View File

@ -4,145 +4,6 @@ import java.net.SocketAddress;
public abstract class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> { public abstract class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> {
public static <I, O> ChannelHandlerAdapter<I, O> combine(
ChannelInboundHandler<I> inboundHandler, ChannelOutboundHandler<O> outboundHandler) {
if (inboundHandler == null) {
throw new NullPointerException("inboundHandler");
}
if (outboundHandler == null) {
throw new NullPointerException("outboundHandler");
}
final ChannelInboundHandler<I> in = inboundHandler;
final ChannelOutboundHandler<O> out = outboundHandler;
return new ChannelHandlerAdapter<I, O>() {
@Override
public ChannelBufferHolder<I> newInboundBuffer(
ChannelInboundHandlerContext<I> ctx) throws Exception {
return in.newInboundBuffer(ctx);
}
@Override
public ChannelBufferHolder<O> newOutboundBuffer(
ChannelOutboundHandlerContext<O> ctx) throws Exception {
return out.newOutboundBuffer(ctx);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
try {
in.beforeAdd(ctx);
} finally {
out.beforeAdd(ctx);
}
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
try {
in.afterAdd(ctx);
} finally {
out.afterAdd(ctx);
}
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
try {
in.beforeRemove(ctx);
} finally {
out.beforeRemove(ctx);
}
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
try {
in.afterRemove(ctx);
} finally {
out.afterRemove(ctx);
}
}
@Override
public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception {
in.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelInboundHandlerContext<I> ctx) throws Exception {
in.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelInboundHandlerContext<I> ctx) throws Exception {
in.channelActive(ctx);
}
@Override
public void channelInactive(ChannelInboundHandlerContext<I> ctx) throws Exception {
in.channelInactive(ctx);
}
@Override
public void exceptionCaught(
ChannelInboundHandlerContext<I> ctx, Throwable cause) throws Exception {
in.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(
ChannelInboundHandlerContext<I> ctx, Object evt) throws Exception {
in.userEventTriggered(ctx, evt);
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx) throws Exception {
in.inboundBufferUpdated(ctx);
}
@Override
public void bind(
ChannelOutboundHandlerContext<O> ctx,
SocketAddress localAddress, ChannelFuture future) throws Exception {
out.bind(ctx, localAddress, future);
}
@Override
public void connect(
ChannelOutboundHandlerContext<O> ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelFuture future) throws Exception {
out.connect(ctx, remoteAddress, localAddress, future);
}
@Override
public void disconnect(
ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
out.disconnect(ctx, future);
}
@Override
public void close(
ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
out.close(ctx, future);
}
@Override
public void deregister(
ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
out.deregister(ctx, future);
}
@Override
public void flush(
ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
out.flush(ctx, future);
}
};
}
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default. // Do nothing by default.

View File

@ -0,0 +1,166 @@
package io.netty.channel;
import java.net.SocketAddress;
public class CombinedChannelHandler implements ChannelInboundHandler<Object>,
ChannelOutboundHandler<Object> {
private ChannelOutboundHandler<Object> out;
private ChannelInboundHandler<Object> in;
public CombinedChannelHandler() {
// User will call init in the subclass constructor.
}
public CombinedChannelHandler(
ChannelInboundHandler<?> inboundHandler, ChannelOutboundHandler<?> outboundHandler) {
init(inboundHandler, outboundHandler);
}
@SuppressWarnings("unchecked")
protected void init(ChannelInboundHandler<?> inboundHandler,
ChannelOutboundHandler<?> outboundHandler) {
if (inboundHandler == null) {
throw new NullPointerException("inboundHandler");
}
if (outboundHandler == null) {
throw new NullPointerException("outboundHandler");
}
if (in != null) {
throw new IllegalStateException("init() cannot be called more than once.");
}
in = (ChannelInboundHandler<Object>) inboundHandler;
out = (ChannelOutboundHandler<Object>) outboundHandler;
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return in.newInboundBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelOutboundHandlerContext<Object> ctx) throws Exception {
return out.newOutboundBuffer(ctx);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (in == null) {
throw new IllegalStateException(
"not initialized yet - call init() in the constructor of the subclass");
}
try {
in.beforeAdd(ctx);
} finally {
out.beforeAdd(ctx);
}
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
try {
in.afterAdd(ctx);
} finally {
out.afterAdd(ctx);
}
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
try {
in.beforeRemove(ctx);
} finally {
out.beforeRemove(ctx);
}
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
try {
in.afterRemove(ctx);
} finally {
out.afterRemove(ctx);
}
}
@Override
public void channelRegistered(ChannelInboundHandlerContext<Object> ctx) throws Exception {
in.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelInboundHandlerContext<Object> ctx) throws Exception {
in.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
in.channelActive(ctx);
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
in.channelInactive(ctx);
}
@Override
public void exceptionCaught(
ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
in.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(
ChannelInboundHandlerContext<Object> ctx, Object evt) throws Exception {
in.userEventTriggered(ctx, evt);
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx) throws Exception {
in.inboundBufferUpdated(ctx);
}
@Override
public void bind(
ChannelOutboundHandlerContext<Object> ctx,
SocketAddress localAddress, ChannelFuture future) throws Exception {
out.bind(ctx, localAddress, future);
}
@Override
public void connect(
ChannelOutboundHandlerContext<Object> ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelFuture future) throws Exception {
out.connect(ctx, remoteAddress, localAddress, future);
}
@Override
public void disconnect(
ChannelOutboundHandlerContext<Object> ctx, ChannelFuture future) throws Exception {
out.disconnect(ctx, future);
}
@Override
public void close(
ChannelOutboundHandlerContext<Object> ctx, ChannelFuture future) throws Exception {
out.close(ctx, future);
}
@Override
public void deregister(
ChannelOutboundHandlerContext<Object> ctx, ChannelFuture future) throws Exception {
out.deregister(ctx, future);
}
@Override
public void flush(
ChannelOutboundHandlerContext<Object> ctx, ChannelFuture future) throws Exception {
out.flush(ctx, future);
}
}

View File

@ -16,6 +16,7 @@
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.channel.AbstractServerChannel; import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.DefaultServerSocketChannelConfig;
@ -127,12 +128,12 @@ public class NioServerSocketChannel extends AbstractServerChannel
} }
@Override @Override
protected int doRead() throws Exception { protected int doRead(ChannelBufferHolder<Object> buf) throws Exception {
java.nio.channels.SocketChannel ch = javaChannel().accept(); java.nio.channels.SocketChannel ch = javaChannel().accept();
if (ch == null) { if (ch == null) {
return 0; return 0;
} }
pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, null, ch)); buf.messageBuffer().add(new NioSocketChannel(this, null, ch));
return 1; return 1;
} }
} }

View File

@ -166,9 +166,9 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
} }
@Override @Override
protected int doRead() throws Exception { protected int doRead(ChannelBufferHolder<Object> buf) throws Exception {
ChannelBuffer buf = pipeline().nextIn().byteBuffer(); ChannelBuffer byteBuf = buf.byteBuffer();
return buf.writeBytes(javaChannel(), buf.writableBytes()); return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
} }
@Override @Override