Add ChannelInboundConsumingHandler

..which is useful when the handler is placed at the last position of the
pipeline because it releases the received messages automatically.
This commit is contained in:
Norman Maurer 2013-06-14 21:44:30 +02:00 committed by Trustin Lee
parent 6c5c119f8c
commit bfc9c6d80d
48 changed files with 674 additions and 732 deletions

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
@ -156,13 +156,13 @@ public class WebSocketServerProtocolHandlerTest {
} }
} }
private static class CustomTextFrameHandler extends ChannelInboundHandlerAdapter { private static class CustomTextFrameHandler extends ChannelInboundConsumingHandler<TextWebSocketFrame> {
private String content; private String content;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
assertEquals(1, msgs.size()); assertNull(content);
content = "processed: " + ((TextWebSocketFrame) msgs.get(0)).text(); content = "processed: " + msg.text();
} }
String getContent() { String getContent() {

View File

@ -19,10 +19,9 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MessageList;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -73,15 +72,11 @@ public class AppletDiscardServer extends JApplet {
} }
} }
private static final class DiscardServerHandler extends ChannelInboundHandlerAdapter { private static final class DiscardServerHandler extends ChannelInboundConsumingHandler<ByteBuf> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
MessageList<ByteBuf> bufs = msgs.cast(); System.out.println("Received: " + msg.toString(CharsetUtil.UTF_8));
for (int i = 0; i < bufs.size(); i++) {
System.out.println("Received: " + bufs.get(i).toString(CharsetUtil.UTF_8));
}
msgs.releaseAllAndRecycle();
} }
@Override @Override

View File

@ -32,7 +32,6 @@ public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
// Discard the received data silently.
msgs.releaseAllAndRecycle(); msgs.releaseAllAndRecycle();
} }

View File

@ -18,7 +18,7 @@ package io.netty.example.filetransfer;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
@ -92,33 +92,25 @@ public class FileServer {
new FileServer(port).run(); new FileServer(port).run();
} }
private static final class FileHandler extends ChannelInboundHandlerAdapter { private static final class FileHandler extends ChannelInboundConsumingHandler<String> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> messages) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
MessageList<String> msgs = messages.cast();
MessageList<Object> out = MessageList.newInstance();
for (int i = 0; i < msgs.size(); i++) {
String msg = msgs.get(i);
File file = new File(msg); File file = new File(msg);
if (file.exists()) { if (file.exists()) {
if (!file.isFile()) { if (!file.isFile()) {
ctx.write("Not a file: " + file + '\n'); ctx.write("Not a file: " + file + '\n');
return; return;
} }
MessageList<Object> out = MessageList.newInstance();
ctx.write(file + " " + file.length() + '\n'); ctx.write(file + " " + file.length() + '\n');
FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()); FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length());
out.add(region); out.add(region);
out.add("\n"); out.add("\n");
} else { } else {
out.add("File not found: " + file + '\n'); ctx.write("File not found: " + file + '\n');
} }
} }
msgs.recycle();
ctx.write(out);
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); cause.printStackTrace();

View File

