diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java index 7e0b65fb25..7acc2c2a96 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java @@ -15,23 +15,19 @@ */ package io.netty.handler.codec.http; -import static io.netty.channel.Channels.*; import static io.netty.handler.codec.http.HttpHeaders.*; - -import java.util.List; -import java.util.Map.Entry; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; import io.netty.util.CharsetUtil; +import java.util.List; +import java.util.Map.Entry; + /** * A {@link ChannelHandler} that aggregates an {@link HttpMessage} * and its following {@link HttpChunk}s into a single {@link HttpMessage} with @@ -50,7 +46,7 @@ import io.netty.util.CharsetUtil; * @apiviz.landmark * @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - filters out */ -public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { +public class HttpChunkAggregator extends MessageToMessageDecoder { private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer( "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII); @@ -75,11 +71,9 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { this.maxContentLength = maxContentLength; } - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - Object msg = e.getMessage(); + @Override + public HttpMessage decode(ChannelInboundHandlerContext ctx, Object msg) throws Exception { HttpMessage currentMessage = this.currentMessage; if (msg instanceof HttpMessage) { @@ -91,7 +85,7 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { // No need to notify the upstream handlers - just log. // If decoding a response, just throw an exception. if (is100ContinueExpected(m)) { - write(ctx, succeededFuture(ctx.channel()), CONTINUE.duplicate()); + ctx.write(CONTINUE.duplicate()); } if (m.isChunked()) { @@ -103,12 +97,13 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING); } m.setChunked(false); - m.setContent(ChannelBuffers.dynamicBuffer(e.channel().getConfig().getBufferFactory())); + m.setContent(ChannelBuffers.dynamicBuffer()); this.currentMessage = m; + return null; } else { // Not a chunked message - pass through. this.currentMessage = null; - ctx.sendUpstream(e); + return m; } } else if (msg instanceof HttpChunk) { // Sanity check @@ -149,12 +144,15 @@ public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(content.readableBytes())); - // All done - generate the event. - Channels.fireMessageReceived(ctx, currentMessage, e.getRemoteAddress()); + // All done + return currentMessage; + } else { + return null; } } else { - // Neither HttpMessage or HttpChunk - ctx.sendUpstream(e); + throw new IllegalStateException( + "Only " + HttpMessage.class.getSimpleName() + " and " + + HttpChunk.class.getSimpleName() + " are accepted: " + msg.getClass().getName()); } } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index 8049b79a90..35d7d6cf82 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -15,16 +15,14 @@ */ package io.netty.handler.codec.http; -import java.util.Queue; - import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelUpstreamHandler; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.channel.CombinedChannelHandler; import io.netty.util.internal.QueueFactory; +import java.util.Queue; + /** * A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder} * which enables easier client side HTTP implementation. {@link HttpClientCodec} @@ -38,8 +36,7 @@ import io.netty.util.internal.QueueFactory; * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder * @apiviz.has io.netty.handler.codec.http.HttpRequestEncoder */ -public class HttpClientCodec implements ChannelUpstreamHandler, - ChannelDownstreamHandler { +public class HttpClientCodec extends CombinedChannelHandler { /** A queue that is used for correlating a request and a response. */ final Queue queue = QueueFactory.createQueue(HttpMethod.class); @@ -47,9 +44,6 @@ public class HttpClientCodec implements ChannelUpstreamHandler, /** If true, decoding stops (i.e. pass-through) */ volatile boolean done; - private final HttpRequestEncoder encoder = new Encoder(); - private final HttpResponseDecoder decoder; - /** * Creates a new instance with the default decoder options * ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and @@ -64,33 +58,19 @@ public class HttpClientCodec implements ChannelUpstreamHandler, */ public HttpClientCodec( int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { - decoder = new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize); - } - - @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - decoder.handleUpstream(ctx, e); - } - - @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - encoder.handleDownstream(ctx, e); + init( + new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), + new Encoder()); } private final class Encoder extends HttpRequestEncoder { - - Encoder() { - } - @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { + public void encode( + ChannelOutboundHandlerContext ctx, Object msg, ChannelBuffer out) throws Exception { if (msg instanceof HttpRequest && !done) { queue.offer(((HttpRequest) msg).getMethod()); } - return super.encode(ctx, channel, msg); + super.encode(ctx, msg, out); } } @@ -101,12 +81,12 @@ public class HttpClientCodec implements ChannelUpstreamHandler, } @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer, State state) throws Exception { + public Object decode( + ChannelInboundHandlerContext ctx, ChannelBuffer buffer) throws Exception { if (done) { return buffer.readBytes(actualReadableBytes()); } else { - return super.decode(ctx, channel, buffer, state); + return super.decode(ctx, buffer); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java index 95b9ed0ea8..4d6f936129 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java @@ -15,16 +15,15 @@ */ package io.netty.handler.codec.http; -import java.util.List; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; +import java.util.List; + /** * Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and * {@link HttpChunk}s. @@ -98,7 +97,7 @@ import io.netty.handler.codec.TooLongFrameException; * implement all abstract methods properly. * @apiviz.landmark */ -public abstract class HttpMessageDecoder extends ReplayingDecoder { +public abstract class HttpMessageDecoder extends ReplayingDecoder { private final int maxInitialLineLength; private final int maxHeaderSize; @@ -143,7 +142,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder ctx, ChannelBuffer buffer) throws Exception { + switch (state()) { case SKIP_CONTROL_CHARS: { try { skipControlCharacters(buffer); @@ -237,7 +236,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder= 100 && code < 200) { @@ -404,7 +403,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder 0; result --) { if (!Character.isWhitespace(sb.charAt(result - 1))) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java index a4de1caecb..2050a66c20 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java @@ -17,19 +17,17 @@ package io.netty.handler.codec.http; import static io.netty.buffer.ChannelBuffers.*; import static io.netty.handler.codec.http.HttpCodecUtil.*; +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToStreamEncoder; +import io.netty.handler.codec.UnsupportedMessageTypeException; +import io.netty.handler.codec.http.HttpHeaders.Names; +import io.netty.handler.codec.http.HttpHeaders.Values; +import io.netty.util.CharsetUtil; import java.io.UnsupportedEncodingException; import java.util.Map; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.HttpHeaders.Names; -import io.netty.handler.codec.http.HttpHeaders.Values; -import io.netty.handler.codec.oneone.OneToOneEncoder; -import io.netty.util.CharsetUtil; - /** * Encodes an {@link HttpMessage} or an {@link HttpChunk} into * a {@link ChannelBuffer}. @@ -44,7 +42,7 @@ import io.netty.util.CharsetUtil; * implement all abstract methods properly. * @apiviz.landmark */ -public abstract class HttpMessageEncoder extends OneToOneEncoder { +public abstract class HttpMessageEncoder extends MessageToStreamEncoder { private static final ChannelBuffer LAST_CHUNK = copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII); @@ -58,7 +56,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder { } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { + public void encode(ChannelOutboundHandlerContext ctx, Object msg, ChannelBuffer out) throws Exception { if (msg instanceof HttpMessage) { HttpMessage m = (HttpMessage) msg; boolean chunked; @@ -72,70 +70,59 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder { } else { chunked = this.chunked = HttpCodecUtil.isTransferEncodingChunked(m); } - ChannelBuffer header = ChannelBuffers.dynamicBuffer( - channel.getConfig().getBufferFactory()); - encodeInitialLine(header, m); - encodeHeaders(header, m); - header.writeByte(CR); - header.writeByte(LF); + out.markWriterIndex(); + encodeInitialLine(out, m); + encodeHeaders(out, m); + out.writeByte(CR); + out.writeByte(LF); ChannelBuffer content = m.getContent(); - if (!content.readable()) { - return header; // no content - } else if (chunked) { - throw new IllegalArgumentException( - "HttpMessage.content must be empty " + - "if Transfer-Encoding is chunked."); - } else { - return wrappedBuffer(header, content); + if (content.readable()) { + if (chunked) { + out.resetWriterIndex(); + throw new IllegalArgumentException( + "HttpMessage.content must be empty " + + "if Transfer-Encoding is chunked."); + } else { + out.writeBytes(content, content.readerIndex(), content.readableBytes()); + } } - } - - if (msg instanceof HttpChunk) { + } else if (msg instanceof HttpChunk) { HttpChunk chunk = (HttpChunk) msg; if (chunked) { if (chunk.isLast()) { chunked = false; if (chunk instanceof HttpChunkTrailer) { - ChannelBuffer trailer = ChannelBuffers.dynamicBuffer( - channel.getConfig().getBufferFactory()); - trailer.writeByte((byte) '0'); - trailer.writeByte(CR); - trailer.writeByte(LF); - encodeTrailingHeaders(trailer, (HttpChunkTrailer) chunk); - trailer.writeByte(CR); - trailer.writeByte(LF); - return trailer; + out.writeByte((byte) '0'); + out.writeByte(CR); + out.writeByte(LF); + encodeTrailingHeaders(out, (HttpChunkTrailer) chunk); + out.writeByte(CR); + out.writeByte(LF); } else { - return LAST_CHUNK.duplicate(); + out.writeBytes(LAST_CHUNK, LAST_CHUNK.readerIndex(), LAST_CHUNK.readableBytes()); } } else { ChannelBuffer content = chunk.getContent(); int contentLength = content.readableBytes(); - - return wrappedBuffer( - copiedBuffer( - Integer.toHexString(contentLength), - CharsetUtil.US_ASCII), - wrappedBuffer(CRLF), - content.slice(content.readerIndex(), contentLength), - wrappedBuffer(CRLF)); + out.writeBytes(copiedBuffer(Integer.toHexString(contentLength), CharsetUtil.US_ASCII)); + out.writeBytes(CRLF); + out.writeBytes(content, content.readerIndex(), contentLength); + out.writeBytes(CRLF); } } else { - if (chunk.isLast()) { - return null; - } else { - return chunk.getContent(); + if (!chunk.isLast()) { + ChannelBuffer chunkContent = chunk.getContent(); + out.writeBytes(chunkContent, chunkContent.readerIndex(), chunkContent.readableBytes()); } } + } else { + throw new UnsupportedMessageTypeException(msg, HttpMessage.class, HttpChunk.class); } - - // Unknown message type. - return msg; } - private void encodeHeaders(ChannelBuffer buf, HttpMessage message) { + private static void encodeHeaders(ChannelBuffer buf, HttpMessage message) { try { for (Map.Entry h: message.getHeaders()) { encodeHeader(buf, h.getKey(), h.getValue()); @@ -145,7 +132,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder { } } - private void encodeTrailingHeaders(ChannelBuffer buf, HttpChunkTrailer trailer) { + private static void encodeTrailingHeaders(ChannelBuffer buf, HttpChunkTrailer trailer) { try { for (Map.Entry h: trailer.getHeaders()) { encodeHeader(buf, h.getKey(), h.getValue()); @@ -155,7 +142,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder { } } - private void encodeHeader(ChannelBuffer buf, String header, String value) + private static void encodeHeader(ChannelBuffer buf, String header, String value) throws UnsupportedEncodingException { buf.writeBytes(header.getBytes("ASCII")); buf.writeByte(COLON); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java index 17519a220d..b45491af1d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java @@ -15,10 +15,7 @@ */ package io.netty.handler.codec.http; -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelUpstreamHandler; +import io.netty.channel.CombinedChannelHandler; /** * A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder} @@ -28,11 +25,7 @@ import io.netty.channel.ChannelUpstreamHandler; * @apiviz.has io.netty.handler.codec.http.HttpRequestDecoder * @apiviz.has io.netty.handler.codec.http.HttpResponseEncoder */ -public class HttpServerCodec implements ChannelUpstreamHandler, - ChannelDownstreamHandler { - - private final HttpRequestDecoder decoder; - private final HttpResponseEncoder encoder = new HttpResponseEncoder(); +public class HttpServerCodec extends CombinedChannelHandler { /** * Creates a new instance with the default decoder options @@ -48,18 +41,8 @@ public class HttpServerCodec implements ChannelUpstreamHandler, */ public HttpServerCodec( int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { - decoder = new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize); - } - - @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - decoder.handleUpstream(ctx, e); - } - - @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - encoder.handleDownstream(ctx, e); + super( + new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), + new HttpResponseEncoder()); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 1d786b5387..5394c5cd7d 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -339,6 +339,25 @@ public abstract class ReplayingDecoder> extends StreamToMes return oldState; } + /** + * Returns the actual number of readable bytes in the internal cumulative + * buffer of this decoder. You usually do not need to rely on this value + * to write a decoder. Use it only when you muse use it at your own risk. + * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. + */ + protected int actualReadableBytes() { + return internalBuffer().readableBytes(); + } + + /** + * Returns the internal cumulative buffer of this decoder. You usually + * do not need to access the internal buffer directly to write a decoder. + * Use it only when you must use it at your own risk. + */ + protected ChannelBuffer internalBuffer() { + return cumulation; + } + @Override public ChannelBufferHolder newInboundBuffer( ChannelInboundHandlerContext ctx) throws Exception { @@ -360,8 +379,7 @@ public abstract class ReplayingDecoder> extends StreamToMes try { if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable))) { - in.discardReadBytes(); - ctx.fireInboundBufferUpdated(); + fireInboundBufferUpdated(ctx, in); } } catch (Signal replay) { // Ignore @@ -377,8 +395,10 @@ public abstract class ReplayingDecoder> extends StreamToMes ctx.fireChannelInactive(); } - private void callDecode(ChannelInboundHandlerContext ctx) { + @Override + protected void callDecode(ChannelInboundHandlerContext ctx) { ChannelBuffer in = cumulation; + boolean decoded = false; while (in.readable()) { try { int oldReaderIndex = checkpoint = in.readerIndex(); @@ -422,8 +442,15 @@ public abstract class ReplayingDecoder> extends StreamToMes } // A successful decode - MessageToMessageEncoder.unfoldAndAdd(ctx, ctx.nextIn(), result); + if (unfoldAndAdd(ctx, ctx.nextIn(), result)) { + decoded = true; + } } catch (Throwable t) { + if (decoded) { + decoded = false; + fireInboundBufferUpdated(ctx, in); + } + if (t instanceof CodecException) { ctx.fireExceptionCaught(t); } else { @@ -431,5 +458,15 @@ public abstract class ReplayingDecoder> extends StreamToMes } } } + + if (decoded) { + fireInboundBufferUpdated(ctx, in); + } + } + + private void fireInboundBufferUpdated(ChannelInboundHandlerContext ctx, ChannelBuffer in) { + checkpoint -= in.readerIndex(); + in.discardReadBytes(); + ctx.fireInboundBufferUpdated(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index 06bfc926b1..04248ce398 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -48,7 +48,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda ctx.fireChannelInactive(); } - private void callDecode(ChannelInboundHandlerContext ctx) { + protected void callDecode(ChannelInboundHandlerContext ctx) { ChannelBuffer in = ctx.in().byteBuffer(); boolean decoded = false; @@ -75,6 +75,12 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda break; } } catch (Throwable t) { + if (decoded) { + decoded = false; + in.discardReadBytes(); + ctx.fireInboundBufferUpdated(); + } + if (t instanceof CodecException) { ctx.fireExceptionCaught(t); } else { diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java index 13410579fe..99d837f75d 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java @@ -15,14 +15,10 @@ */ package io.netty.example.http.snoop; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.concurrent.Executors; - -import io.netty.bootstrap.ClientBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioClientSocketChannelFactory; +import io.netty.channel.ChannelBootstrap; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.nio.SelectorEventLoop; import io.netty.handler.codec.http.CookieEncoder; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpHeaders; @@ -30,6 +26,9 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpVersion; +import java.net.InetSocketAddress; +import java.net.URI; + /** * A simple HTTP client that prints out the content of the HTTP response to * {@link System#out} to test {@link HttpSnoopServer}. @@ -42,7 +41,7 @@ public class HttpSnoopClient { this.uri = uri; } - public void run() { + public void run() throws Exception { String scheme = uri.getScheme() == null? "http" : uri.getScheme(); String host = uri.getHost() == null? "localhost" : uri.getHost(); int port = uri.getPort(); @@ -62,45 +61,38 @@ public class HttpSnoopClient { boolean ssl = scheme.equalsIgnoreCase("https"); // Configure the client. - ClientBootstrap bootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool())); + ChannelBootstrap b = new ChannelBootstrap(); + try { + b.eventLoop(new SelectorEventLoop()) + .channel(new NioSocketChannel()) + .initializer(new HttpSnoopClientInitializer(ssl)) + .remoteAddress(new InetSocketAddress(host, port)); - // Set up the event pipeline factory. - bootstrap.setPipelineFactory(new HttpSnoopClientPipelineFactory(ssl)); + // Make the connection attempt. + Channel ch = b.connect().sync().channel(); - // Start the connection attempt. - ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); + // Prepare the HTTP request. + HttpRequest request = new DefaultHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); + request.setHeader(HttpHeaders.Names.HOST, host); + request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); - // Wait until the connection attempt succeeds or fails. - Channel channel = future.awaitUninterruptibly().channel(); - if (!future.isSuccess()) { - future.cause().printStackTrace(); - bootstrap.releaseExternalResources(); - return; + // Set some example cookies. + CookieEncoder httpCookieEncoder = new CookieEncoder(false); + httpCookieEncoder.addCookie("my-cookie", "foo"); + httpCookieEncoder.addCookie("another-cookie", "bar"); + request.setHeader(HttpHeaders.Names.COOKIE, httpCookieEncoder.encode()); + + // Send the HTTP request. + ch.write(request); + + // Wait for the server to close the connection. + ch.closeFuture().sync(); + } finally { + // Shut down executor threads to exit. + b.shutdown(); } - - // Prepare the HTTP request. - HttpRequest request = new DefaultHttpRequest( - HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); - request.setHeader(HttpHeaders.Names.HOST, host); - request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); - - // Set some example cookies. - CookieEncoder httpCookieEncoder = new CookieEncoder(false); - httpCookieEncoder.addCookie("my-cookie", "foo"); - httpCookieEncoder.addCookie("another-cookie", "bar"); - request.setHeader(HttpHeaders.Names.COOKIE, httpCookieEncoder.encode()); - - // Send the HTTP request. - channel.write(request); - - // Wait for the server to close the connection. - channel.getCloseFuture().awaitUninterruptibly(); - - // Shut down executor threads to exit. - bootstrap.releaseExternalResources(); } public static void main(String[] args) throws Exception { diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java index 0e5d83e703..9ca5f35f4e 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java @@ -16,21 +16,44 @@ package io.netty.example.http.snoop; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.http.HttpChunk; import io.netty.handler.codec.http.HttpResponse; import io.netty.util.CharsetUtil; -public class HttpSnoopClientHandler extends SimpleChannelUpstreamHandler { +import java.util.Queue; + +public class HttpSnoopClientHandler extends ChannelInboundHandlerAdapter { private boolean readingChunks; + @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) + throws Exception { + Queue in = ctx.in().messageBuffer(); + while (handleMessage(in.poll())) { + continue; + } + } + + private boolean handleMessage(Object msg) throws Exception { + if (msg == null) { + return false; + } + if (!readingChunks) { - HttpResponse response = (HttpResponse) e.getMessage(); + HttpResponse response = (HttpResponse) msg; System.out.println("STATUS: " + response.getStatus()); System.out.println("VERSION: " + response.getProtocolVersion()); @@ -57,7 +80,7 @@ public class HttpSnoopClientHandler extends SimpleChannelUpstreamHandler { } } } else { - HttpChunk chunk = (HttpChunk) e.getMessage(); + HttpChunk chunk = (HttpChunk) msg; if (chunk.isLast()) { readingChunks = false; System.out.println("} END OF CHUNKED CONTENT"); @@ -66,5 +89,14 @@ public class HttpSnoopClientHandler extends SimpleChannelUpstreamHandler { System.out.flush(); } } + + return true; + } + + @Override + public void exceptionCaught( + ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); } } diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientPipelineFactory.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientInitializer.java similarity index 64% rename from example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientPipelineFactory.java rename to example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientInitializer.java index 7013659ef4..4bbec634dc 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientPipelineFactory.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientInitializer.java @@ -15,51 +15,49 @@ */ package io.netty.example.http.snoop; -import static io.netty.channel.Channels.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.example.securechat.SecureChatSslContextFactory; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import javax.net.ssl.SSLEngine; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.example.securechat.SecureChatSslContextFactory; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpContentDecompressor; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.ssl.SslHandler; -import io.netty.logging.InternalLogLevel; - -public class HttpSnoopClientPipelineFactory implements ChannelPipelineFactory { +public class HttpSnoopClientInitializer extends ChannelInitializer { private final boolean ssl; - public HttpSnoopClientPipelineFactory(boolean ssl) { + public HttpSnoopClientInitializer(boolean ssl) { this.ssl = ssl; } @Override - public ChannelPipeline getPipeline() throws Exception { + public void initChannel(Channel ch) throws Exception { // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); + ChannelPipeline p = ch.pipeline(); - pipeline.addLast("log", new LoggingHandler(InternalLogLevel.INFO)); + p.addLast("log", new LoggingHandler(LogLevel.INFO)); // Enable HTTPS if necessary. if (ssl) { SSLEngine engine = SecureChatSslContextFactory.getClientContext().createSSLEngine(); engine.setUseClientMode(true); - pipeline.addLast("ssl", new SslHandler(engine)); + // FIXME: Port SslHandler to the new API + //p.addLast("ssl", new SslHandler(engine)); } - pipeline.addLast("codec", new HttpClientCodec()); + p.addLast("codec", new HttpClientCodec()); // Remove the following line if you don't want automatic content decompression. - pipeline.addLast("inflater", new HttpContentDecompressor()); + // FIXME: Port HttpContentDecompressor to the new API + //p.addLast("inflater", new HttpContentDecompressor()); // Uncomment the following line if you don't want to handle HttpChunks. //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); - pipeline.addLast("handler", new HttpSnoopClientHandler()); - return pipeline; + p.addLast("handler", new HttpSnoopClientHandler()); } } diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java index b0699a3032..4697b1da7b 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java @@ -15,11 +15,12 @@ */ package io.netty.example.http.snoop; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; +import io.netty.channel.Channel; +import io.netty.channel.ServerChannelBootstrap; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.SelectorEventLoop; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioServerSocketChannelFactory; +import java.net.InetSocketAddress; /** * An HTTP server that sends back the content of the received HTTP request @@ -33,20 +34,24 @@ public class HttpSnoopServer { this.port = port; } - public void run() { + public void run() throws Exception { // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool())); + ServerChannelBootstrap b = new ServerChannelBootstrap(); - // Set up the event pipeline factory. - bootstrap.setPipelineFactory(new HttpSnoopServerPipelineFactory()); + try { + b.eventLoop(new SelectorEventLoop(), new SelectorEventLoop()) + .channel(new NioServerSocketChannel()) + .childInitializer(new HttpSnoopServerInitializer()) + .localAddress(new InetSocketAddress(port)); - // Bind and start to accept incoming connections. - bootstrap.bind(new InetSocketAddress(port)); + Channel ch = b.bind().sync().channel(); + ch.closeFuture().sync(); + } finally { + b.shutdown(); + } } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java index 2e2ff07566..3c00146a64 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java @@ -19,20 +19,14 @@ import static io.netty.handler.codec.http.HttpHeaders.*; import static io.netty.handler.codec.http.HttpHeaders.Names.*; import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpVersion.*; - -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.CookieEncoder; @@ -45,7 +39,13 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.util.CharsetUtil; -public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.Set; + +public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter { private HttpRequest request; private boolean readingChunks; @@ -53,12 +53,30 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { private final StringBuilder buf = new StringBuilder(); @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) + throws Exception { + Queue in = ctx.in().messageBuffer(); + while (handleMessage(ctx, in.poll())) { + continue; + } + } + + private boolean handleMessage(ChannelInboundHandlerContext ctx, Object msg) throws Exception { + if (msg == null) { + return false; + } + if (!readingChunks) { - HttpRequest request = this.request = (HttpRequest) e.getMessage(); + HttpRequest request = this.request = (HttpRequest) msg; if (is100ContinueExpected(request)) { - send100Continue(e); + send100Continue(ctx); } buf.setLength(0); @@ -94,10 +112,10 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { if (content.readable()) { buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n"); } - writeResponse(e); + writeResponse(ctx); } } else { - HttpChunk chunk = (HttpChunk) e.getMessage(); + HttpChunk chunk = (HttpChunk) msg; if (chunk.isLast()) { readingChunks = false; buf.append("END OF CONTENT\r\n"); @@ -113,14 +131,16 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { buf.append("\r\n"); } - writeResponse(e); + writeResponse(ctx); } else { buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n"); } } + + return true; } - private void writeResponse(MessageEvent e) { + private void writeResponse(ChannelInboundHandlerContext ctx) { // Decide whether to close the connection or not. boolean keepAlive = isKeepAlive(request); @@ -152,7 +172,7 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { } // Write the response. - ChannelFuture future = e.channel().write(response); + ChannelFuture future = ctx.write(response); // Close the non-keep-alive connection after the write operation is done. if (!keepAlive) { @@ -160,15 +180,15 @@ public class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { } } - private void send100Continue(MessageEvent e) { + private static void send100Continue(ChannelInboundHandlerContext ctx) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CONTINUE); - e.channel().write(response); + ctx.write(response); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - e.cause().printStackTrace(); - e.channel().close(); + public void exceptionCaught( + ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); } } diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerPipelineFactory.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerInitializer.java similarity index 66% rename from example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerPipelineFactory.java rename to example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerInitializer.java index c7a8bacc5b..09ae47828e 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerPipelineFactory.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerInitializer.java @@ -15,32 +15,30 @@ */ package io.netty.example.http.snoop; -import static io.netty.channel.Channels.*; - +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; -public class HttpSnoopServerPipelineFactory implements ChannelPipelineFactory { +public class HttpSnoopServerInitializer extends ChannelInitializer { @Override - public ChannelPipeline getPipeline() throws Exception { + public void initChannel(Channel ch) throws Exception { // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); + ChannelPipeline p = ch.pipeline(); // Uncomment the following line if you want HTTPS //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); //engine.setUseClientMode(false); - //pipeline.addLast("ssl", new SslHandler(engine)); + //p.addLast("ssl", new SslHandler(engine)); - pipeline.addLast("decoder", new HttpRequestDecoder()); + p.addLast("decoder", new HttpRequestDecoder()); // Uncomment the following line if you don't want to handle HttpChunks. //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); - pipeline.addLast("encoder", new HttpResponseEncoder()); + p.addLast("encoder", new HttpResponseEncoder()); // Remove the following line if you don't want automatic content compression. - pipeline.addLast("deflater", new HttpContentCompressor()); - pipeline.addLast("handler", new HttpSnoopServerHandler()); - return pipeline; + // FIXME: Port HttpContentCompressor to the new API + //p.addLast("deflater", new HttpContentCompressor()); + p.addLast("handler", new HttpSnoopServerHandler()); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index d5d07f3999..1daeb1a608 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.DefaultAttributeMap; @@ -636,19 +637,23 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void read() { assert eventLoop().inEventLoop(); + + final ChannelBufferHolder buf = pipeline().nextIn(); + final boolean hasByteBuffer = buf.hasByteBuffer(); + long readAmount = 0; boolean closed = false; try { for (;;) { - int localReadAmount = doRead(); + int localReadAmount = doRead(buf); if (localReadAmount > 0) { readAmount += localReadAmount; - continue; - } - if (localReadAmount == 0) { - break; - } - if (localReadAmount < 0) { + expandReadBuffer(buf, hasByteBuffer); + } else if (localReadAmount == 0) { + if (!expandReadBuffer(buf, hasByteBuffer)) { + break; + } + } else if (localReadAmount < 0) { closed = true; break; } @@ -851,7 +856,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doClose() throws Exception; protected abstract void doDeregister() throws Exception; - protected abstract int doRead() throws Exception; + protected abstract int doRead(ChannelBufferHolder buf) throws Exception; protected abstract int doFlush(boolean lastSpin) throws Exception; protected abstract boolean inEventLoopDrivenFlush(); + + private static boolean expandReadBuffer(ChannelBufferHolder buf, boolean hasByteBuffer) { + if (!hasByteBuffer) { + return false; + } + + ChannelBuffer byteBuf = buf.byteBuffer(); + if (!byteBuf.writable()) { + // FIXME: Use a sensible value. + byteBuf.ensureWritableBytes(128); + return true; + } + + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index a6e40406a4..c5e51e4ce5 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -4,145 +4,6 @@ import java.net.SocketAddress; public abstract class ChannelHandlerAdapter implements ChannelInboundHandler, ChannelOutboundHandler { - public static ChannelHandlerAdapter combine( - ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) { - if (inboundHandler == null) { - throw new NullPointerException("inboundHandler"); - } - if (outboundHandler == null) { - throw new NullPointerException("outboundHandler"); - } - - final ChannelInboundHandler in = inboundHandler; - final ChannelOutboundHandler out = outboundHandler; - return new ChannelHandlerAdapter() { - - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelInboundHandlerContext ctx) throws Exception { - return in.newInboundBuffer(ctx); - } - - @Override - public ChannelBufferHolder newOutboundBuffer( - ChannelOutboundHandlerContext ctx) throws Exception { - return out.newOutboundBuffer(ctx); - } - - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - try { - in.beforeAdd(ctx); - } finally { - out.beforeAdd(ctx); - } - } - - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - try { - in.afterAdd(ctx); - } finally { - out.afterAdd(ctx); - } - } - - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - try { - in.beforeRemove(ctx); - } finally { - out.beforeRemove(ctx); - } - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - try { - in.afterRemove(ctx); - } finally { - out.afterRemove(ctx); - } - } - - @Override - public void channelRegistered(ChannelInboundHandlerContext ctx) throws Exception { - in.channelRegistered(ctx); - } - - @Override - public void channelUnregistered(ChannelInboundHandlerContext ctx) throws Exception { - in.channelUnregistered(ctx); - } - - @Override - public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { - in.channelActive(ctx); - } - - @Override - public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { - in.channelInactive(ctx); - } - - @Override - public void exceptionCaught( - ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { - in.exceptionCaught(ctx, cause); - } - - @Override - public void userEventTriggered( - ChannelInboundHandlerContext ctx, Object evt) throws Exception { - in.userEventTriggered(ctx, evt); - } - - @Override - public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - in.inboundBufferUpdated(ctx); - } - - @Override - public void bind( - ChannelOutboundHandlerContext ctx, - SocketAddress localAddress, ChannelFuture future) throws Exception { - out.bind(ctx, localAddress, future); - } - - @Override - public void connect( - ChannelOutboundHandlerContext ctx, - SocketAddress remoteAddress, SocketAddress localAddress, - ChannelFuture future) throws Exception { - out.connect(ctx, remoteAddress, localAddress, future); - } - - @Override - public void disconnect( - ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - out.disconnect(ctx, future); - } - - @Override - public void close( - ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - out.close(ctx, future); - } - - @Override - public void deregister( - ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - out.deregister(ctx, future); - } - - @Override - public void flush( - ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - out.flush(ctx, future); - } - }; - } - @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { // Do nothing by default. diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java new file mode 100644 index 0000000000..a9695257d6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java @@ -0,0 +1,166 @@ +package io.netty.channel; + +import java.net.SocketAddress; + +public class CombinedChannelHandler implements ChannelInboundHandler, + ChannelOutboundHandler { + + private ChannelOutboundHandler out; + private ChannelInboundHandler in; + + public CombinedChannelHandler() { + // User will call init in the subclass constructor. + } + + public CombinedChannelHandler( + ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) { + init(inboundHandler, outboundHandler); + } + + @SuppressWarnings("unchecked") + protected void init(ChannelInboundHandler inboundHandler, + ChannelOutboundHandler outboundHandler) { + if (inboundHandler == null) { + throw new NullPointerException("inboundHandler"); + } + if (outboundHandler == null) { + throw new NullPointerException("outboundHandler"); + } + + if (in != null) { + throw new IllegalStateException("init() cannot be called more than once."); + } + + in = (ChannelInboundHandler) inboundHandler; + out = (ChannelOutboundHandler) outboundHandler; + } + + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return in.newInboundBuffer(ctx); + } + + @Override + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + return out.newOutboundBuffer(ctx); + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + if (in == null) { + throw new IllegalStateException( + "not initialized yet - call init() in the constructor of the subclass"); + } + + try { + in.beforeAdd(ctx); + } finally { + out.beforeAdd(ctx); + } + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + try { + in.afterAdd(ctx); + } finally { + out.afterAdd(ctx); + } + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + try { + in.beforeRemove(ctx); + } finally { + out.beforeRemove(ctx); + } + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + try { + in.afterRemove(ctx); + } finally { + out.afterRemove(ctx); + } + } + + @Override + public void channelRegistered(ChannelInboundHandlerContext ctx) throws Exception { + in.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelInboundHandlerContext ctx) throws Exception { + in.channelUnregistered(ctx); + } + + @Override + public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { + in.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { + in.channelInactive(ctx); + } + + @Override + public void exceptionCaught( + ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { + in.exceptionCaught(ctx, cause); + } + + @Override + public void userEventTriggered( + ChannelInboundHandlerContext ctx, Object evt) throws Exception { + in.userEventTriggered(ctx, evt); + } + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + in.inboundBufferUpdated(ctx); + } + + @Override + public void bind( + ChannelOutboundHandlerContext ctx, + SocketAddress localAddress, ChannelFuture future) throws Exception { + out.bind(ctx, localAddress, future); + } + + @Override + public void connect( + ChannelOutboundHandlerContext ctx, + SocketAddress remoteAddress, SocketAddress localAddress, + ChannelFuture future) throws Exception { + out.connect(ctx, remoteAddress, localAddress, future); + } + + @Override + public void disconnect( + ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + out.disconnect(ctx, future); + } + + @Override + public void close( + ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + out.close(ctx, future); + } + + @Override + public void deregister( + ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + out.deregister(ctx, future); + } + + @Override + public void flush( + ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + out.flush(ctx, future); + } +} \ No newline at end of file diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 67d063d38b..45d82e717c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -16,6 +16,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractServerChannel; +import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelException; import io.netty.channel.EventLoop; import io.netty.channel.socket.DefaultServerSocketChannelConfig; @@ -127,12 +128,12 @@ public class NioServerSocketChannel extends AbstractServerChannel } @Override - protected int doRead() throws Exception { + protected int doRead(ChannelBufferHolder buf) throws Exception { java.nio.channels.SocketChannel ch = javaChannel().accept(); if (ch == null) { return 0; } - pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, null, ch)); + buf.messageBuffer().add(new NioSocketChannel(this, null, ch)); return 1; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index ba351147b0..038debc64a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -166,9 +166,9 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha } @Override - protected int doRead() throws Exception { - ChannelBuffer buf = pipeline().nextIn().byteBuffer(); - return buf.writeBytes(javaChannel(), buf.writableBytes()); + protected int doRead(ChannelBufferHolder buf) throws Exception { + ChannelBuffer byteBuf = buf.byteBuffer(); + return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); } @Override