From da9ecadfc081b86a08bc3dd65db91065209676fd Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 10 May 2012 23:19:59 +0900 Subject: [PATCH] Introduce bypass buffer and use it in LoggingHandler - Added ChannelBufferHolders.(inbound|outbound)BypassBuffer() - The holder returned by these methods returns the next handler's buffer. When a handler's new(Inbound|Outbound)Buffer returns a bypass holder, your inboundBufferUpdated() and flush() implementation should check if the buffer is a bypass and should not modify the content of the buffer. - Channel(Inbound|Outbound)?HandlerAdapter is now abstract. - A user has to specify the exact inbound/outbound buffer type - It's because there's no way to determine the best buffer type - Implemented LoggingHandler using the new API. - It doesn't dump received or sent messages yet. - Fixed a bug where DefaultUnsafe.close() does not trigger deregister() - Fixed a bug where NioSocketChannel.isActive() does not return false when closed --- .../io/netty/example/echo/EchoServer.java | 4 + .../netty/handler/logging/LoggingHandler.java | 179 ++++++++++++++---- .../io/netty/channel/AbstractChannel.java | 12 +- .../io/netty/channel/ChannelBufferHolder.java | 92 +++++++-- .../netty/channel/ChannelBufferHolders.java | 8 + .../netty/channel/ChannelHandlerAdapter.java | 50 +---- .../channel/ChannelInboundHandlerAdapter.java | 17 +- .../ChannelOutboundHandlerAdapter.java | 16 +- .../channel/socket/nio/NioSocketChannel.java | 4 +- 9 files changed, 265 insertions(+), 117 deletions(-) diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index 2c597fede5..264fb16989 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -27,6 +27,8 @@ import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.SelectorEventLoop; +import io.netty.handler.logging.LoggingHandler; +import io.netty.logging.InternalLogLevel; import java.net.InetSocketAddress; import java.util.ArrayDeque; @@ -47,6 +49,7 @@ public class EchoServer { // Configure the server. final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class); ServerSocketChannel ssc = new NioServerSocketChannel(); + ssc.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter() { @Override @@ -66,6 +69,7 @@ public class EchoServer { if (s == null) { break; } + s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter() { @Override public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index 885ea7e0fe..9f9b4060f2 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -15,21 +15,21 @@ */ package io.netty.handler.logging; -import static io.netty.buffer.ChannelBuffers.*; - -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelUpstreamHandler; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.logging.InternalLogLevel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; +import java.net.SocketAddress; + /** * A {@link ChannelHandler} that logs all events via {@link InternalLogger}. * By default, all events are logged at DEBUG level. You can extend @@ -38,7 +38,7 @@ import io.netty.logging.InternalLoggerFactory; * @apiviz.landmark */ @Sharable -public class LoggingHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { +public class LoggingHandler extends ChannelHandlerAdapter { private static final InternalLogLevel DEFAULT_LEVEL = InternalLogLevel.DEBUG; @@ -192,45 +192,144 @@ public class LoggingHandler implements ChannelUpstreamHandler, ChannelDownstream return level; } - /** - * Logs the specified event to the {@link InternalLogger} returned by - * {@link #getLogger()}. If hex dump has been enabled for this handler, - * the hex dump of the {@link ChannelBuffer} in a {@link MessageEvent} will - * be logged together. - */ - public void log(ChannelEvent e) { + protected String message(ChannelHandlerContext ctx, String message) { + String chStr = ctx.channel().toString(); + StringBuilder buf = new StringBuilder(chStr.length() + message.length() + 1); + buf.append(chStr); + buf.append(' '); + buf.append(message); + return buf.toString(); + } + + @Override + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.outboundBypassBuffer(ctx); + } + + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.inboundBypassBuffer(ctx); + } + + @Override + public void channelRegistered(ChannelInboundHandlerContext ctx) + throws Exception { if (getLogger().isEnabled(level)) { - String msg = e.toString(); - - // Append hex dump if necessary. - if (hexDump && e instanceof MessageEvent) { - MessageEvent me = (MessageEvent) e; - if (me.getMessage() instanceof ChannelBuffer) { - ChannelBuffer buf = (ChannelBuffer) me.getMessage(); - msg = msg + " - (HEXDUMP: " + hexDump(buf) + ')'; - } - } - - // Log the message (and exception if available.) - if (e instanceof ExceptionEvent) { - getLogger().log(level, msg, ((ExceptionEvent) e).cause()); - } else { - getLogger().log(level, msg); - } + logger.log(level, message(ctx, "REGISTERED")); } + super.channelRegistered(ctx); } @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) + public void channelUnregistered(ChannelInboundHandlerContext ctx) throws Exception { - log(e); - ctx.sendUpstream(e); + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "UNREGISTERED")); + } + super.channelUnregistered(ctx); } @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) + public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { - log(e); - ctx.sendDownstream(e); + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "ACTIVE")); + } + super.channelActive(ctx); } + + @Override + public void channelInactive(ChannelInboundHandlerContext ctx) + throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "INACTIVE")); + } + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelInboundHandlerContext ctx, + Throwable cause) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, String.format("EXCEPTION: %s", cause)), cause); + } + super.exceptionCaught(ctx, cause); + } + + @Override + public void userEventTriggered(ChannelInboundHandlerContext ctx, + Object evt) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, String.format("USER_EVENT: %s", evt))); + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) + throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "INBOUND_UPDATED")); + } + // TODO Auto-generated method stub + super.inboundBufferUpdated(ctx); + } + + @Override + public void bind(ChannelOutboundHandlerContext ctx, + SocketAddress localAddress, ChannelFuture future) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, String.format("bind(%s)", localAddress))); + } + super.bind(ctx, localAddress, future); + } + + @Override + public void connect(ChannelOutboundHandlerContext ctx, + SocketAddress remoteAddress, SocketAddress localAddress, + ChannelFuture future) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, String.format("connect(%s, %s)", remoteAddress, localAddress))); + } + super.connect(ctx, remoteAddress, localAddress, future); + } + + @Override + public void disconnect(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "disconnect()")); + } + super.disconnect(ctx, future); + } + + @Override + public void close(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "close()")); + } + super.close(ctx, future); + } + + @Override + public void deregister(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "deregister()")); + } + super.deregister(ctx, future); + } + + @Override + public void flush(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + if (getLogger().isEnabled(level)) { + logger.log(level, message(ctx, "flush()")); + } + super.flush(ctx, future); + } + } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 135dbe8206..d29e619f62 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -486,6 +486,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha registered = true; future.setSuccess(); pipeline().fireChannelRegistered(); + if (isActive()) { + pipeline().fireChannelActive(); + } } catch (Throwable t) { // Close the channel directly to avoid FD leak. try { @@ -615,10 +618,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } catch (Throwable t) { future.setFailure(t); } + + notifyClosureListeners(); if (wasActive && !isActive()) { pipeline().fireChannelInactive(); } - notifyClosureListeners(); + + deregister(newVoidFuture()); } else { // Closed already. future.setSuccess(); @@ -683,12 +689,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (closed) { - close(newFuture()); + close(newVoidFuture()); } } catch (Throwable t) { pipeline().fireExceptionCaught(t); if (t instanceof IOException) { - close(newFuture()); + close(newVoidFuture()); } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index 6ce9eb3fd3..0142d963d4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -6,13 +6,28 @@ import java.util.Queue; public final class ChannelBufferHolder { + private final ChannelHandlerContext ctx; + /** 0 - not a bypass, 1 - inbound bypass, 2 - outbound bypass */ + private final int bypassDirection; private final Queue msgBuf; private final ChannelBuffer byteBuf; + ChannelBufferHolder(ChannelHandlerContext ctx, boolean inbound) { + if (ctx == null) { + throw new NullPointerException("ctx"); + } + this.ctx = ctx; + bypassDirection = inbound? 1 : 2; + msgBuf = null; + byteBuf = null; + } + ChannelBufferHolder(Queue msgBuf) { if (msgBuf == null) { throw new NullPointerException("msgBuf"); } + ctx = null; + bypassDirection = 0; this.msgBuf = msgBuf; byteBuf = null; @@ -22,38 +37,91 @@ public final class ChannelBufferHolder { if (byteBuf == null) { throw new NullPointerException("byteBuf"); } + ctx = null; + bypassDirection = 0; msgBuf = null; this.byteBuf = byteBuf; } + public boolean isBypass() { + return bypassDirection != 0; + } + public boolean hasMessageBuffer() { - return msgBuf != null; + switch (bypassDirection) { + case 0: + return msgBuf != null; + case 1: + return ctx.nextIn().hasMessageBuffer(); + case 2: + return ctx.out().hasMessageBuffer(); + default: + throw new Error(); + } } public boolean hasByteBuffer() { - return byteBuf != null; + switch (bypassDirection) { + case 0: + return byteBuf != null; + case 1: + return ctx.nextIn().hasByteBuffer(); + case 2: + return ctx.out().hasByteBuffer(); + default: + throw new Error(); + } } + @SuppressWarnings("unchecked") public Queue messageBuffer() { - if (!hasMessageBuffer()) { - throw new IllegalStateException("does not have a message buffer"); + switch (bypassDirection) { + case 0: + if (!hasMessageBuffer()) { + throw new IllegalStateException("does not have a message buffer"); + } + return msgBuf; + case 1: + return (Queue) ctx.nextIn().messageBuffer(); + case 2: + return (Queue) ctx.out().messageBuffer(); + default: + throw new Error(); } - return msgBuf; } public ChannelBuffer byteBuffer() { - if (!hasByteBuffer()) { - throw new IllegalStateException("does not have a byte buffer"); + switch (bypassDirection) { + case 0: + if (!hasByteBuffer()) { + throw new IllegalStateException("does not have a byte buffer"); + } + return byteBuf; + case 1: + return ctx.nextIn().byteBuffer(); + case 2: + return ctx.out().byteBuffer(); + default: + throw new Error(); } - return byteBuf; } @Override public String toString() { - if (hasMessageBuffer()) { - return messageBuffer().toString(); - } else { - return byteBuffer().toString(); + switch (bypassDirection) { + case 0: + if (hasMessageBuffer()) { + return messageBuffer().toString(); + } else { + return byteBuffer().toString(); + } + case 1: + return ctx.nextIn().toString(); + case 2: + return ctx.out().toString(); + default: + throw new Error(); } + } } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java index 2ce45cd160..d94c9988f4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java @@ -14,6 +14,14 @@ public final class ChannelBufferHolders { return new ChannelBufferHolder(buffer); } + public static ChannelBufferHolder inboundBypassBuffer(ChannelHandlerContext ctx) { + return new ChannelBufferHolder(ctx, true); + } + + public static ChannelBufferHolder outboundBypassBuffer(ChannelHandlerContext ctx) { + return new ChannelBufferHolder(ctx, false); + } + private ChannelBufferHolders() { // Utility class } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index 8d222cf258..cd5d01b260 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -1,12 +1,8 @@ package io.netty.channel; -import io.netty.buffer.ChannelBuffer; - import java.net.SocketAddress; -import java.util.ArrayDeque; -import java.util.Queue; -public class ChannelHandlerAdapter implements ChannelInboundHandler, ChannelOutboundHandler { +public abstract class ChannelHandlerAdapter implements ChannelInboundHandler, ChannelOutboundHandler { @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { // Do nothing by default. @@ -57,29 +53,9 @@ public class ChannelHandlerAdapter implements ChannelInboundHandler, Ch ctx.fireUserEventTriggered(evt); } - @Override - public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(new ArrayDeque()); - } - @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - if (ctx.in().hasMessageBuffer()) { - Queue in = ctx.in().messageBuffer(); - Queue nextIn = ctx.nextIn().messageBuffer(); - for (;;) { - I msg = in.poll(); - if (msg == null) { - break; - } - nextIn.add(msg); - } - } else { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer nextIn = ctx.nextIn().byteBuffer(); - nextIn.writeBytes(in); - } - ctx.fireInboundBufferUpdated(); + ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx); } @Override @@ -107,28 +83,8 @@ public class ChannelHandlerAdapter implements ChannelInboundHandler, Ch ctx.deregister(future); } - @Override - public ChannelBufferHolder newOutboundBuffer(ChannelOutboundHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(new ArrayDeque()); - } - @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (ctx.prevOut().hasMessageBuffer()) { - Queue out = ctx.prevOut().messageBuffer(); - Queue nextOut = ctx.out().messageBuffer(); - for (;;) { - O msg = out.poll(); - if (msg == null) { - break; - } - nextOut.add(msg); - } - } else { - ChannelBuffer out = ctx.prevOut().byteBuffer(); - ChannelBuffer nextOut = ctx.out().byteBuffer(); - nextOut.writeBytes(out); - } - ctx.flush(future); + ChannelOutboundHandlerAdapter.flush0(ctx, future); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index 6d7830a11e..a95eb9e8ca 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -2,10 +2,9 @@ package io.netty.channel; import io.netty.buffer.ChannelBuffer; -import java.util.ArrayDeque; import java.util.Queue; -public class ChannelInboundHandlerAdapter implements ChannelInboundHandler { +public abstract class ChannelInboundHandlerAdapter implements ChannelInboundHandler { @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { // Do nothing by default. @@ -57,12 +56,16 @@ public class ChannelInboundHandlerAdapter implements ChannelInboundHandler } @Override - public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(new ArrayDeque()); + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + inboundBufferUpdated0(ctx); } - @Override - public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + static void inboundBufferUpdated0(ChannelInboundHandlerContext ctx) { + if (ctx.in().isBypass()) { + ctx.fireInboundBufferUpdated(); + return; + } + if (ctx.in().hasMessageBuffer()) { Queue in = ctx.in().messageBuffer(); Queue nextIn = ctx.nextIn().messageBuffer(); @@ -77,8 +80,8 @@ public class ChannelInboundHandlerAdapter implements ChannelInboundHandler ChannelBuffer in = ctx.in().byteBuffer(); ChannelBuffer nextIn = ctx.nextIn().byteBuffer(); nextIn.writeBytes(in); + in.discardReadBytes(); } ctx.fireInboundBufferUpdated(); } - } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index 2a7a8d78a3..794c6e45ab 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -3,10 +3,9 @@ package io.netty.channel; import io.netty.buffer.ChannelBuffer; import java.net.SocketAddress; -import java.util.ArrayDeque; import java.util.Queue; -public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler { +public abstract class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler { @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { // Do nothing by default. @@ -53,12 +52,16 @@ public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler< } @Override - public ChannelBufferHolder newOutboundBuffer(ChannelOutboundHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(new ArrayDeque()); + public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + flush0(ctx, future); } - @Override - public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + static void flush0(ChannelOutboundHandlerContext ctx, ChannelFuture future) { + if (ctx.prevOut().isBypass()) { + ctx.flush(future); + return; + } + if (ctx.prevOut().hasMessageBuffer()) { Queue out = ctx.prevOut().messageBuffer(); Queue nextOut = ctx.out().messageBuffer(); @@ -73,6 +76,7 @@ public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler< ChannelBuffer out = ctx.prevOut().byteBuffer(); ChannelBuffer nextOut = ctx.out().byteBuffer(); nextOut.writeBytes(out); + out.discardReadBytes(); } ctx.flush(future); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 8d8ff1d8a0..f6130d4d00 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -82,10 +82,10 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha @Override public boolean isActive() { - return javaChannel().isConnected(); + SocketChannel ch = javaChannel(); + return ch.isOpen() && ch.isConnected(); } - @Override @SuppressWarnings("unchecked") protected ChannelBufferHolder firstOut() {