@ -19,7 +19,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse;
@ -98,39 +98,36 @@ import static io.netty.handler.codec.http.HttpVersion.*;
* *
* </pre> * </pre>
*/ */
public class HttpStaticFileServerHandler extends ChannelInboundHandlerAdapter { public class HttpStaticFileServerHandler extends ChannelInboundConsumingHandler<FullHttpRequest> {
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT"; public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
public static final int HTTP_CACHE_SECONDS = 60; public static final int HTTP_CACHE_SECONDS = 60;
@Override @Override
public void messageReceived( public void consume(
ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
MessageList<FullHttpRequest> requests = msgs.cast();
for (int i = 0; i < requests.size(); i++) {
FullHttpRequest request = requests.get(i);
if (!request.getDecoderResult().isSuccess()) { if (!request.getDecoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST); sendError(ctx, BAD_REQUEST);
continue; return;
} }
if (request.getMethod() != GET) { if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED); sendError(ctx, METHOD_NOT_ALLOWED);
continue; return;
} }
final String uri = request.getUri(); final String uri = request.getUri();
final String path = sanitizeUri(uri); final String path = sanitizeUri(uri);
if (path == null) { if (path == null) {
sendError(ctx, FORBIDDEN); sendError(ctx, FORBIDDEN);
continue; return;
} }
File file = new File(path); File file = new File(path);
if (file.isHidden() || !file.exists()) { if (file.isHidden() || !file.exists()) {
sendError(ctx, NOT_FOUND); sendError(ctx, NOT_FOUND);
continue; return;
} }
if (file.isDirectory()) { if (file.isDirectory()) {
@ -139,12 +136,12 @@ public class HttpStaticFileServerHandler extends ChannelInboundHandlerAdapter {
} else { } else {
sendRedirect(ctx, uri + '/'); sendRedirect(ctx, uri + '/');
} }
continue; return;
} }
if (!file.isFile()) { if (!file.isFile()) {
sendError(ctx, FORBIDDEN); sendError(ctx, FORBIDDEN);
continue; return;
} }
// Cache Validation // Cache Validation
@ -159,7 +156,7 @@ public class HttpStaticFileServerHandler extends ChannelInboundHandlerAdapter {
long fileLastModifiedSeconds = file.lastModified() / 1000; long fileLastModifiedSeconds = file.lastModified() / 1000;
if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
sendNotModified(ctx); sendNotModified(ctx);
continue; return;
} }
} }
@ -168,7 +165,7 @@ public class HttpStaticFileServerHandler extends ChannelInboundHandlerAdapter {
raf = new RandomAccessFile(file, "r"); raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
sendError(ctx, NOT_FOUND); sendError(ctx, NOT_FOUND);
continue; return;
} }
long fileLength = raf.length(); long fileLength = raf.length();
@ -195,8 +192,6 @@ public class HttpStaticFileServerHandler extends ChannelInboundHandlerAdapter {
writeFuture.addListener(ChannelFutureListener.CLOSE); writeFuture.addListener(ChannelFutureListener.CLOSE);
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
@ -31,16 +31,24 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*; import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*; import static io.netty.handler.codec.http.HttpVersion.*;
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter { public class HttpHelloWorldServerHandler extends ChannelInboundConsumingHandler<Object> {
private static final ByteBuf CONTENT = private static final ByteBuf CONTENT =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII)); Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII));
private MessageList<Object> out;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { protected void beginConsume(ChannelHandlerContext ctx) {
MessageList<Object> out = MessageList.newInstance(); out = MessageList.newInstance();
int size = msgs.size(); }
for (int i = 0; i < size; i++) {
Object msg = msgs.get(i); @Override
protected void endConsume(ChannelHandlerContext ctx) {
ctx.write(out);
out = null;
}
@Override
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg; HttpRequest req = (HttpRequest) msg;
@ -62,9 +70,6 @@ public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
} }
} }
} }
ctx.write(out);
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -16,20 +16,18 @@
package io.netty.example.http.snoop; package io.netty.example.http.snoop;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
public class HttpSnoopClientHandler extends ChannelInboundHandlerAdapter { public class HttpSnoopClientHandler extends ChannelInboundConsumingHandler<HttpObject> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
if (msg instanceof HttpResponse) { if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg; HttpResponse response = (HttpResponse) msg;
@ -63,8 +61,6 @@ public class HttpSnoopClientHandler extends ChannelInboundHandlerAdapter {
} }
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught( public void exceptionCaught(

View File

@ -134,7 +134,7 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter {
buf.append("\r\n"); buf.append("\r\n");
} }
return writeResponse(ctx, trailer, out); return writeResponse(trailer, out);
} }
} }
return true; return true;
@ -151,7 +151,7 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter {
buf.append("\r\n"); buf.append("\r\n");
} }
private boolean writeResponse(ChannelHandlerContext ctx, HttpObject currentObj, MessageList<Object> out) { private boolean writeResponse(HttpObject currentObj, MessageList<Object> out) {
// Decide whether to close the connection or not. // Decide whether to close the connection or not.
boolean keepAlive = isKeepAlive(request); boolean keepAlive = isKeepAlive(request);
// Build the response object. // Build the response object.

View File

@ -16,10 +16,10 @@
package io.netty.example.http.upload; package io.netty.example.http.upload;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
@ -29,16 +29,14 @@ import java.util.logging.Logger;
/** /**
* Handler that just dumps the contents of the response from the server * Handler that just dumps the contents of the response from the server
*/ */
public class HttpUploadClientHandler extends ChannelInboundHandlerAdapter { public class HttpUploadClientHandler extends ChannelInboundConsumingHandler<HttpObject> {
private static final Logger logger = Logger.getLogger(HttpUploadClientHandler.class.getName()); private static final Logger logger = Logger.getLogger(HttpUploadClientHandler.class.getName());
private boolean readingChunks; private boolean readingChunks;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
if (msg instanceof HttpResponse) { if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg; HttpResponse response = (HttpResponse) msg;
@ -76,8 +74,6 @@ public class HttpUploadClientHandler extends ChannelInboundHandlerAdapter {
} }
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -20,14 +20,14 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.CookieDecoder;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
@ -62,7 +62,7 @@ import java.util.logging.Logger;
import static io.netty.buffer.Unpooled.*; import static io.netty.buffer.Unpooled.*;
import static io.netty.handler.codec.http.HttpHeaders.Names.*; import static io.netty.handler.codec.http.HttpHeaders.Names.*;
public class HttpUploadServerHandler extends ChannelInboundHandlerAdapter { public class HttpUploadServerHandler extends ChannelInboundConsumingHandler<HttpObject> {
private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName()); private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());
@ -96,9 +96,7 @@ public class HttpUploadServerHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
HttpRequest request = this.request = (HttpRequest) msg; HttpRequest request = this.request = (HttpRequest) msg;
URI uri = new URI(request.getUri()); URI uri = new URI(request.getUri());
@ -203,8 +201,6 @@ public class HttpUploadServerHandler extends ChannelInboundHandlerAdapter {
} }
} }
} }
msgs.releaseAllAndRecycle();
}
private void reset() { private void reset() {
request = null; request = null;

View File

@ -40,9 +40,8 @@ package io.netty.example.http.websocketx.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
@ -51,7 +50,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
public class WebSocketClientHandler extends ChannelInboundHandlerAdapter { public class WebSocketClientHandler extends ChannelInboundConsumingHandler<Object> {
private final WebSocketClientHandshaker handshaker; private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture; private ChannelPromise handshakeFuture;
@ -80,15 +79,13 @@ public class WebSocketClientHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
Channel ch = ctx.channel(); Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) { if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg); handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("WebSocket Client connected!"); System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess(); handshakeFuture.setSuccess();
continue; return;
} }
if (msg instanceof FullHttpResponse) { if (msg instanceof FullHttpResponse) {
@ -108,8 +105,6 @@ public class WebSocketClientHandler extends ChannelInboundHandlerAdapter {
ch.close(); ch.close();
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -20,8 +20,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
@ -46,7 +45,7 @@ import static io.netty.handler.codec.http.HttpVersion.*;
/** /**
* Handles handshakes and messages * Handles handshakes and messages
*/ */
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { public class WebSocketServerHandler extends ChannelInboundConsumingHandler<Object> {
private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
private static final String WEBSOCKET_PATH = "/websocket"; private static final String WEBSOCKET_PATH = "/websocket";
@ -54,17 +53,13 @@ public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handshaker; private WebSocketServerHandshaker handshaker;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
if (msg instanceof FullHttpRequest) { if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg); handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) { } else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg); handleWebSocketFrame(ctx, (WebSocketFrame) msg);
} }
} }
msgs.releaseAllAndRecycle();
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
// Handle a bad request. // Handle a bad request.

