diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java index 67f635122b..a0d95fc541 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java @@ -17,7 +17,7 @@ package io.netty.handler.codec.http.websocketx; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.MessageList; @@ -156,13 +156,13 @@ public class WebSocketServerProtocolHandlerTest { } } - private static class CustomTextFrameHandler extends ChannelInboundHandlerAdapter { + private static class CustomTextFrameHandler extends ChannelInboundConsumingHandler { private String content; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - assertEquals(1, msgs.size()); - content = "processed: " + ((TextWebSocketFrame) msgs.get(0)).text(); + public void consume(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { + assertNull(content); + content = "processed: " + msg.text(); } String getContent() { diff --git a/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java b/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java index 878b14fb37..d4b2c94f3d 100644 --- a/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java +++ b/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java @@ -19,10 +19,9 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MessageList; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -73,15 +72,11 @@ public class AppletDiscardServer extends JApplet { } } - private static final class DiscardServerHandler extends ChannelInboundHandlerAdapter { + private static final class DiscardServerHandler extends ChannelInboundConsumingHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - MessageList bufs = msgs.cast(); - for (int i = 0; i < bufs.size(); i++) { - System.out.println("Received: " + bufs.get(i).toString(CharsetUtil.UTF_8)); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + System.out.println("Received: " + msg.toString(CharsetUtil.UTF_8)); } @Override diff --git a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java index e2386fd2d0..3c3675a5ba 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java @@ -32,7 +32,6 @@ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - // Discard the received data silently. msgs.releaseAllAndRecycle(); } diff --git a/example/src/main/java/io/netty/example/filetransfer/FileServer.java b/example/src/main/java/io/netty/example/filetransfer/FileServer.java index efe864a9a7..1c782c4f72 100644 --- a/example/src/main/java/io/netty/example/filetransfer/FileServer.java +++ b/example/src/main/java/io/netty/example/filetransfer/FileServer.java @@ -18,7 +18,7 @@ package io.netty.example.filetransfer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.DefaultFileRegion; @@ -92,31 +92,23 @@ public class FileServer { new FileServer(port).run(); } - private static final class FileHandler extends ChannelInboundHandlerAdapter { + private static final class FileHandler extends ChannelInboundConsumingHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList messages) throws Exception { - MessageList msgs = messages.cast(); - MessageList out = MessageList.newInstance(); - - for (int i = 0; i < msgs.size(); i++) { - String msg = msgs.get(i); - File file = new File(msg); - if (file.exists()) { - if (!file.isFile()) { - ctx.write("Not a file: " + file + '\n'); - return; - } - ctx.write(file + " " + file.length() + '\n'); - FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()); - out.add(region); - out.add("\n"); - } else { - out.add("File not found: " + file + '\n'); + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + File file = new File(msg); + if (file.exists()) { + if (!file.isFile()) { + ctx.write("Not a file: " + file + '\n'); + return; } + MessageList out = MessageList.newInstance(); + ctx.write(file + " " + file.length() + '\n'); + FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()); + out.add(region); + out.add("\n"); + } else { + ctx.write("File not found: " + file + '\n'); } - - msgs.recycle(); - ctx.write(out); } @Override diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java index a460789c13..1d7d45ebb9 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java @@ -19,7 +19,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.MessageList; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; @@ -98,104 +98,99 @@ import static io.netty.handler.codec.http.HttpVersion.*; * * */ -public class HttpStaticFileServerHandler extends ChannelInboundHandlerAdapter { +public class HttpStaticFileServerHandler extends ChannelInboundConsumingHandler { public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; public static final String HTTP_DATE_GMT_TIMEZONE = "GMT"; public static final int HTTP_CACHE_SECONDS = 60; @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageList msgs) throws Exception { - MessageList requests = msgs.cast(); - for (int i = 0; i < requests.size(); i++) { - FullHttpRequest request = requests.get(i); - if (!request.getDecoderResult().isSuccess()) { - sendError(ctx, BAD_REQUEST); - continue; + public void consume( + ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { + if (!request.getDecoderResult().isSuccess()) { + sendError(ctx, BAD_REQUEST); + return; + } + + if (request.getMethod() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; + } + + final String uri = request.getUri(); + final String path = sanitizeUri(uri); + if (path == null) { + sendError(ctx, FORBIDDEN); + return; + } + + File file = new File(path); + if (file.isHidden() || !file.exists()) { + sendError(ctx, NOT_FOUND); + return; + } + + if (file.isDirectory()) { + if (uri.endsWith("/")) { + sendListing(ctx, file); + } else { + sendRedirect(ctx, uri + '/'); } + return; + } - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - continue; - } + if (!file.isFile()) { + sendError(ctx, FORBIDDEN); + return; + } - final String uri = request.getUri(); - final String path = sanitizeUri(uri); - if (path == null) { - sendError(ctx, FORBIDDEN); - continue; - } + // Cache Validation + String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); + if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { + SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); + Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); - File file = new File(path); - if (file.isHidden() || !file.exists()) { - sendError(ctx, NOT_FOUND); - continue; - } - - if (file.isDirectory()) { - if (uri.endsWith("/")) { - sendListing(ctx, file); - } else { - sendRedirect(ctx, uri + '/'); - } - continue; - } - - if (!file.isFile()) { - sendError(ctx, FORBIDDEN); - continue; - } - - // Cache Validation - String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); - if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); - - // Only compare up to the second because the datetime format we send to the client - // does not have milliseconds - long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; - long fileLastModifiedSeconds = file.lastModified() / 1000; - if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { - sendNotModified(ctx); - continue; - } - } - - RandomAccessFile raf; - try { - raf = new RandomAccessFile(file, "r"); - } catch (FileNotFoundException fnfe) { - sendError(ctx, NOT_FOUND); - continue; - } - long fileLength = raf.length(); - - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentLength(response, fileLength); - setContentTypeHeader(response, file); - setDateAndCacheHeaders(response, file); - if (isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - - MessageList out = MessageList.newInstance(); - // Write the initial line and the header. - out.add(response); - // Write the content. - out.add(new ChunkedFile(raf, 0, fileLength, 8192)); - // Write the end marker - out.add(LastHttpContent.EMPTY_LAST_CONTENT); - - ChannelFuture writeFuture = ctx.write(out); - // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { - // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); + // Only compare up to the second because the datetime format we send to the client + // does not have milliseconds + long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; + long fileLastModifiedSeconds = file.lastModified() / 1000; + if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { + sendNotModified(ctx); + return; } } - msgs.releaseAllAndRecycle(); + + RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException fnfe) { + sendError(ctx, NOT_FOUND); + return; + } + long fileLength = raf.length(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + setContentLength(response, fileLength); + setContentTypeHeader(response, file); + setDateAndCacheHeaders(response, file); + if (isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + + MessageList out = MessageList.newInstance(); + // Write the initial line and the header. + out.add(response); + // Write the content. + out.add(new ChunkedFile(raf, 0, fileLength, 8192)); + // Write the end marker + out.add(LastHttpContent.EMPTY_LAST_CONTENT); + + ChannelFuture writeFuture = ctx.write(out); + // Decide whether to close the connection or not. + if (!isKeepAlive(request)) { + // Close the connection when the whole content is written out. + writeFuture.addListener(ChannelFutureListener.CLOSE); + } } @Override diff --git a/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServerHandler.java b/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServerHandler.java index 50273f3976..6a68a7083c 100644 --- a/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServerHandler.java +++ b/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServerHandler.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.MessageList; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; @@ -31,39 +31,44 @@ import static io.netty.handler.codec.http.HttpHeaders.*; import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpVersion.*; -public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter { +public class HttpHelloWorldServerHandler extends ChannelInboundConsumingHandler { private static final ByteBuf CONTENT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII)); + private MessageList out; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - MessageList out = MessageList.newInstance(); - int size = msgs.size(); - for (int i = 0; i < size; i++) { - Object msg = msgs.get(i); - if (msg instanceof HttpRequest) { - HttpRequest req = (HttpRequest) msg; + protected void beginConsume(ChannelHandlerContext ctx) { + out = MessageList.newInstance(); + } - if (is100ContinueExpected(req)) { - out.add(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); - } - boolean keepAlive = isKeepAlive(req); - FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, CONTENT.duplicate()); - response.headers().set(CONTENT_TYPE, "text/plain"); - response.headers().set(CONTENT_LENGTH, response.content().readableBytes()); + @Override + protected void endConsume(ChannelHandlerContext ctx) { + ctx.write(out); + out = null; + } - if (!keepAlive) { - out.add(response); - ctx.write(out).addListener(ChannelFutureListener.CLOSE); - out = MessageList.newInstance(); - } else { - out.add(response); - response.headers().set(CONNECTION, Values.KEEP_ALIVE); - } + @Override + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HttpRequest) { + HttpRequest req = (HttpRequest) msg; + + if (is100ContinueExpected(req)) { + out.add(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); + } + boolean keepAlive = isKeepAlive(req); + FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, CONTENT.duplicate()); + response.headers().set(CONTENT_TYPE, "text/plain"); + response.headers().set(CONTENT_LENGTH, response.content().readableBytes()); + + if (!keepAlive) { + out.add(response); + ctx.write(out).addListener(ChannelFutureListener.CLOSE); + out = MessageList.newInstance(); + } else { + out.add(response); + response.headers().set(CONNECTION, Values.KEEP_ALIVE); } } - ctx.write(out); - msgs.releaseAllAndRecycle(); } @Override diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java index 7262fde8d9..f70389e219 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClientHandler.java @@ -16,54 +16,50 @@ package io.netty.example.http.snoop; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.CharsetUtil; -public class HttpSnoopClientHandler extends ChannelInboundHandlerAdapter { +public class HttpSnoopClientHandler extends ChannelInboundConsumingHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - Object msg = msgs.get(i); - if (msg instanceof HttpResponse) { - HttpResponse response = (HttpResponse) msg; + public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; - System.out.println("STATUS: " + response.getStatus()); - System.out.println("VERSION: " + response.getProtocolVersion()); - System.out.println(); + System.out.println("STATUS: " + response.getStatus()); + System.out.println("VERSION: " + response.getProtocolVersion()); + System.out.println(); - if (!response.headers().isEmpty()) { - for (String name: response.headers().names()) { - for (String value: response.headers().getAll(name)) { - System.out.println("HEADER: " + name + " = " + value); - } + if (!response.headers().isEmpty()) { + for (String name: response.headers().names()) { + for (String value: response.headers().getAll(name)) { + System.out.println("HEADER: " + name + " = " + value); } - System.out.println(); - } - - if (HttpHeaders.isTransferEncodingChunked(response)) { - System.out.println("CHUNKED CONTENT {"); - } else { - System.out.println("CONTENT {"); } + System.out.println(); } - if (msg instanceof HttpContent) { - HttpContent content = (HttpContent) msg; - System.out.print(content.content().toString(CharsetUtil.UTF_8)); - System.out.flush(); - - if (content instanceof LastHttpContent) { - System.out.println("} END OF CONTENT"); - } + if (HttpHeaders.isTransferEncodingChunked(response)) { + System.out.println("CHUNKED CONTENT {"); + } else { + System.out.println("CONTENT {"); + } + } + if (msg instanceof HttpContent) { + HttpContent content = (HttpContent) msg; + + System.out.print(content.content().toString(CharsetUtil.UTF_8)); + System.out.flush(); + + if (content instanceof LastHttpContent) { + System.out.println("} END OF CONTENT"); } } - msgs.releaseAllAndRecycle(); } @Override diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java index 178aebe984..62c4c8d4a9 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java @@ -134,7 +134,7 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter { buf.append("\r\n"); } - return writeResponse(ctx, trailer, out); + return writeResponse(trailer, out); } } return true; @@ -151,7 +151,7 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter { buf.append("\r\n"); } - private boolean writeResponse(ChannelHandlerContext ctx, HttpObject currentObj, MessageList out) { + private boolean writeResponse(HttpObject currentObj, MessageList out) { // Decide whether to close the connection or not. boolean keepAlive = isKeepAlive(request); // Build the response object. diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadClientHandler.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadClientHandler.java index 6c70bd6ebd..9919c008b6 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadClientHandler.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadClientHandler.java @@ -16,10 +16,10 @@ package io.netty.example.http.upload; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.CharsetUtil; @@ -29,54 +29,50 @@ import java.util.logging.Logger; /** * Handler that just dumps the contents of the response from the server */ -public class HttpUploadClientHandler extends ChannelInboundHandlerAdapter { +public class HttpUploadClientHandler extends ChannelInboundConsumingHandler { private static final Logger logger = Logger.getLogger(HttpUploadClientHandler.class.getName()); private boolean readingChunks; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - Object msg = msgs.get(i); - if (msg instanceof HttpResponse) { - HttpResponse response = (HttpResponse) msg; + public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; - logger.info("STATUS: " + response.getStatus()); - logger.info("VERSION: " + response.getProtocolVersion()); + logger.info("STATUS: " + response.getStatus()); + logger.info("VERSION: " + response.getProtocolVersion()); - if (!response.headers().isEmpty()) { - for (String name : response.headers().names()) { - for (String value : response.headers().getAll(name)) { - logger.info("HEADER: " + name + " = " + value); - } + if (!response.headers().isEmpty()) { + for (String name : response.headers().names()) { + for (String value : response.headers().getAll(name)) { + logger.info("HEADER: " + name + " = " + value); } } - - if (response.getStatus().code() == 200 && HttpHeaders.isTransferEncodingChunked(response)) { - readingChunks = true; - logger.info("CHUNKED CONTENT {"); - } else { - logger.info("CONTENT {"); - } } - if (msg instanceof HttpContent) { - HttpContent chunk = (HttpContent) msg; - logger.info(chunk.content().toString(CharsetUtil.UTF_8)); - if (chunk instanceof LastHttpContent) { - if (readingChunks) { - logger.info("} END OF CHUNKED CONTENT"); - } else { - logger.info("} END OF CONTENT"); - } - readingChunks = false; - } else { - logger.info(chunk.content().toString(CharsetUtil.UTF_8)); - } + if (response.getStatus().code() == 200 && HttpHeaders.isTransferEncodingChunked(response)) { + readingChunks = true; + logger.info("CHUNKED CONTENT {"); + } else { + logger.info("CONTENT {"); + } + } + if (msg instanceof HttpContent) { + HttpContent chunk = (HttpContent) msg; + logger.info(chunk.content().toString(CharsetUtil.UTF_8)); + + if (chunk instanceof LastHttpContent) { + if (readingChunks) { + logger.info("} END OF CHUNKED CONTENT"); + } else { + logger.info("} END OF CONTENT"); + } + readingChunks = false; + } else { + logger.info(chunk.content().toString(CharsetUtil.UTF_8)); } } - msgs.releaseAllAndRecycle(); } @Override diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java index 63adb8523f..9f95c67df8 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java @@ -20,14 +20,14 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; @@ -62,7 +62,7 @@ import java.util.logging.Logger; import static io.netty.buffer.Unpooled.*; import static io.netty.handler.codec.http.HttpHeaders.Names.*; -public class HttpUploadServerHandler extends ChannelInboundHandlerAdapter { +public class HttpUploadServerHandler extends ChannelInboundConsumingHandler { private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName()); @@ -96,114 +96,110 @@ public class HttpUploadServerHandler extends ChannelInboundHandlerAdapter { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - Object msg = msgs.get(i); - if (msg instanceof HttpRequest) { - HttpRequest request = this.request = (HttpRequest) msg; - URI uri = new URI(request.getUri()); - if (!uri.getPath().startsWith("/form")) { - // Write Menu - writeMenu(ctx); - return; + public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + if (msg instanceof HttpRequest) { + HttpRequest request = this.request = (HttpRequest) msg; + URI uri = new URI(request.getUri()); + if (!uri.getPath().startsWith("/form")) { + // Write Menu + writeMenu(ctx); + return; + } + responseContent.setLength(0); + responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r\n"); + responseContent.append("===================================\r\n"); + + responseContent.append("VERSION: " + request.getProtocolVersion().text() + "\r\n"); + + responseContent.append("REQUEST_URI: " + request.getUri() + "\r\n\r\n"); + responseContent.append("\r\n\r\n"); + + // new getMethod + List> headers = request.headers().entries(); + for (Entry entry : headers) { + responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r\n"); + } + responseContent.append("\r\n\r\n"); + + // new getMethod + Set cookies; + String value = request.headers().get(COOKIE); + if (value == null) { + cookies = Collections.emptySet(); + } else { + cookies = CookieDecoder.decode(value); + } + for (Cookie cookie : cookies) { + responseContent.append("COOKIE: " + cookie.toString() + "\r\n"); + } + responseContent.append("\r\n\r\n"); + + QueryStringDecoder decoderQuery = new QueryStringDecoder(request.getUri()); + Map> uriAttributes = decoderQuery.parameters(); + for (Entry> attr: uriAttributes.entrySet()) { + for (String attrVal: attr.getValue()) { + responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r\n"); } - responseContent.setLength(0); - responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r\n"); - responseContent.append("===================================\r\n"); + } + responseContent.append("\r\n\r\n"); - responseContent.append("VERSION: " + request.getProtocolVersion().text() + "\r\n"); + // if GET Method: should not try to create a HttpPostRequestDecoder + try { + decoder = new HttpPostRequestDecoder(factory, request); + } catch (ErrorDataDecoderException e1) { + e1.printStackTrace(); + responseContent.append(e1.getMessage()); + writeResponse(ctx.channel()); + ctx.channel().close(); + return; + } catch (IncompatibleDataDecoderException e1) { + // GET Method: should not try to create a HttpPostRequestDecoder + // So OK but stop here + responseContent.append(e1.getMessage()); + responseContent.append("\r\n\r\nEND OF GET CONTENT\r\n"); + writeResponse(ctx.channel()); + return; + } - responseContent.append("REQUEST_URI: " + request.getUri() + "\r\n\r\n"); - responseContent.append("\r\n\r\n"); + readingChunks = HttpHeaders.isTransferEncodingChunked(request); + responseContent.append("Is Chunked: " + readingChunks + "\r\n"); + responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r\n"); + if (readingChunks) { + // Chunk version + responseContent.append("Chunks: "); + readingChunks = true; + } + } - // new getMethod - List> headers = request.headers().entries(); - for (Entry entry : headers) { - responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r\n"); - } - responseContent.append("\r\n\r\n"); - - // new getMethod - Set cookies; - String value = request.headers().get(COOKIE); - if (value == null) { - cookies = Collections.emptySet(); - } else { - cookies = CookieDecoder.decode(value); - } - for (Cookie cookie : cookies) { - responseContent.append("COOKIE: " + cookie.toString() + "\r\n"); - } - responseContent.append("\r\n\r\n"); - - QueryStringDecoder decoderQuery = new QueryStringDecoder(request.getUri()); - Map> uriAttributes = decoderQuery.parameters(); - for (Entry> attr: uriAttributes.entrySet()) { - for (String attrVal: attr.getValue()) { - responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r\n"); - } - } - responseContent.append("\r\n\r\n"); - - // if GET Method: should not try to create a HttpPostRequestDecoder + // check if the decoder was constructed before + // if not it handles the form get + if (decoder != null) { + if (msg instanceof HttpContent) { + // New chunk is received + HttpContent chunk = (HttpContent) msg; try { - decoder = new HttpPostRequestDecoder(factory, request); + decoder.offer(chunk); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; - } catch (IncompatibleDataDecoderException e1) { - // GET Method: should not try to create a HttpPostRequestDecoder - // So OK but stop here - responseContent.append(e1.getMessage()); - responseContent.append("\r\n\r\nEND OF GET CONTENT\r\n"); + } + responseContent.append('o'); + // example of reading chunk by chunk (minimize memory usage due to + // Factory) + readHttpDataChunkByChunk(); + // example of reading only if at the end + if (chunk instanceof LastHttpContent) { + readHttpDataAllReceive(ctx.channel()); writeResponse(ctx.channel()); - return; - } + readingChunks = false; - readingChunks = HttpHeaders.isTransferEncodingChunked(request); - responseContent.append("Is Chunked: " + readingChunks + "\r\n"); - responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r\n"); - if (readingChunks) { - // Chunk version - responseContent.append("Chunks: "); - readingChunks = true; - } - } - - // check if the decoder was constructed before - // if not it handles the form get - if (decoder != null) { - if (msg instanceof HttpContent) { - // New chunk is received - HttpContent chunk = (HttpContent) msg; - try { - decoder.offer(chunk); - } catch (ErrorDataDecoderException e1) { - e1.printStackTrace(); - responseContent.append(e1.getMessage()); - writeResponse(ctx.channel()); - ctx.channel().close(); - return; - } - responseContent.append('o'); - // example of reading chunk by chunk (minimize memory usage due to - // Factory) - readHttpDataChunkByChunk(); - // example of reading only if at the end - if (chunk instanceof LastHttpContent) { - readHttpDataAllReceive(ctx.channel()); - writeResponse(ctx.channel()); - readingChunks = false; - - reset(); - } + reset(); } } } - msgs.releaseAllAndRecycle(); } private void reset() { diff --git a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java index d5c271362c..02e87185d9 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java @@ -40,9 +40,8 @@ package io.netty.example.http.websocketx.client; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelPromise; -import io.netty.channel.MessageList; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; @@ -51,7 +50,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.CharsetUtil; -public class WebSocketClientHandler extends ChannelInboundHandlerAdapter { +public class WebSocketClientHandler extends ChannelInboundConsumingHandler { private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; @@ -80,35 +79,31 @@ public class WebSocketClientHandler extends ChannelInboundHandlerAdapter { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - Object msg = msgs.get(i); - Channel ch = ctx.channel(); - if (!handshaker.isHandshakeComplete()) { - handshaker.finishHandshake(ch, (FullHttpResponse) msg); - System.out.println("WebSocket Client connected!"); - handshakeFuture.setSuccess(); - continue; - } - - if (msg instanceof FullHttpResponse) { - FullHttpResponse response = (FullHttpResponse) msg; - throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" - + response.content().toString(CharsetUtil.UTF_8) + ')'); - } - - WebSocketFrame frame = (WebSocketFrame) msg; - if (frame instanceof TextWebSocketFrame) { - TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; - System.out.println("WebSocket Client received message: " + textFrame.text()); - } else if (frame instanceof PongWebSocketFrame) { - System.out.println("WebSocket Client received pong"); - } else if (frame instanceof CloseWebSocketFrame) { - System.out.println("WebSocket Client received closing"); - ch.close(); - } + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + Channel ch = ctx.channel(); + if (!handshaker.isHandshakeComplete()) { + handshaker.finishHandshake(ch, (FullHttpResponse) msg); + System.out.println("WebSocket Client connected!"); + handshakeFuture.setSuccess(); + return; + } + + if (msg instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) msg; + throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + + response.content().toString(CharsetUtil.UTF_8) + ')'); + } + + WebSocketFrame frame = (WebSocketFrame) msg; + if (frame instanceof TextWebSocketFrame) { + TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; + System.out.println("WebSocket Client received message: " + textFrame.text()); + } else if (frame instanceof PongWebSocketFrame) { + System.out.println("WebSocket Client received pong"); + } else if (frame instanceof CloseWebSocketFrame) { + System.out.println("WebSocket Client received closing"); + ch.close(); } - msgs.releaseAllAndRecycle(); } @Override diff --git a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java index 0c6f130502..b9cbe975b0 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java @@ -20,8 +20,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -46,7 +45,7 @@ import static io.netty.handler.codec.http.HttpVersion.*; /** * Handles handshakes and messages */ -public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { +public class WebSocketServerHandler extends ChannelInboundConsumingHandler { private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private static final String WEBSOCKET_PATH = "/websocket"; @@ -54,16 +53,12 @@ public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { private WebSocketServerHandshaker handshaker; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - Object msg = msgs.get(i); - if (msg instanceof FullHttpRequest) { - handleHttpRequest(ctx, (FullHttpRequest) msg); - } else if (msg instanceof WebSocketFrame) { - handleWebSocketFrame(ctx, (WebSocketFrame) msg); - } + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); } - msgs.releaseAllAndRecycle(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { diff --git a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java index 050dd6200e..fa4af6a0cf 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java @@ -20,8 +20,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.example.http.websocketx.server.WebSocketServerIndexPage; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; @@ -47,7 +46,7 @@ import static io.netty.handler.codec.http.HttpVersion.*; /** * Handles handshakes and messages */ -public class WebSocketSslServerHandler extends ChannelInboundHandlerAdapter { +public class WebSocketSslServerHandler extends ChannelInboundConsumingHandler { private static final Logger logger = Logger.getLogger(WebSocketSslServerHandler.class.getName()); private static final String WEBSOCKET_PATH = "/websocket"; @@ -55,16 +54,12 @@ public class WebSocketSslServerHandler extends ChannelInboundHandlerAdapter { private WebSocketServerHandshaker handshaker; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - Object msg = msgs.get(i); - if (msg instanceof FullHttpRequest) { - handleHttpRequest(ctx, (FullHttpRequest) msg); - } else if (msg instanceof WebSocketFrame) { - handleWebSocketFrame(ctx, (WebSocketFrame) msg); - } + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); } - msgs.releaseAllAndRecycle(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { diff --git a/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java b/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java index c10774b644..92796dee13 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java @@ -16,18 +16,14 @@ package io.netty.example.localecho; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; -public class LocalEchoClientHandler extends ChannelInboundHandlerAdapter { +public class LocalEchoClientHandler extends ChannelInboundConsumingHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - // Print as received - System.out.println(msgs.get(i)); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + // Print as received + System.out.println(msg); } @Override diff --git a/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java b/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java index 861e62d01f..bb4c334d1c 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java @@ -24,7 +24,7 @@ public class LocalEchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { // Write back as received - ctx.write(msgs.copy()); + ctx.write(msgs); } @Override diff --git a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClientHandler.java b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClientHandler.java index 640a6a8b13..ff9d56a423 100644 --- a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClientHandler.java +++ b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClientHandler.java @@ -16,30 +16,23 @@ package io.netty.example.qotm; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; -public class QuoteOfTheMomentClientHandler extends ChannelInboundHandlerAdapter { +public class QuoteOfTheMomentClientHandler extends ChannelInboundConsumingHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - MessageList packets = msgs.cast(); - for (int i = 0; i < packets.size(); i++) { - String response = packets.get(i).content().toString(CharsetUtil.UTF_8); - if (response.startsWith("QOTM: ")) { - System.out.println("Quote of the Moment: " + response.substring(6)); - ctx.close(); - } + public void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { + String response = msg.content().toString(CharsetUtil.UTF_8); + if (response.startsWith("QOTM: ")) { + System.out.println("Quote of the Moment: " + response.substring(6)); + ctx.close(); } - msgs.releaseAllAndRecycle(); } @Override - public void exceptionCaught( - ChannelHandlerContext ctx, Throwable cause) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } diff --git a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServerHandler.java b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServerHandler.java index ed0c54c487..530fb44709 100644 --- a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServerHandler.java +++ b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServerHandler.java @@ -17,14 +17,13 @@ package io.netty.example.qotm; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; import java.util.Random; -public class QuoteOfTheMomentServerHandler extends ChannelInboundHandlerAdapter { +public class QuoteOfTheMomentServerHandler extends ChannelInboundConsumingHandler { private static final Random random = new Random(); @@ -45,17 +44,12 @@ public class QuoteOfTheMomentServerHandler extends ChannelInboundHandlerAdapter } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - MessageList packets = msgs.cast(); - for (int i = 0; i < packets.size(); i++) { - DatagramPacket packet = packets.get(i); - System.err.println(packet); - if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) { - ctx.write(new DatagramPacket( - Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender())); - } + public void consume(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { + System.err.println(packet); + if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) { + ctx.write(new DatagramPacket( + Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender())); } - msgs.releaseAllAndRecycle(); } @Override diff --git a/example/src/main/java/io/netty/example/rxtx/RxtxClientHandler.java b/example/src/main/java/io/netty/example/rxtx/RxtxClientHandler.java index 34885c32e4..b2bf5deb46 100644 --- a/example/src/main/java/io/netty/example/rxtx/RxtxClientHandler.java +++ b/example/src/main/java/io/netty/example/rxtx/RxtxClientHandler.java @@ -16,10 +16,9 @@ package io.netty.example.rxtx; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; -public class RxtxClientHandler extends ChannelInboundHandlerAdapter { +public class RxtxClientHandler extends ChannelInboundConsumingHandler { @Override public void channelActive(ChannelHandlerContext ctx) { @@ -27,16 +26,12 @@ public class RxtxClientHandler extends ChannelInboundHandlerAdapter { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - String msg = msgs.get(i).toString(); - if ("OK".equals(msg)) { - System.out.println("Serial port responded to AT"); - } else { - System.out.println("Serial port responded with not-OK: " + msg); - } + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + if ("OK".equals(msg)) { + System.out.println("Serial port responded to AT"); + } else { + System.out.println("Serial port responded with not-OK: " + msg); } - msgs.releaseAllAndRecycle(); ctx.close(); } } diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java index 99efe31b72..5e3946d715 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java @@ -16,8 +16,7 @@ package io.netty.example.securechat; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import java.util.logging.Level; import java.util.logging.Logger; @@ -25,17 +24,14 @@ import java.util.logging.Logger; /** * Handles a client-side channel. */ -public class SecureChatClientHandler extends ChannelInboundHandlerAdapter { +public class SecureChatClientHandler extends ChannelInboundConsumingHandler { private static final Logger logger = Logger.getLogger( SecureChatClientHandler.class.getName()); @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i++) { - System.err.println(msgs.get(i)); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + System.err.println(msg); } @Override diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java index 2c2b28ece0..137629e36a 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java @@ -17,8 +17,7 @@ package io.netty.example.securechat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.ssl.SslHandler; @@ -33,7 +32,7 @@ import java.util.logging.Logger; /** * Handles a server-side channel. */ -public class SecureChatServerHandler extends ChannelInboundHandlerAdapter { +public class SecureChatServerHandler extends ChannelInboundConsumingHandler { private static final Logger logger = Logger.getLogger( SecureChatServerHandler.class.getName()); @@ -62,26 +61,21 @@ public class SecureChatServerHandler extends ChannelInboundHandlerAdapter { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList requests) throws Exception { - MessageList msgs = requests.cast(); - for (int i = 0; i < msgs.size(); i++) { - String msg = msgs.get(i); - // Send the received message to all channels but the current one. - for (Channel c: channels) { - if (c != ctx.channel()) { - c.write("[" + ctx.channel().remoteAddress() + "] " + - msg + '\n'); - } else { - c.write("[you] " + msg + '\n'); - } - } - - // Close the connection if the client has sent 'bye'. - if ("bye".equals(msg.toLowerCase())) { - ctx.close(); + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + // Send the received message to all channels but the current one. + for (Channel c: channels) { + if (c != ctx.channel()) { + c.write("[" + ctx.channel().remoteAddress() + "] " + + msg + '\n'); + } else { + c.write("[you] " + msg + '\n'); } } - msgs.releaseAllAndRecycle(); + + // Close the connection if the client has sent 'bye'. + if ("bye".equals(msg.toLowerCase())) { + ctx.close(); + } } @Override diff --git a/example/src/main/java/io/netty/example/telnet/TelnetServerHandler.java b/example/src/main/java/io/netty/example/telnet/TelnetServerHandler.java index e871a6d326..8bac77a20f 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetServerHandler.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetServerHandler.java @@ -19,8 +19,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import java.net.InetAddress; import java.util.Date; @@ -31,10 +30,9 @@ import java.util.logging.Logger; * Handles a server-side channel. */ @Sharable -public class TelnetServerHandler extends ChannelInboundHandlerAdapter { +public class TelnetServerHandler extends ChannelInboundConsumingHandler { - private static final Logger logger = Logger.getLogger( - TelnetServerHandler.class.getName()); + private static final Logger logger = Logger.getLogger(TelnetServerHandler.class.getName()); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -45,34 +43,29 @@ public class TelnetServerHandler extends ChannelInboundHandlerAdapter { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - MessageList requests = msgs.cast(); - for (int i = 0; i < requests.size(); i++) { - String request = requests.get(i); + public void consume(ChannelHandlerContext ctx, String request) throws Exception { - // Generate and write a response. - String response; - boolean close = false; - if (request.isEmpty()) { - response = "Please type something.\r\n"; - } else if ("bye".equals(request.toLowerCase())) { - response = "Have a good day!\r\n"; - close = true; - } else { - response = "Did you say '" + request + "'?\r\n"; - } - - // We do not need to write a ChannelBuffer here. - // We know the encoder inserted at TelnetPipelineFactory will do the conversion. - ChannelFuture future = ctx.write(response); - - // Close the connection after sending 'Have a good day!' - // if the client has sent 'bye'. - if (close) { - future.addListener(ChannelFutureListener.CLOSE); - } + // Generate and write a response. + String response; + boolean close = false; + if (request.isEmpty()) { + response = "Please type something.\r\n"; + } else if ("bye".equals(request.toLowerCase())) { + response = "Have a good day!\r\n"; + close = true; + } else { + response = "Did you say '" + request + "'?\r\n"; + } + + // We do not need to write a ChannelBuffer here. + // We know the encoder inserted at TelnetPipelineFactory will do the conversion. + ChannelFuture future = ctx.write(response); + + // Close the connection after sending 'Have a good day!' + // if the client has sent 'bye'. + if (close) { + future.addListener(ChannelFutureListener.CLOSE); } - msgs.releaseAllAndRecycle(); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java index dd15f1df7e..d926145053 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java @@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; -import io.netty.channel.MessageList; import io.netty.channel.sctp.SctpChannel; import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler; import io.netty.handler.codec.sctp.SctpMessageCompletionHandler; @@ -139,40 +138,35 @@ public class SctpEchoTest extends AbstractSctpTest { } } - private static class EchoHandler extends ChannelInboundHandlerAdapter { + private static class EchoHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { channel = ctx.channel(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf in = (ByteBuf) msgs.get(j); - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); + public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); - int lastIdx = counter; - for (int i = 0; i < actual.length; i++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - channel.write(Unpooled.wrappedBuffer(actual)); - } - - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i++) { + assertEquals(data[i + lastIdx], actual[i]); } + + if (channel.parent() != null) { + channel.write(Unpooled.wrappedBuffer(actual)); + } + + counter += actual.length; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (exception.compareAndSet(null, cause)) { ctx.close(); } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java index 5a59cc8615..15f891b8cb 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.MessageList; @@ -92,20 +93,19 @@ public class DatagramMulticastTest extends AbstractDatagramTest { cc.close().awaitUninterruptibly(); } - private static final class MulticastTestHandler extends ChannelInboundHandlerAdapter { + private static final class MulticastTestHandler extends ChannelInboundConsumingHandler { private final CountDownLatch latch = new CountDownLatch(1); private boolean done; private volatile boolean fail; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - if (done || msgs.size() != 1) { + protected void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { + if (done) { fail = true; } - assertEquals(1, ((DatagramPacket) msgs.get(0)).content().readInt()); - msgs.releaseAllAndRecycle(); + assertEquals(1, msg.content().readInt()); latch.countDown(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java index 4ad4c9320b..6aea30d91d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.MessageList; import io.netty.channel.socket.DatagramPacket; @@ -39,12 +40,10 @@ public class DatagramUnicastTest extends AbstractDatagramTest { public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - sb.handler(new ChannelInboundHandlerAdapter() { + sb.handler(new ChannelInboundConsumingHandler() { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - assertEquals(1, msgs.size()); - assertEquals(1, ((DatagramPacket) msgs.get(0)).content().readInt()); - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { + assertEquals(1, msg.content().readInt()); latch.countDown(); } }); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java index ae02ab802d..3032baed69 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java @@ -97,6 +97,7 @@ public class SocketBufReleaseTest extends AbstractSocketTest { @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { + // discard msgs.releaseAllAndRecycle(); } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 226a48a904..03733f25d7 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; -import io.netty.channel.MessageList; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; @@ -181,7 +180,7 @@ public class SocketEchoTest extends AbstractSocketTest { } } - private static class EchoHandler extends ChannelInboundHandlerAdapter { + private static class EchoHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @@ -193,24 +192,20 @@ public class SocketEchoTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf in = (ByteBuf) msgs.get(j); - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); + public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - channel.write(Unpooled.wrappedBuffer(actual)); - } - - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); } - msgs.releaseAllAndRecycle(); + + if (channel.parent() != null) { + channel.write(Unpooled.wrappedBuffer(actual)); + } + + counter += actual.length; } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java index 7d52878508..093e1894a7 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java @@ -20,6 +20,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.DefaultFileRegion; @@ -63,7 +64,7 @@ public class SocketFileRegionTest extends AbstractSocketTest { testFileRegion0(sb, cb, true); } - private void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable { + private static void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable { File file = File.createTempFile("netty-", ".tmp"); file.deleteOnExit(); @@ -72,10 +73,8 @@ public class SocketFileRegionTest extends AbstractSocketTest { out.close(); ChannelInboundHandler ch = new ChannelInboundHandlerAdapter() { - @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - // discard msgs.releaseAllAndRecycle(); } @@ -123,7 +122,7 @@ public class SocketFileRegionTest extends AbstractSocketTest { } } - private static class TestHandler extends ChannelInboundHandlerAdapter { + private static class TestHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @@ -135,19 +134,15 @@ public class SocketFileRegionTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf in = (ByteBuf) msgs.get(j); - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); + public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); } - msgs.releaseAllAndRecycle(); + counter += actual.length; } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java index d25528fcf0..604a7494a1 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java @@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; -import io.netty.channel.MessageList; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import org.junit.Test; @@ -124,44 +123,37 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest { } } - private static class EchoHandler extends ChannelInboundHandlerAdapter { + private static class EchoHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { channel = ctx.channel(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf msg = (ByteBuf) msgs.get(j); - assertEquals(1024, msg.readableBytes()); + public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + assertEquals(1024, msg.readableBytes()); - byte[] actual = new byte[msg.readableBytes()]; - msg.getBytes(0, actual); + byte[] actual = new byte[msg.readableBytes()]; + msg.getBytes(0, actual); - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - channel.write(msg.retain()); - } - - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); } - msgs.releaseAllAndRecycle(); + + if (channel.parent() != null) { + channel.write(msg.retain()); + } + + counter += actual.length; } @Override - public void exceptionCaught( - ChannelHandlerContext ctx, Throwable cause) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (exception.compareAndSet(null, cause)) { ctx.close(); } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java index f4dc9b310e..848a4b4a0d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java @@ -22,7 +22,7 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.MessageList; import org.junit.Test; @@ -124,7 +124,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { assertEquals(Unpooled.wrappedBuffer(data), sh.received); } - private static class TestHandler extends ChannelInboundHandlerAdapter { + private static class TestHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @@ -136,13 +136,9 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf in = (ByteBuf) msgs.get(j); - counter += in.readableBytes(); - received.writeBytes(in); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + counter += in.readableBytes(); + received.writeBytes(in); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java index d660df2784..92cb46eee2 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java @@ -160,7 +160,7 @@ public class SocketObjectEchoTest extends AbstractSocketTest { counter ++; } - msgs.releaseAllAndRecycle(); + msgs.recycle(); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java index f2b30237db..03588ec25d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java @@ -18,9 +18,8 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelOption; -import io.netty.channel.MessageList; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.SocketChannel; import org.junit.Test; @@ -110,7 +109,7 @@ public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest { } } - private static class TestHandler extends ChannelInboundHandlerAdapter { + private static class TestHandler extends ChannelInboundConsumingHandler { volatile SocketChannel ch; final BlockingQueue queue = new LinkedBlockingQueue(); final CountDownLatch halfClosure = new CountDownLatch(1); @@ -128,11 +127,8 @@ public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { - queue.offer(((ByteBuf) msgs.get(i)).readByte()); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + queue.offer(msg.readByte()); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java index a895c6846e..86b9f2395e 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java @@ -19,8 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.MessageList; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.socket.SocketChannel; import org.junit.Test; @@ -77,7 +76,7 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest { } } - private static class TestHandler extends ChannelInboundHandlerAdapter { + private static class TestHandler extends ChannelInboundConsumingHandler { volatile SocketChannel ch; final BlockingQueue queue = new LinkedBlockingQueue(); @@ -87,11 +86,8 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { - queue.offer(((ByteBuf) msgs.get(i)).readByte()); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + queue.offer(msg.readByte()); } } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java index 19b15bfbcb..1b65f22a25 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.MessageList; @@ -245,7 +246,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest { } } - private static class SpdyEchoTestClientHandler extends ChannelInboundHandlerAdapter { + private static class SpdyEchoTestClientHandler extends ChannelInboundConsumingHandler { final AtomicReference exception = new AtomicReference(); final ByteBuf frames; volatile int counter; @@ -255,20 +256,16 @@ public class SocketSpdyEchoTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf in = (ByteBuf) msgs.get(j); - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); + public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]); - } - - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]); } - msgs.releaseAllAndRecycle(); + + counter += actual.length; } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index 3e75430eda..5d36c9aa03 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; -import io.netty.channel.MessageList; import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; @@ -162,7 +161,7 @@ public class SocketSslEchoTest extends AbstractSocketTest { } } - private class EchoHandler extends ChannelInboundHandlerAdapter { + private class EchoHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @@ -179,24 +178,20 @@ public class SocketSslEchoTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int j = 0; j < msgs.size(); j ++) { - ByteBuf in = (ByteBuf) msgs.get(j); - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); + public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - channel.write(Unpooled.wrappedBuffer(actual)); - } - - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); } - msgs.releaseAllAndRecycle(); + + if (channel.parent() != null) { + channel.write(Unpooled.wrappedBuffer(actual)); + } + + counter += actual.length; } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 7a5b9467d3..5fa8806492 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -19,10 +19,9 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.MessageList; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; @@ -143,7 +142,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { } } - private class StartTlsClientHandler extends ChannelInboundHandlerAdapter { + private class StartTlsClientHandler extends ChannelInboundConsumingHandler { private final SslHandler sslHandler; private Future handshakeFuture; final AtomicReference exception = new AtomicReference(); @@ -160,22 +159,18 @@ public class SocketStartTlsTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { - String msg = (String) msgs.get(i); - if ("StartTlsResponse".equals(msg)) { - ctx.pipeline().addAfter("logger", "ssl", sslHandler); - handshakeFuture = sslHandler.handshakeFuture(); - ctx.write("EncryptedRequest\n"); - continue; - } - - assertEquals("EncryptedResponse", msg); - assertNotNull(handshakeFuture); - assertTrue(handshakeFuture.isSuccess()); - ctx.close(); + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + if ("StartTlsResponse".equals(msg)) { + ctx.pipeline().addAfter("logger", "ssl", sslHandler); + handshakeFuture = sslHandler.handshakeFuture(); + ctx.write("EncryptedRequest\n"); + return; } - msgs.releaseAllAndRecycle(); + + assertEquals("EncryptedResponse", msg); + assertNotNull(handshakeFuture); + assertTrue(handshakeFuture.isSuccess()); + ctx.close(); } @Override @@ -190,7 +185,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { } } - private class StartTlsServerHandler extends ChannelInboundHandlerAdapter { + private class StartTlsServerHandler extends ChannelInboundConsumingHandler { private final SslHandler sslHandler; volatile Channel channel; final AtomicReference exception = new AtomicReference(); @@ -206,19 +201,15 @@ public class SocketStartTlsTest extends AbstractSocketTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { - String msg = (String) msgs.get(i); - if ("StartTlsRequest".equals(msg)) { - ctx.pipeline().addAfter("logger", "ssl", sslHandler); - ctx.write("StartTlsResponse\n"); - continue; - } - - assertEquals("EncryptedRequest", msg); - ctx.write("EncryptedResponse\n"); + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + if ("StartTlsRequest".equals(msg)) { + ctx.pipeline().addAfter("logger", "ssl", sslHandler); + ctx.write("StartTlsResponse\n"); + return; } - msgs.releaseAllAndRecycle(); + + assertEquals("EncryptedRequest", msg); + ctx.write("EncryptedResponse\n"); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java index 9cb3b2a8f9..f108b8b8b0 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java @@ -19,9 +19,8 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; -import io.netty.channel.MessageList; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; @@ -136,36 +135,30 @@ public class SocketStringEchoTest extends AbstractSocketTest { } } - static class StringEchoHandler extends ChannelInboundHandlerAdapter { + static class StringEchoHandler extends ChannelInboundConsumingHandler { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { channel = ctx.channel(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { - String msg = (String) msgs.get(i); - assertEquals(data[counter], msg); + public void consume(ChannelHandlerContext ctx, String msg) throws Exception { + assertEquals(data[counter], msg); - if (channel.parent() != null) { - String delimiter = random.nextBoolean() ? "\r\n" : "\n"; - channel.write(msg + delimiter); - } - - counter ++; + if (channel.parent() != null) { + String delimiter = random.nextBoolean() ? "\r\n" : "\n"; + channel.write(msg + delimiter); } - msgs.releaseAllAndRecycle(); + + counter ++; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (exception.compareAndSet(null, cause)) { ctx.close(); } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java index 2384e428d8..1a14a93f5f 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -138,8 +139,7 @@ public class UDTClientServerConnectionTest { } } - static class ClientHandler extends - ChannelInboundHandlerAdapter { + static class ClientHandler extends ChannelInboundConsumingHandler { static final Logger log = LoggerFactory.getLogger(ClientHandler.class); @@ -169,11 +169,8 @@ public class UDTClientServerConnectionTest { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { - log.info("Client received: " + msgs.get(i)); - } - msgs.releaseAllAndRecycle(); + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("Client received: " + msg); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java b/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java index ab839cd103..3a43bef517 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java +++ b/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java @@ -70,7 +70,6 @@ public class EchoByteHandler extends ChannelInboundHandlerAdapter { if (meter != null) { meter.mark(buf.readableBytes()); } - buf.retain(); } ctx.write(msgs); } diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index b6d51873d9..1b526f2f16 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -19,12 +19,11 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MessageList; import io.netty.channel.ServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.AttributeKey; @@ -218,7 +217,7 @@ public final class ServerBootstrap extends AbstractBootstrap { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; @@ -237,33 +236,29 @@ public final class ServerBootstrap extends AbstractBootstrap msgs) { - int size = msgs.size(); - for (int i = 0; i < size; i ++) { - Channel child = (Channel) msgs.get(i); - child.pipeline().addLast(childHandler); - - for (Entry, Object> e: childOptions) { - try { - if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) { - logger.warn("Unknown channel option: " + e); - } - } catch (Throwable t) { - logger.warn("Failed to set a channel option: " + child, t); - } - } - - for (Entry, Object> e: childAttrs) { - child.attr((AttributeKey) e.getKey()).set(e.getValue()); - } + public void consume(ChannelHandlerContext ctx, Channel child) { + child.pipeline().addLast(childHandler); + for (Entry, Object> e: childOptions) { try { - childGroup.register(child); + if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) { + logger.warn("Unknown channel option: " + e); + } } catch (Throwable t) { - child.unsafe().closeForcibly(); - logger.warn("Failed to register an accepted channel: " + child, t); + logger.warn("Failed to set a channel option: " + child, t); } } + + for (Entry, Object> e: childAttrs) { + child.attr((AttributeKey) e.getKey()).set(e.getValue()); + } + + try { + childGroup.register(child); + } catch (Throwable t) { + child.unsafe().closeForcibly(); + logger.warn("Failed to register an accepted channel: " + child, t); + } } @Override diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundConsumingHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundConsumingHandler.java new file mode 100644 index 0000000000..92ade0b42a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelInboundConsumingHandler.java @@ -0,0 +1,85 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; + + +/** + * Abstract base class for {@link ChannelInboundHandler} that would like to consume messages. This means they will + * actually handle them and not pass them to the next handler in the {@link ChannelPipeline}. + * + * If you need to pass them throught the {@link ChannelPipeline} use {@link ChannelInboundHandlerAdapter}. + */ +public abstract class ChannelInboundConsumingHandler extends ChannelInboundHandlerAdapter { + + @Override + public final void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { + try { + beginConsume(ctx); + MessageList cast = msgs.cast(); + int size = cast.size(); + for (int i = 0; i < size; i++) { + consume(ctx, cast.get(i)); + } + } finally { + try { + msgs.releaseAllAndRecycle(); + } finally { + endConsume(ctx); + } + } + } + + /** + * Is called before consume of messages start. + * + * @param ctx The {@link ChannelHandlerContext} which is bound to this + * {@link ChannelInboundConsumingHandler} + */ + protected void beginConsume(ChannelHandlerContext ctx) { + // NOOP + } + + /** + * Is called after consume of messages ends. + * + * @param ctx The {@link ChannelHandlerContext} which is bound to this + * {@link ChannelInboundConsumingHandler} + */ + protected void endConsume(ChannelHandlerContext ctx) { + // NOOP + } + + /** + * Consume the message. After this method was executed for all of the messages in the {@link MessageList} + * {@link MessageList#releaseAllAndRecycle()} is called and so the {@link MessageList} is recycled and + * {@link ReferenceCounted#release()} is called on all messages that implement {@link ReferenceCounted}. + * + * Be aware that because of this you must not hold a reference to a message or to the {@link MessageList} after + * this method returns. If you really need to hold a reference to a message, use + * {@link ReferenceCountUtil#retain(Object)} on it to increment the reference count and so make sure its not + * released. + * + * + * @param ctx The {@link ChannelHandlerContext} which is bound to this + * {@link ChannelInboundConsumingHandler} + * @param msg The mesage to consume and handle + * @throws Exception thrown if an error accours + */ + protected abstract void consume(ChannelHandlerContext ctx, I msg) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index edd0856f6e..1e469a0bb5 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -15,13 +15,21 @@ */ package io.netty.channel; - /** * Abstract base class for {@link ChannelInboundHandler} implementations which provide * implementations of all of their methods. * + *

* This implementation just forward the operation to the next {@link ChannelHandler} in the * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this. + *

+ *

+ * Be aware that messages are not released after the {@link #messageReceived(ChannelHandlerContext, MessageList)} + * method returns automatically. This is done for make it as flexible as possible and get the most out of + * performance. Because of this you need to explicit call {@link MessageList#releaseAllAndRecycle()} if you + * consumed all the messages. Because this is such a common need {@link ChannelInboundConsumingHandler} is provided , + * which will automatically release messages and the {@link MessageList} after processing is done. + *

*/ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 54862aea62..e0dc398453 100755 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -310,6 +310,7 @@ public class EmbeddedChannel extends AbstractChannel { for (int i = 0; i < size; i ++) { lastInboundBuffer.add(msgs.get(i)); } + msgs.recycle(); } @Override diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index beb6fa0a36..99f4be3bc0 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -61,7 +61,7 @@ public class DefaultChannelPipelineTest { @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - // Swallow. + msgs.releaseAllAndRecycle(); } }); diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index b501ebab59..81ef0de262 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -20,10 +20,9 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundConsumingHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MessageList; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.Test; @@ -141,13 +140,10 @@ public class LocalChannelTest { clientGroup.terminationFuture().sync(); } - static class TestHandler extends ChannelInboundHandlerAdapter { + static class TestHandler extends ChannelInboundConsumingHandler { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - final int size = msgs.size(); - for (int i = 0; i < size; i ++) { - logger.info(String.format("Received mesage: %s", msgs.get(i))); - } + public void consume(ChannelHandlerContext ctx, Object msg) throws Exception { + logger.info(String.format("Received mesage: %s", msg)); } } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 29381476df..2047268c32 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -56,10 +56,11 @@ public class LocalTransportThreadModelTest { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) { - // Discard - } + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) { + // Discard + msgs.releaseAllAndRecycle(); + } }); } }); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index 174bf7e041..aea62af6fd 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -114,7 +114,7 @@ public class LocalTransportThreadModelTest2 { @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { count.addAndGet(msgs.size()); - msgs.recycle(); + msgs.releaseAllAndRecycle(); } } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 8c8d39b06c..21206ad6dc 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -71,9 +71,9 @@ public class LocalTransportThreadModelTest3 { public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageList msgs) throws Exception { + public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) { // Discard + msgs.releaseAllAndRecycle(); } }); } diff --git a/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java b/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java index 7f20b876ae..f3131e5e64 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java @@ -46,9 +46,9 @@ public class NioDatagramChannelTest { .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInboundHandlerAdapter() { @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageList msgs) throws Exception { - // noop + public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) { + // Discard + msgs.releaseAllAndRecycle(); } }); DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap