Merge ChannelInboundConsumingHandler into SimpleChannelInboundHandler
- SimpleChannelInboundHandler now has a constructor parameter to let a user decide to enable automatic message release. (the default is to enable), which makes ChannelInboundConsumingHandler of less value.
This commit is contained in:
parent
bfc9c6d80d
commit
a969613540
@ -48,7 +48,6 @@ class WebSocketClientProtocolHandshakeHandler extends SimpleChannelInboundHandle
|
||||
protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
|
||||
if (!handshaker.isHandshakeComplete()) {
|
||||
handshaker.finishHandshake(ctx.channel(), msg);
|
||||
msg.release();
|
||||
ctx.fireUserEventTriggered(
|
||||
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
|
||||
ctx.pipeline().remove(this);
|
||||
|
@ -20,7 +20,6 @@ import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
@ -125,7 +124,6 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
|
||||
return new SimpleChannelInboundHandler<FullHttpRequest>() {
|
||||
@Override
|
||||
protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
|
||||
msg.release();
|
||||
FullHttpResponse response =
|
||||
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN);
|
||||
ctx.channel().write(response);
|
||||
|
@ -17,10 +17,10 @@ package io.netty.handler.codec.http.websocketx;
|
||||
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
@ -156,11 +156,11 @@ public class WebSocketServerProtocolHandlerTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class CustomTextFrameHandler extends ChannelInboundConsumingHandler<TextWebSocketFrame> {
|
||||
private static class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
||||
private String content;
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
|
||||
assertNull(content);
|
||||
content = "processed: " + msg.text();
|
||||
}
|
||||
|
@ -19,9 +19,9 @@ import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
@ -72,10 +72,10 @@ public class AppletDiscardServer extends JApplet {
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DiscardServerHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static final class DiscardServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
System.out.println("Received: " + msg.toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
|
||||
|
@ -71,8 +71,7 @@ public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteg
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx, final BigInteger msg) {
|
||||
public void messageReceived(ChannelHandlerContext ctx, final BigInteger msg) {
|
||||
receivedMessages ++;
|
||||
if (receivedMessages == count) {
|
||||
// Offer the answer after closing the connection.
|
||||
|
@ -18,13 +18,13 @@ package io.netty.example.filetransfer;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
@ -92,9 +92,9 @@ public class FileServer {
|
||||
new FileServer(port).run();
|
||||
}
|
||||
|
||||
private static final class FileHandler extends ChannelInboundConsumingHandler<String> {
|
||||
private static final class FileHandler extends SimpleChannelInboundHandler<String> {
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
File file = new File(msg);
|
||||
if (file.exists()) {
|
||||
if (!file.isFile()) {
|
||||
|
@ -19,8 +19,8 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
@ -98,14 +98,14 @@ import static io.netty.handler.codec.http.HttpVersion.*;
|
||||
*
|
||||
* </pre>
|
||||
*/
|
||||
public class HttpStaticFileServerHandler extends ChannelInboundConsumingHandler<FullHttpRequest> {
|
||||
public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||
|
||||
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
|
||||
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
|
||||
public static final int HTTP_CACHE_SECONDS = 60;
|
||||
|
||||
@Override
|
||||
public void consume(
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
|
||||
if (!request.getDecoderResult().isSuccess()) {
|
||||
sendError(ctx, BAD_REQUEST);
|
||||
|
@ -19,8 +19,8 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
@ -31,24 +31,24 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.*;
|
||||
import static io.netty.handler.codec.http.HttpVersion.*;
|
||||
|
||||
public class HttpHelloWorldServerHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<Object> {
|
||||
private static final ByteBuf CONTENT =
|
||||
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII));
|
||||
|
||||
private MessageList<Object> out;
|
||||
@Override
|
||||
protected void beginConsume(ChannelHandlerContext ctx) {
|
||||
protected void beginMessageReceived(ChannelHandlerContext ctx) {
|
||||
out = MessageList.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endConsume(ChannelHandlerContext ctx) {
|
||||
protected void endMessageReceived(ChannelHandlerContext ctx) {
|
||||
ctx.write(out);
|
||||
out = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof HttpRequest) {
|
||||
HttpRequest req = (HttpRequest) msg;
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
package io.netty.example.http.snoop;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.HttpContent;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpObject;
|
||||
@ -24,10 +24,10 @@ import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
public class HttpSnoopClientHandler extends ChannelInboundConsumingHandler<HttpObject> {
|
||||
public class HttpSnoopClientHandler extends SimpleChannelInboundHandler<HttpObject> {
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
if (msg instanceof HttpResponse) {
|
||||
HttpResponse response = (HttpResponse) msg;
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
package io.netty.example.http.upload;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.HttpContent;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpObject;
|
||||
@ -29,14 +29,14 @@ import java.util.logging.Logger;
|
||||
/**
|
||||
* Handler that just dumps the contents of the response from the server
|
||||
*/
|
||||
public class HttpUploadClientHandler extends ChannelInboundConsumingHandler<HttpObject> {
|
||||
public class HttpUploadClientHandler extends SimpleChannelInboundHandler<HttpObject> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(HttpUploadClientHandler.class.getName());
|
||||
|
||||
private boolean readingChunks;
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
if (msg instanceof HttpResponse) {
|
||||
HttpResponse response = (HttpResponse) msg;
|
||||
|
||||
|
@ -20,7 +20,7 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.Cookie;
|
||||
import io.netty.handler.codec.http.CookieDecoder;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
@ -62,7 +62,7 @@ import java.util.logging.Logger;
|
||||
import static io.netty.buffer.Unpooled.*;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
|
||||
|
||||
public class HttpUploadServerHandler extends ChannelInboundConsumingHandler<HttpObject> {
|
||||
public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());
|
||||
|
||||
@ -96,7 +96,7 @@ public class HttpUploadServerHandler extends ChannelInboundConsumingHandler<Http
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
if (msg instanceof HttpRequest) {
|
||||
HttpRequest request = this.request = (HttpRequest) msg;
|
||||
URI uri = new URI(request.getUri());
|
||||
@ -319,7 +319,7 @@ public class HttpUploadServerHandler extends ChannelInboundConsumingHandler<Http
|
||||
if (!close) {
|
||||
// There's no need to add 'Content-Length' header
|
||||
// if this is the last response.
|
||||
response.headers().set(CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
|
||||
response.headers().set(CONTENT_LENGTH, buf.readableBytes());
|
||||
}
|
||||
|
||||
Set<Cookie> cookies;
|
||||
@ -421,7 +421,7 @@ public class HttpUploadServerHandler extends ChannelInboundConsumingHandler<Http
|
||||
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
|
||||
|
||||
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
|
||||
response.headers().set(CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
|
||||
response.headers().set(CONTENT_LENGTH, buf.readableBytes());
|
||||
|
||||
// Write the response.
|
||||
ctx.channel().write(response);
|
||||
|
@ -40,8 +40,8 @@ package io.netty.example.http.websocketx.client;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
|
||||
@ -50,7 +50,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
|
||||
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
public class WebSocketClientHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
|
||||
|
||||
private final WebSocketClientHandshaker handshaker;
|
||||
private ChannelPromise handshakeFuture;
|
||||
@ -79,7 +79,7 @@ public class WebSocketClientHandler extends ChannelInboundConsumingHandler<Objec
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
Channel ch = ctx.channel();
|
||||
if (!handshaker.isHandshakeComplete()) {
|
||||
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
|
||||
|
@ -20,11 +20,9 @@ import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
|
||||
public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
|
||||
String request = frame.text();
|
||||
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
@ -45,7 +45,7 @@ import static io.netty.handler.codec.http.HttpVersion.*;
|
||||
/**
|
||||
* Handles handshakes and messages
|
||||
*/
|
||||
public class WebSocketServerHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
|
||||
private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
|
||||
|
||||
private static final String WEBSOCKET_PATH = "/websocket";
|
||||
@ -53,7 +53,7 @@ public class WebSocketServerHandler extends ChannelInboundConsumingHandler<Objec
|
||||
private WebSocketServerHandshaker handshaker;
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof FullHttpRequest) {
|
||||
handleHttpRequest(ctx, (FullHttpRequest) msg);
|
||||
} else if (msg instanceof WebSocketFrame) {
|
||||
|
@ -20,7 +20,7 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.example.http.websocketx.server.WebSocketServerIndexPage;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
@ -46,7 +46,7 @@ import static io.netty.handler.codec.http.HttpVersion.*;
|
||||
/**
|
||||
* Handles handshakes and messages
|
||||
*/
|
||||
public class WebSocketSslServerHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
public class WebSocketSslServerHandler extends SimpleChannelInboundHandler<Object> {
|
||||
private static final Logger logger = Logger.getLogger(WebSocketSslServerHandler.class.getName());
|
||||
|
||||
private static final String WEBSOCKET_PATH = "/websocket";
|
||||
@ -54,7 +54,7 @@ public class WebSocketSslServerHandler extends ChannelInboundConsumingHandler<Ob
|
||||
private WebSocketServerHandshaker handshaker;
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof FullHttpRequest) {
|
||||
handleHttpRequest(ctx, (FullHttpRequest) msg);
|
||||
} else if (msg instanceof WebSocketFrame) {
|
||||
|
@ -16,12 +16,12 @@
|
||||
package io.netty.example.localecho;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
public class LocalEchoClientHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
public class LocalEchoClientHandler extends SimpleChannelInboundHandler<Object> {
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
// Print as received
|
||||
System.out.println(msg);
|
||||
}
|
||||
|
@ -16,14 +16,14 @@
|
||||
package io.netty.example.qotm;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
public class QuoteOfTheMomentClientHandler extends ChannelInboundConsumingHandler<DatagramPacket> {
|
||||
public class QuoteOfTheMomentClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
String response = msg.content().toString(CharsetUtil.UTF_8);
|
||||
if (response.startsWith("QOTM: ")) {
|
||||
System.out.println("Quote of the Moment: " + response.substring(6));
|
||||
|
@ -17,13 +17,13 @@ package io.netty.example.qotm;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
public class QuoteOfTheMomentServerHandler extends ChannelInboundConsumingHandler<DatagramPacket> {
|
||||
public class QuoteOfTheMomentServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
@ -44,7 +44,7 @@ public class QuoteOfTheMomentServerHandler extends ChannelInboundConsumingHandle
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
|
||||
System.err.println(packet);
|
||||
if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) {
|
||||
ctx.write(new DatagramPacket(
|
||||
|
@ -16,9 +16,9 @@
|
||||
package io.netty.example.rxtx;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
public class RxtxClientHandler extends ChannelInboundConsumingHandler<String> {
|
||||
public class RxtxClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
@ -26,7 +26,7 @@ public class RxtxClientHandler extends ChannelInboundConsumingHandler<String> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
if ("OK".equals(msg)) {
|
||||
System.out.println("Serial port responded to AT");
|
||||
} else {
|
||||
|
@ -16,7 +16,7 @@
|
||||
package io.netty.example.securechat;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
@ -24,13 +24,13 @@ import java.util.logging.Logger;
|
||||
/**
|
||||
* Handles a client-side channel.
|
||||
*/
|
||||
public class SecureChatClientHandler extends ChannelInboundConsumingHandler<String> {
|
||||
public class SecureChatClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(
|
||||
SecureChatClientHandler.class.getName());
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
System.err.println(msg);
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,7 @@ package io.netty.example.securechat;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.group.ChannelGroup;
|
||||
import io.netty.channel.group.DefaultChannelGroup;
|
||||
import io.netty.handler.ssl.SslHandler;
|
||||
@ -32,7 +32,7 @@ import java.util.logging.Logger;
|
||||
/**
|
||||
* Handles a server-side channel.
|
||||
*/
|
||||
public class SecureChatServerHandler extends ChannelInboundConsumingHandler<String> {
|
||||
public class SecureChatServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(
|
||||
SecureChatServerHandler.class.getName());
|
||||
@ -61,7 +61,7 @@ public class SecureChatServerHandler extends ChannelInboundConsumingHandler<Stri
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
// Send the received message to all channels but the current one.
|
||||
for (Channel c: channels) {
|
||||
if (c != ctx.channel()) {
|
||||
|
@ -28,8 +28,7 @@ import java.util.logging.Logger;
|
||||
@Sharable
|
||||
public class TelnetClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(
|
||||
TelnetClientHandler.class.getName());
|
||||
private static final Logger logger = Logger.getLogger(TelnetClientHandler.class.getName());
|
||||
|
||||
@Override
|
||||
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
|
@ -19,7 +19,7 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.Date;
|
||||
@ -30,7 +30,7 @@ import java.util.logging.Logger;
|
||||
* Handles a server-side channel.
|
||||
*/
|
||||
@Sharable
|
||||
public class TelnetServerHandler extends ChannelInboundConsumingHandler<String> {
|
||||
public class TelnetServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(TelnetServerHandler.class.getName());
|
||||
|
||||
@ -43,7 +43,7 @@ public class TelnetServerHandler extends ChannelInboundConsumingHandler<String>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String request) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String request) throws Exception {
|
||||
|
||||
// Generate and write a response.
|
||||
String response;
|
||||
|
@ -21,8 +21,8 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.sctp.SctpChannel;
|
||||
import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler;
|
||||
import io.netty.handler.codec.sctp.SctpMessageCompletionHandler;
|
||||
@ -138,7 +138,7 @@ public class SctpEchoTest extends AbstractSctpTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -149,7 +149,7 @@ public class SctpEchoTest extends AbstractSctpTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
byte[] actual = new byte[in.readableBytes()];
|
||||
in.readBytes(actual);
|
||||
|
||||
|
@ -19,10 +19,10 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.oio.OioDatagramChannel;
|
||||
@ -93,14 +93,14 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
|
||||
cc.close().awaitUninterruptibly();
|
||||
}
|
||||
|
||||
private static final class MulticastTestHandler extends ChannelInboundConsumingHandler<DatagramPacket> {
|
||||
private static final class MulticastTestHandler extends SimpleChannelInboundHandler<DatagramPacket> {
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private boolean done;
|
||||
private volatile boolean fail;
|
||||
|
||||
@Override
|
||||
protected void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
if (done) {
|
||||
fail = true;
|
||||
}
|
||||
|
@ -19,9 +19,9 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -40,9 +40,9 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
sb.handler(new ChannelInboundConsumingHandler<DatagramPacket>() {
|
||||
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
assertEquals(1, msg.content().readInt());
|
||||
latch.countDown();
|
||||
}
|
||||
|
@ -21,8 +21,8 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
@ -180,7 +180,7 @@ public class SocketEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -192,7 +192,7 @@ public class SocketEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
byte[] actual = new byte[in.readableBytes()];
|
||||
in.readBytes(actual);
|
||||
|
||||
|
@ -20,12 +20,12 @@ import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInboundHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
@ -122,7 +122,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -134,7 +134,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
byte[] actual = new byte[in.readableBytes()];
|
||||
in.readBytes(actual);
|
||||
|
||||
|
@ -21,8 +21,8 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.FixedLengthFrameDecoder;
|
||||
import org.junit.Test;
|
||||
@ -123,7 +123,7 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -134,7 +134,7 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
assertEquals(1024, msg.readableBytes());
|
||||
|
||||
byte[] actual = new byte[msg.readableBytes()];
|
||||
|
@ -22,16 +22,15 @@ import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
|
||||
@ -124,7 +123,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
assertEquals(Unpooled.wrappedBuffer(data), sh.received);
|
||||
}
|
||||
|
||||
private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -136,7 +135,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
counter += in.readableBytes();
|
||||
received.writeBytes(in);
|
||||
}
|
||||
|
@ -18,8 +18,8 @@ package io.netty.testsuite.transport.socket;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import org.junit.Test;
|
||||
@ -109,7 +109,7 @@ public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile SocketChannel ch;
|
||||
final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>();
|
||||
final CountDownLatch halfClosure = new CountDownLatch(1);
|
||||
@ -127,7 +127,7 @@ public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
queue.offer(msg.readByte());
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -76,7 +76,7 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile SocketChannel ch;
|
||||
final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>();
|
||||
|
||||
@ -86,7 +86,7 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||
queue.offer(msg.readByte());
|
||||
}
|
||||
}
|
||||
|
@ -21,10 +21,10 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.spdy.SpdyConstants;
|
||||
import io.netty.handler.codec.spdy.SpdyFrameDecoder;
|
||||
@ -246,7 +246,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class SpdyEchoTestClientHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private static class SpdyEchoTestClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
final ByteBuf frames;
|
||||
volatile int counter;
|
||||
@ -256,7 +256,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
byte[] actual = new byte[in.readableBytes()];
|
||||
in.readBytes(actual);
|
||||
|
||||
|
@ -21,8 +21,8 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.ssl.SslHandler;
|
||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||
@ -161,7 +161,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private class EchoHandler extends ChannelInboundConsumingHandler<ByteBuf> {
|
||||
private class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -178,7 +178,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
byte[] actual = new byte[in.readableBytes()];
|
||||
in.readBytes(actual);
|
||||
|
||||
|
@ -19,9 +19,9 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.LineBasedFrameDecoder;
|
||||
import io.netty.handler.codec.string.StringDecoder;
|
||||
@ -142,7 +142,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private class StartTlsClientHandler extends ChannelInboundConsumingHandler<String> {
|
||||
private class StartTlsClientHandler extends SimpleChannelInboundHandler<String> {
|
||||
private final SslHandler sslHandler;
|
||||
private Future<Channel> handshakeFuture;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
@ -159,7 +159,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
if ("StartTlsResponse".equals(msg)) {
|
||||
ctx.pipeline().addAfter("logger", "ssl", sslHandler);
|
||||
handshakeFuture = sslHandler.handshakeFuture();
|
||||
@ -185,7 +185,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
private class StartTlsServerHandler extends ChannelInboundConsumingHandler<String> {
|
||||
private class StartTlsServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
private final SslHandler sslHandler;
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
@ -201,7 +201,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
if ("StartTlsRequest".equals(msg)) {
|
||||
ctx.pipeline().addAfter("logger", "ssl", sslHandler);
|
||||
ctx.write("StartTlsResponse\n");
|
||||
|
@ -19,8 +19,8 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import io.netty.handler.codec.Delimiters;
|
||||
@ -135,7 +135,7 @@ public class SocketStringEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
}
|
||||
|
||||
static class StringEchoHandler extends ChannelInboundConsumingHandler<String> {
|
||||
static class StringEchoHandler extends SimpleChannelInboundHandler<String> {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
@ -146,7 +146,7 @@ public class SocketStringEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
|
||||
assertEquals(data[counter], msg);
|
||||
|
||||
if (channel.parent() != null) {
|
||||
|
@ -19,11 +19,11 @@ import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.MessageList;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.group.ChannelGroup;
|
||||
import io.netty.channel.group.DefaultChannelGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
@ -139,7 +139,7 @@ public class UDTClientServerConnectionTest {
|
||||
}
|
||||
}
|
||||
|
||||
static class ClientHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
static class ClientHandler extends SimpleChannelInboundHandler<Object> {
|
||||
|
||||
static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
|
||||
|
||||
@ -169,7 +169,7 @@ public class UDTClientServerConnectionTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
log.info("Client received: " + msg);
|
||||
}
|
||||
}
|
||||
|
@ -19,12 +19,12 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -217,7 +217,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
|
||||
return new Entry[size];
|
||||
}
|
||||
|
||||
private static class ServerBootstrapAcceptor extends ChannelInboundConsumingHandler<Channel> {
|
||||
private static class ServerBootstrapAcceptor extends SimpleChannelInboundHandler<Channel> {
|
||||
|
||||
private final EventLoopGroup childGroup;
|
||||
private final ChannelHandler childHandler;
|
||||
@ -236,7 +236,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void consume(ChannelHandlerContext ctx, Channel child) {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Channel child) {
|
||||
child.pipeline().addLast(childHandler);
|
||||
|
||||
for (Entry<ChannelOption<?>, Object> e: childOptions) {
|
||||
|
@ -1,85 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
|
||||
|
||||
/**
|
||||
* Abstract base class for {@link ChannelInboundHandler} that would like to consume messages. This means they will
|
||||
* actually handle them and not pass them to the next handler in the {@link ChannelPipeline}.
|
||||
*
|
||||
* If you need to pass them throught the {@link ChannelPipeline} use {@link ChannelInboundHandlerAdapter}.
|
||||
*/
|
||||
public abstract class ChannelInboundConsumingHandler<I> extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public final void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
|
||||
try {
|
||||
beginConsume(ctx);
|
||||
MessageList<I> cast = msgs.cast();
|
||||
int size = cast.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
consume(ctx, cast.get(i));
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
msgs.releaseAllAndRecycle();
|
||||
} finally {
|
||||
endConsume(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Is called before consume of messages start.
|
||||
*
|
||||
* @param ctx The {@link ChannelHandlerContext} which is bound to this
|
||||
* {@link ChannelInboundConsumingHandler}
|
||||
*/
|
||||
protected void beginConsume(ChannelHandlerContext ctx) {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
/**
|
||||
* Is called after consume of messages ends.
|
||||
*
|
||||
* @param ctx The {@link ChannelHandlerContext} which is bound to this
|
||||
* {@link ChannelInboundConsumingHandler}
|
||||
*/
|
||||
protected void endConsume(ChannelHandlerContext ctx) {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
/**
|
||||
* Consume the message. After this method was executed for all of the messages in the {@link MessageList}
|
||||
* {@link MessageList#releaseAllAndRecycle()} is called and so the {@link MessageList} is recycled and
|
||||
* {@link ReferenceCounted#release()} is called on all messages that implement {@link ReferenceCounted}.
|
||||
*
|
||||
* Be aware that because of this you must not hold a reference to a message or to the {@link MessageList} after
|
||||
* this method returns. If you really need to hold a reference to a message, use
|
||||
* {@link ReferenceCountUtil#retain(Object)} on it to increment the reference count and so make sure its not
|
||||
* released.
|
||||
*
|
||||
*
|
||||
* @param ctx The {@link ChannelHandlerContext} which is bound to this
|
||||
* {@link ChannelInboundConsumingHandler}
|
||||
* @param msg The mesage to consume and handle
|
||||
* @throws Exception thrown if an error accours
|
||||
*/
|
||||
protected abstract void consume(ChannelHandlerContext ctx, I msg) throws Exception;
|
||||
}
|
@ -27,7 +27,7 @@ package io.netty.channel;
|
||||
* Be aware that messages are not released after the {@link #messageReceived(ChannelHandlerContext, MessageList)}
|
||||
* method returns automatically. This is done for make it as flexible as possible and get the most out of
|
||||
* performance. Because of this you need to explicit call {@link MessageList#releaseAllAndRecycle()} if you
|
||||
* consumed all the messages. Because this is such a common need {@link ChannelInboundConsumingHandler} is provided ,
|
||||
* consumed all the messages. Because this is such a common need {@link SimpleChannelInboundHandler} is provided ,
|
||||
* which will automatically release messages and the {@link MessageList} after processing is done.
|
||||
* </p>
|
||||
*/
|
||||
|
@ -37,14 +37,27 @@ import io.netty.util.internal.TypeParameterMatcher;
|
||||
*/
|
||||
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private static final Object UNRELEASABLE = new Object();
|
||||
|
||||
private final TypeParameterMatcher matcher;
|
||||
private final boolean autoRelease;
|
||||
|
||||
protected SimpleChannelInboundHandler() {
|
||||
this(true);
|
||||
}
|
||||
|
||||
protected SimpleChannelInboundHandler(boolean autoRelease) {
|
||||
matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
|
||||
this.autoRelease = autoRelease;
|
||||
}
|
||||
|
||||
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
|
||||
this(inboundMessageType, true);
|
||||
}
|
||||
|
||||
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
|
||||
matcher = TypeParameterMatcher.get(inboundMessageType);
|
||||
this.autoRelease = autoRelease;
|
||||
}
|
||||
|
||||
public boolean acceptInboundMessage(Object msg) throws Exception {
|
||||
@ -56,6 +69,8 @@ public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandl
|
||||
MessageList<Object> unaccepted = MessageList.newInstance();
|
||||
int size = msgs.size();
|
||||
try {
|
||||
beginMessageReceived(ctx);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
Object msg = msgs.get(i);
|
||||
if (!ctx.isRemoved() && acceptInboundMessage(msg)) {
|
||||
@ -68,12 +83,26 @@ public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandl
|
||||
I imsg = (I) msg;
|
||||
messageReceived(ctx, imsg);
|
||||
} else {
|
||||
if (autoRelease) {
|
||||
msgs.set(i, UNRELEASABLE); // Prevent the message added to 'unaccepted' from being released.
|
||||
}
|
||||
unaccepted.add(msg);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
msgs.recycle();
|
||||
ctx.fireMessageReceived(unaccepted);
|
||||
try {
|
||||
if (autoRelease) {
|
||||
msgs.releaseAllAndRecycle();
|
||||
} else {
|
||||
msgs.recycle();
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
endMessageReceived(ctx);
|
||||
} finally {
|
||||
ctx.fireMessageReceived(unaccepted);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,4 +115,26 @@ public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandl
|
||||
* @throws Exception is thrown if an error accour
|
||||
*/
|
||||
protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;
|
||||
|
||||
/**
|
||||
* Is called before the first {@link #messageReceived(ChannelHandlerContext, Object)} of the current
|
||||
* {@link MessageList} is handled.
|
||||
*
|
||||
* @param ctx the {@link ChannelHandlerContext} which is bound to this handler
|
||||
*/
|
||||
@SuppressWarnings("UnusedParameters")
|
||||
protected void beginMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
/**
|
||||
* Is called after the last {@link #messageReceived(ChannelHandlerContext, Object)} of the current
|
||||
* {@link MessageList} is handled.
|
||||
*
|
||||
* @param ctx the {@link ChannelHandlerContext} which is bound to this handler
|
||||
*/
|
||||
@SuppressWarnings("UnusedParameters")
|
||||
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
}
|
||||
|
@ -20,9 +20,9 @@ import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundConsumingHandler;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import org.junit.Test;
|
||||
@ -140,9 +140,9 @@ public class LocalChannelTest {
|
||||
clientGroup.terminationFuture().sync();
|
||||
}
|
||||
|
||||
static class TestHandler extends ChannelInboundConsumingHandler<Object> {
|
||||
static class TestHandler extends SimpleChannelInboundHandler<Object> {
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
logger.info(String.format("Received mesage: %s", msg));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user