View File

@ -20,8 +20,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.example.http.websocketx.server.WebSocketServerIndexPage; import io.netty.example.http.websocketx.server.WebSocketServerIndexPage;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest;
@ -47,7 +46,7 @@ import static io.netty.handler.codec.http.HttpVersion.*;
/** /**
* Handles handshakes and messages * Handles handshakes and messages
*/ */
public class WebSocketSslServerHandler extends ChannelInboundHandlerAdapter { public class WebSocketSslServerHandler extends ChannelInboundConsumingHandler<Object> {
private static final Logger logger = Logger.getLogger(WebSocketSslServerHandler.class.getName()); private static final Logger logger = Logger.getLogger(WebSocketSslServerHandler.class.getName());
private static final String WEBSOCKET_PATH = "/websocket"; private static final String WEBSOCKET_PATH = "/websocket";
@ -55,17 +54,13 @@ public class WebSocketSslServerHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handshaker; private WebSocketServerHandshaker handshaker;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
if (msg instanceof FullHttpRequest) { if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg); handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) { } else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg); handleWebSocketFrame(ctx, (WebSocketFrame) msg);
} }
} }
msgs.releaseAllAndRecycle();
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
// Handle a bad request. // Handle a bad request.

View File

@ -16,18 +16,14 @@
package io.netty.example.localecho; package io.netty.example.localecho;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
public class LocalEchoClientHandler extends ChannelInboundHandlerAdapter { public class LocalEchoClientHandler extends ChannelInboundConsumingHandler<Object> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
// Print as received // Print as received
System.out.println(msgs.get(i)); System.out.println(msg);
}
msgs.releaseAllAndRecycle();
} }
@Override @Override

View File

@ -24,7 +24,7 @@ public class LocalEchoServerHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
// Write back as received // Write back as received
ctx.write(msgs.copy()); ctx.write(msgs);
} }
@Override @Override

View File

@ -16,30 +16,23 @@
package io.netty.example.qotm; package io.netty.example.qotm;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
public class QuoteOfTheMomentClientHandler extends ChannelInboundHandlerAdapter { public class QuoteOfTheMomentClientHandler extends ChannelInboundConsumingHandler<DatagramPacket> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
MessageList<DatagramPacket> packets = msgs.cast(); String response = msg.content().toString(CharsetUtil.UTF_8);
for (int i = 0; i < packets.size(); i++) {
String response = packets.get(i).content().toString(CharsetUtil.UTF_8);
if (response.startsWith("QOTM: ")) { if (response.startsWith("QOTM: ")) {
System.out.println("Quote of the Moment: " + response.substring(6)); System.out.println("Quote of the Moment: " + response.substring(6));
ctx.close(); ctx.close();
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught( public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace(); cause.printStackTrace();
ctx.close(); ctx.close();
} }

View File

@ -17,14 +17,13 @@ package io.netty.example.qotm;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import java.util.Random; import java.util.Random;
public class QuoteOfTheMomentServerHandler extends ChannelInboundHandlerAdapter { public class QuoteOfTheMomentServerHandler extends ChannelInboundConsumingHandler<DatagramPacket> {
private static final Random random = new Random(); private static final Random random = new Random();
@ -45,18 +44,13 @@ public class QuoteOfTheMomentServerHandler extends ChannelInboundHandlerAdapter
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
MessageList<DatagramPacket> packets = msgs.cast();
for (int i = 0; i < packets.size(); i++) {
DatagramPacket packet = packets.get(i);
System.err.println(packet); System.err.println(packet);
if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) { if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) {
ctx.write(new DatagramPacket( ctx.write(new DatagramPacket(
Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender())); Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught( public void exceptionCaught(

View File

@ -16,10 +16,9 @@
package io.netty.example.rxtx; package io.netty.example.rxtx;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
public class RxtxClientHandler extends ChannelInboundHandlerAdapter { public class RxtxClientHandler extends ChannelInboundConsumingHandler<String> {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) { public void channelActive(ChannelHandlerContext ctx) {
@ -27,16 +26,12 @@ public class RxtxClientHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) {
String msg = msgs.get(i).toString();
if ("OK".equals(msg)) { if ("OK".equals(msg)) {
System.out.println("Serial port responded to AT"); System.out.println("Serial port responded to AT");
} else { } else {
System.out.println("Serial port responded with not-OK: " + msg); System.out.println("Serial port responded with not-OK: " + msg);
} }
}
msgs.releaseAllAndRecycle();
ctx.close(); ctx.close();
} }
} }

View File

@ -16,8 +16,7 @@
package io.netty.example.securechat; package io.netty.example.securechat;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -25,17 +24,14 @@ import java.util.logging.Logger;
/** /**
* Handles a client-side channel. * Handles a client-side channel.
*/ */
public class SecureChatClientHandler extends ChannelInboundHandlerAdapter { public class SecureChatClientHandler extends ChannelInboundConsumingHandler<String> {
private static final Logger logger = Logger.getLogger( private static final Logger logger = Logger.getLogger(
SecureChatClientHandler.class.getName()); SecureChatClientHandler.class.getName());
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
for (int i = 0; i < msgs.size(); i++) { System.err.println(msg);
System.err.println(msgs.get(i));
}
msgs.releaseAllAndRecycle();
} }
@Override @Override

View File

@ -17,8 +17,7 @@ package io.netty.example.securechat;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
@ -33,7 +32,7 @@ import java.util.logging.Logger;
/** /**
* Handles a server-side channel. * Handles a server-side channel.
*/ */
public class SecureChatServerHandler extends ChannelInboundHandlerAdapter { public class SecureChatServerHandler extends ChannelInboundConsumingHandler<String> {
private static final Logger logger = Logger.getLogger( private static final Logger logger = Logger.getLogger(
SecureChatServerHandler.class.getName()); SecureChatServerHandler.class.getName());
@ -62,10 +61,7 @@ public class SecureChatServerHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> requests) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
MessageList<String> msgs = requests.cast();
for (int i = 0; i < msgs.size(); i++) {
String msg = msgs.get(i);
// Send the received message to all channels but the current one. // Send the received message to all channels but the current one.
for (Channel c: channels) { for (Channel c: channels) {
if (c != ctx.channel()) { if (c != ctx.channel()) {
@ -81,8 +77,6 @@ public class SecureChatServerHandler extends ChannelInboundHandlerAdapter {
ctx.close(); ctx.close();
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -19,8 +19,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Date; import java.util.Date;
@ -31,10 +30,9 @@ import java.util.logging.Logger;
* Handles a server-side channel. * Handles a server-side channel.
*/ */
@Sharable @Sharable
public class TelnetServerHandler extends ChannelInboundHandlerAdapter { public class TelnetServerHandler extends ChannelInboundConsumingHandler<String> {
private static final Logger logger = Logger.getLogger( private static final Logger logger = Logger.getLogger(TelnetServerHandler.class.getName());
TelnetServerHandler.class.getName());
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
@ -45,10 +43,7 @@ public class TelnetServerHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, String request) throws Exception {
MessageList<String> requests = msgs.cast();
for (int i = 0; i < requests.size(); i++) {
String request = requests.get(i);
// Generate and write a response. // Generate and write a response.
String response; String response;
@ -72,8 +67,6 @@ public class TelnetServerHandler extends ChannelInboundHandlerAdapter {
future.addListener(ChannelFutureListener.CLOSE); future.addListener(ChannelFutureListener.CLOSE);
} }
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.MessageList;
import io.netty.channel.sctp.SctpChannel; import io.netty.channel.sctp.SctpChannel;
import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler; import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler;
import io.netty.handler.codec.sctp.SctpMessageCompletionHandler; import io.netty.handler.codec.sctp.SctpMessageCompletionHandler;
@ -139,21 +138,18 @@ public class SctpEchoTest extends AbstractSctpTest {
} }
} }
private static class EchoHandler extends ChannelInboundHandlerAdapter { private static class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx) throws Exception {
throws Exception {
channel = ctx.channel(); channel = ctx.channel();
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf in = (ByteBuf) msgs.get(j);
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual); in.readBytes(actual);
@ -168,11 +164,9 @@ public class SctpEchoTest extends AbstractSctpTest {
counter += actual.length; counter += actual.length;
} }
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) { if (exception.compareAndSet(null, cause)) {
ctx.close(); ctx.close();
} }

View File

@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
@ -92,20 +93,19 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
cc.close().awaitUninterruptibly(); cc.close().awaitUninterruptibly();
} }
private static final class MulticastTestHandler extends ChannelInboundHandlerAdapter { private static final class MulticastTestHandler extends ChannelInboundConsumingHandler<DatagramPacket> {
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
private boolean done; private boolean done;
private volatile boolean fail; private volatile boolean fail;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { protected void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
if (done || msgs.size() != 1) { if (done) {
fail = true; fail = true;
} }
assertEquals(1, ((DatagramPacket) msgs.get(0)).content().readInt()); assertEquals(1, msg.content().readInt());
msgs.releaseAllAndRecycle();
latch.countDown(); latch.countDown();

View File

@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
@ -39,12 +40,10 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
sb.handler(new ChannelInboundHandlerAdapter() { sb.handler(new ChannelInboundConsumingHandler<DatagramPacket>() {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
assertEquals(1, msgs.size()); assertEquals(1, msg.content().readInt());
assertEquals(1, ((DatagramPacket) msgs.get(0)).content().readInt());
msgs.releaseAllAndRecycle();
latch.countDown(); latch.countDown();
} }
}); });

View File

@ -97,6 +97,7 @@ public class SocketBufReleaseTest extends AbstractSocketTest {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
// discard
msgs.releaseAllAndRecycle(); msgs.releaseAllAndRecycle();
} }

View File

@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.MessageList;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
@ -181,7 +180,7 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
} }
private static class EchoHandler extends ChannelInboundHandlerAdapter { private static class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@ -193,9 +192,7 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf in = (ByteBuf) msgs.get(j);
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual); in.readBytes(actual);
@ -210,8 +207,6 @@ public class SocketEchoTest extends AbstractSocketTest {
counter += actual.length; counter += actual.length;
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,

View File

@ -20,6 +20,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
@ -63,7 +64,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
testFileRegion0(sb, cb, true); testFileRegion0(sb, cb, true);
} }
private void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable { private static void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable {
File file = File.createTempFile("netty-", ".tmp"); File file = File.createTempFile("netty-", ".tmp");
file.deleteOnExit(); file.deleteOnExit();
@ -72,10 +73,8 @@ public class SocketFileRegionTest extends AbstractSocketTest {
out.close(); out.close();
ChannelInboundHandler ch = new ChannelInboundHandlerAdapter() { ChannelInboundHandler ch = new ChannelInboundHandlerAdapter() {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
// discard
msgs.releaseAllAndRecycle(); msgs.releaseAllAndRecycle();
} }
@ -123,7 +122,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
} }
} }
private static class TestHandler extends ChannelInboundHandlerAdapter { private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@ -135,9 +134,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf in = (ByteBuf) msgs.get(j);
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual); in.readBytes(actual);
@ -147,8 +144,6 @@ public class SocketFileRegionTest extends AbstractSocketTest {
} }
counter += actual.length; counter += actual.length;
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,

View File

@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.MessageList;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder;
import org.junit.Test; import org.junit.Test;
@ -124,21 +123,18 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
} }
} }
private static class EchoHandler extends ChannelInboundHandlerAdapter { private static class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx) throws Exception {
throws Exception {
channel = ctx.channel(); channel = ctx.channel();
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf msg = (ByteBuf) msgs.get(j);
assertEquals(1024, msg.readableBytes()); assertEquals(1024, msg.readableBytes());
byte[] actual = new byte[msg.readableBytes()]; byte[] actual = new byte[msg.readableBytes()];
@ -155,13 +151,9 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
counter += actual.length; counter += actual.length;
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught( public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ChannelHandlerContext ctx, Throwable cause)
throws Exception {
if (exception.compareAndSet(null, cause)) { if (exception.compareAndSet(null, cause)) {
ctx.close(); ctx.close();
} }

View File

@ -22,7 +22,7 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
import org.junit.Test; import org.junit.Test;
@ -124,7 +124,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
assertEquals(Unpooled.wrappedBuffer(data), sh.received); assertEquals(Unpooled.wrappedBuffer(data), sh.received);
} }
private static class TestHandler extends ChannelInboundHandlerAdapter { private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@ -136,14 +136,10 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf in = (ByteBuf) msgs.get(j);
counter += in.readableBytes(); counter += in.readableBytes();
received.writeBytes(in); received.writeBytes(in);
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,

View File

@ -160,7 +160,7 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
counter ++; counter ++;
} }
msgs.releaseAllAndRecycle(); msgs.recycle();
} }
@Override @Override

View File

@ -18,9 +18,8 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageList;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import org.junit.Test; import org.junit.Test;
@ -110,7 +109,7 @@ public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {
} }
} }
private static class TestHandler extends ChannelInboundHandlerAdapter { private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile SocketChannel ch; volatile SocketChannel ch;
final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>(); final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>();
final CountDownLatch halfClosure = new CountDownLatch(1); final CountDownLatch halfClosure = new CountDownLatch(1);
@ -128,11 +127,8 @@ public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
for (int i = 0; i < msgs.size(); i ++) { queue.offer(msg.readByte());
queue.offer(((ByteBuf) msgs.get(i)).readByte());
}
msgs.releaseAllAndRecycle();
} }
@Override @Override

View File

@ -19,8 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.MessageList;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import org.junit.Test; import org.junit.Test;
@ -77,7 +76,7 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
} }
} }
private static class TestHandler extends ChannelInboundHandlerAdapter { private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile SocketChannel ch; volatile SocketChannel ch;
final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>(); final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>();
@ -87,11 +86,8 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
for (int i = 0; i < msgs.size(); i ++) { queue.offer(msg.readByte());
queue.offer(((ByteBuf) msgs.get(i)).readByte());
}
msgs.releaseAllAndRecycle();
} }
} }
} }

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
@ -245,7 +246,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
} }
} }
private static class SpdyEchoTestClientHandler extends ChannelInboundHandlerAdapter { private static class SpdyEchoTestClientHandler extends ChannelInboundConsumingHandler<ByteBuf> {
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
final ByteBuf frames; final ByteBuf frames;
volatile int counter; volatile int counter;
@ -255,9 +256,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf in = (ByteBuf) msgs.get(j);
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual); in.readBytes(actual);
@ -268,8 +267,6 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
counter += actual.length; counter += actual.length;
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.MessageList;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
@ -162,7 +161,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
} }
} }
private class EchoHandler extends ChannelInboundHandlerAdapter { private class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@ -179,9 +178,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
for (int j = 0; j < msgs.size(); j ++) {
ByteBuf in = (ByteBuf) msgs.get(j);
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual); in.readBytes(actual);
@ -196,8 +193,6 @@ public class SocketSslEchoTest extends AbstractSocketTest {
counter += actual.length; counter += actual.length;
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,

View File

@ -19,10 +19,9 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
@ -143,7 +142,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
} }
private class StartTlsClientHandler extends ChannelInboundHandlerAdapter { private class StartTlsClientHandler extends ChannelInboundConsumingHandler<String> {
private final SslHandler sslHandler; private final SslHandler sslHandler;
private Future<Channel> handshakeFuture; private Future<Channel> handshakeFuture;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -160,14 +159,12 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
for (int i = 0; i < msgs.size(); i ++) {
String msg = (String) msgs.get(i);
if ("StartTlsResponse".equals(msg)) { if ("StartTlsResponse".equals(msg)) {
ctx.pipeline().addAfter("logger", "ssl", sslHandler); ctx.pipeline().addAfter("logger", "ssl", sslHandler);
handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture = sslHandler.handshakeFuture();
ctx.write("EncryptedRequest\n"); ctx.write("EncryptedRequest\n");
continue; return;
} }
assertEquals("EncryptedResponse", msg); assertEquals("EncryptedResponse", msg);
@ -175,8 +172,6 @@ public class SocketStartTlsTest extends AbstractSocketTest {
assertTrue(handshakeFuture.isSuccess()); assertTrue(handshakeFuture.isSuccess());
ctx.close(); ctx.close();
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,
@ -190,7 +185,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
} }
private class StartTlsServerHandler extends ChannelInboundHandlerAdapter { private class StartTlsServerHandler extends ChannelInboundConsumingHandler<String> {
private final SslHandler sslHandler; private final SslHandler sslHandler;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -206,20 +201,16 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
for (int i = 0; i < msgs.size(); i ++) {
String msg = (String) msgs.get(i);
if ("StartTlsRequest".equals(msg)) { if ("StartTlsRequest".equals(msg)) {
ctx.pipeline().addAfter("logger", "ssl", sslHandler); ctx.pipeline().addAfter("logger", "ssl", sslHandler);
ctx.write("StartTlsResponse\n"); ctx.write("StartTlsResponse\n");
continue; return;
} }
assertEquals("EncryptedRequest", msg); assertEquals("EncryptedRequest", msg);
ctx.write("EncryptedResponse\n"); ctx.write("EncryptedResponse\n");
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,

View File

@ -19,9 +19,8 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.MessageList;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.Delimiters;
@ -136,21 +135,18 @@ public class SocketStringEchoTest extends AbstractSocketTest {
} }
} }
static class StringEchoHandler extends ChannelInboundHandlerAdapter { static class StringEchoHandler extends ChannelInboundConsumingHandler<String> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx) throws Exception {
throws Exception {
channel = ctx.channel(); channel = ctx.channel();
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
for (int i = 0; i < msgs.size(); i ++) {
String msg = (String) msgs.get(i);
assertEquals(data[counter], msg); assertEquals(data[counter], msg);
if (channel.parent() != null) { if (channel.parent() != null) {
@ -160,12 +156,9 @@ public class SocketStringEchoTest extends AbstractSocketTest {
counter ++; counter ++;
} }
msgs.releaseAllAndRecycle();
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) { if (exception.compareAndSet(null, cause)) {
ctx.close(); ctx.close();
} }

View File

@ -19,6 +19,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -138,8 +139,7 @@ public class UDTClientServerConnectionTest {
} }
} }
static class ClientHandler extends static class ClientHandler extends ChannelInboundConsumingHandler<Object> {
ChannelInboundHandlerAdapter {
static final Logger log = LoggerFactory.getLogger(ClientHandler.class); static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
@ -169,11 +169,8 @@ public class UDTClientServerConnectionTest {
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
for (int i = 0; i < msgs.size(); i ++) { log.info("Client received: " + msg);
log.info("Client received: " + msgs.get(i));
}
msgs.releaseAllAndRecycle();
} }
} }

View File

@ -70,7 +70,6 @@ public class EchoByteHandler extends ChannelInboundHandlerAdapter {
if (meter != null) { if (meter != null) {
meter.mark(buf.readableBytes()); meter.mark(buf.readableBytes());
} }
buf.retain();
} }
ctx.write(msgs); ctx.write(msgs);
} }

View File

@ -19,12 +19,11 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MessageList;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
@ -218,7 +217,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
return new Entry[size]; return new Entry[size];
} }
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private static class ServerBootstrapAcceptor extends ChannelInboundConsumingHandler<Channel> {
private final EventLoopGroup childGroup; private final EventLoopGroup childGroup;
private final ChannelHandler childHandler; private final ChannelHandler childHandler;
@ -237,10 +236,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) { public void consume(ChannelHandlerContext ctx, Channel child) {
int size = msgs.size();
for (int i = 0; i < size; i ++) {
Channel child = (Channel) msgs.get(i);
child.pipeline().addLast(childHandler); child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) { for (Entry<ChannelOption<?>, Object> e: childOptions) {
@ -264,7 +260,6 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
logger.warn("Failed to register an accepted channel: " + child, t); logger.warn("Failed to register an accepted channel: " + child, t);
} }
} }
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -0,0 +1,85 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
/**
* Abstract base class for {@link ChannelInboundHandler} that would like to consume messages. This means they will
* actually handle them and not pass them to the next handler in the {@link ChannelPipeline}.
*
* If you need to pass them throught the {@link ChannelPipeline} use {@link ChannelInboundHandlerAdapter}.
*/
public abstract class ChannelInboundConsumingHandler<I> extends ChannelInboundHandlerAdapter {
@Override
public final void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
try {
beginConsume(ctx);
MessageList<I> cast = msgs.cast();
int size = cast.size();
for (int i = 0; i < size; i++) {
consume(ctx, cast.get(i));
}
} finally {
try {
msgs.releaseAllAndRecycle();
} finally {
endConsume(ctx);
}
}
}
/**
* Is called before consume of messages start.
*
* @param ctx The {@link ChannelHandlerContext} which is bound to this
* {@link ChannelInboundConsumingHandler}
*/
protected void beginConsume(ChannelHandlerContext ctx) {
// NOOP
}
/**
* Is called after consume of messages ends.
*
* @param ctx The {@link ChannelHandlerContext} which is bound to this
* {@link ChannelInboundConsumingHandler}
*/
protected void endConsume(ChannelHandlerContext ctx) {
// NOOP
}
/**
* Consume the message. After this method was executed for all of the messages in the {@link MessageList}
* {@link MessageList#releaseAllAndRecycle()} is called and so the {@link MessageList} is recycled and
* {@link ReferenceCounted#release()} is called on all messages that implement {@link ReferenceCounted}.
*
* Be aware that because of this you must not hold a reference to a message or to the {@link MessageList} after
* this method returns. If you really need to hold a reference to a message, use
* {@link ReferenceCountUtil#retain(Object)} on it to increment the reference count and so make sure its not
* released.
*
*
* @param ctx The {@link ChannelHandlerContext} which is bound to this
* {@link ChannelInboundConsumingHandler}
* @param msg The mesage to consume and handle
* @throws Exception thrown if an error accours
*/
protected abstract void consume(ChannelHandlerContext ctx, I msg) throws Exception;
}

View File

@ -15,13 +15,21 @@
*/ */
package io.netty.channel; package io.netty.channel;
/** /**
* Abstract base class for {@link ChannelInboundHandler} implementations which provide * Abstract base class for {@link ChannelInboundHandler} implementations which provide
* implementations of all of their methods. * implementations of all of their methods.
* *
* <p>
* This implementation just forward the operation to the next {@link ChannelHandler} in the * This implementation just forward the operation to the next {@link ChannelHandler} in the
* {@link ChannelPipeline}. Sub-classes may override a method implementation to change this. * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this.
* </p>
* <p>
* Be aware that messages are not released after the {@link #messageReceived(ChannelHandlerContext, MessageList)}
* method returns automatically. This is done for make it as flexible as possible and get the most out of
* performance. Because of this you need to explicit call {@link MessageList#releaseAllAndRecycle()} if you
* consumed all the messages. Because this is such a common need {@link ChannelInboundConsumingHandler} is provided ,
* which will automatically release messages and the {@link MessageList} after processing is done.
* </p>
*/ */
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

View File

@ -310,6 +310,7 @@ public class EmbeddedChannel extends AbstractChannel {
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
lastInboundBuffer.add(msgs.get(i)); lastInboundBuffer.add(msgs.get(i));
} }
msgs.recycle();
} }
@Override @Override

View File

@ -61,7 +61,7 @@ public class DefaultChannelPipelineTest {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
// Swallow. msgs.releaseAllAndRecycle();
} }
}); });

View File

@ -20,10 +20,9 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundConsumingHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MessageList;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import org.junit.Test; import org.junit.Test;
@ -141,13 +140,10 @@ public class LocalChannelTest {
clientGroup.terminationFuture().sync(); clientGroup.terminationFuture().sync();
} }
static class TestHandler extends ChannelInboundHandlerAdapter { static class TestHandler extends ChannelInboundConsumingHandler<Object> {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
final int size = msgs.size(); logger.info(String.format("Received mesage: %s", msg));
for (int i = 0; i < size; i ++) {
logger.info(String.format("Received mesage: %s", msgs.get(i)));
}
} }
} }
} }

View File

@ -59,6 +59,7 @@ public class LocalTransportThreadModelTest {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) {
// Discard // Discard
msgs.releaseAllAndRecycle();
} }
}); });
} }

View File

@ -114,7 +114,7 @@ public class LocalTransportThreadModelTest2 {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
count.addAndGet(msgs.size()); count.addAndGet(msgs.size());
msgs.recycle(); msgs.releaseAllAndRecycle();
} }
} }
} }

View File

@ -71,9 +71,9 @@ public class LocalTransportThreadModelTest3 {
public void initChannel(LocalChannel ch) throws Exception { public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override @Override
public void messageReceived( public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) {
ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
// Discard // Discard
msgs.releaseAllAndRecycle();
} }
}); });
} }

View File

@ -46,9 +46,9 @@ public class NioDatagramChannelTest {
.option(ChannelOption.SO_BROADCAST, true) .option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInboundHandlerAdapter() { .handler(new ChannelInboundHandlerAdapter() {
@Override @Override
public void messageReceived( public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) {
ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception { // Discard
// noop msgs.releaseAllAndRecycle();
} }
}); });
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap