From ea0c9cfe79e07a5e11b30e8d5a8734ee98957b62 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 7 Jun 2012 16:56:21 +0900 Subject: [PATCH] Post-overhaul fixes / Split LoggingHandler into three - LoggingHandler now only logs state and operations - StreamLoggingHandler and MessageLoggingHandler log the buffer content - Added ChannelOperationHandlerAdapter - Used by WriteTimeoutHandler --- .../codec/spdy/SpdySessionHandler.java | 6 +- .../handler/codec/MessageToMessageCodec.java | 5 +- .../handler/codec/StreamToMessageCodec.java | 5 +- .../handler/codec/StreamToStreamCodec.java | 6 +- .../netty/handler/logging/LoggingHandler.java | 207 ++---------------- .../logging/MessageLoggingHandler.java | 93 ++++++++ .../handler/logging/StreamLoggingHandler.java | 183 ++++++++++++++++ .../handler/stream/ChunkedWriteHandler.java | 19 +- .../handler/timeout/IdleStateHandler.java | 14 +- .../handler/timeout/ReadTimeoutHandler.java | 12 +- .../handler/timeout/WriteTimeoutHandler.java | 11 +- .../netty/channel/ChannelHandlerAdapter.java | 14 +- .../netty/channel/ChannelInboundHandler.java | 3 +- .../channel/ChannelOperationHandler.java | 1 + .../ChannelOperationHandlerAdapter.java | 74 +++++++ .../netty/channel/ChannelOutboundHandler.java | 3 +- .../ChannelOutboundHandlerAdapter.java | 30 +-- .../io/netty/channel/ChannelStateHandler.java | 2 + .../channel/ChannelStateHandlerAdapter.java | 9 + .../channel/DefaultChannelHandlerContext.java | 7 +- .../netty/channel/DefaultChannelPipeline.java | 6 +- 21 files changed, 422 insertions(+), 288 deletions(-) create mode 100644 handler/src/main/java/io/netty/handler/logging/MessageLoggingHandler.java create mode 100644 handler/src/main/java/io/netty/handler/logging/StreamLoggingHandler.java create mode 100644 transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 33c0dc52c0..1926a64999 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -21,6 +21,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; import java.nio.channels.ClosedChannelException; import java.util.Queue; @@ -29,7 +31,9 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Manages streams within a SPDY session. */ -public class SpdySessionHandler extends ChannelHandlerAdapter { +public class SpdySessionHandler + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException(); private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed"); diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index 5851154582..f7bb62efa4 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -19,9 +19,12 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; public abstract class MessageToMessageCodec - extends ChannelHandlerAdapter { + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final MessageToMessageEncoder encoder = new MessageToMessageEncoder() { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageCodec.java index 7f14686b21..f1cecaaf13 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageCodec.java @@ -20,9 +20,12 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; public abstract class StreamToMessageCodec - extends ChannelHandlerAdapter { + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final MessageToStreamEncoder encoder = new MessageToStreamEncoder() { diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamCodec.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamCodec.java index e74dc32586..30f6224ea1 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamCodec.java @@ -20,8 +20,12 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; -public abstract class StreamToStreamCodec extends ChannelHandlerAdapter { +public abstract class StreamToStreamCodec + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final StreamToStreamEncoder encoder = new StreamToStreamEncoder() { @Override diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index fe422f6217..4d84250b81 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -15,9 +15,6 @@ */ package io.netty.handler.logging; -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; @@ -28,7 +25,6 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.net.SocketAddress; -import java.util.Queue; /** * A {@link ChannelHandler} that logs all events via {@link InternalLogger}. @@ -38,72 +34,14 @@ import java.util.Queue; * @apiviz.landmark */ @Sharable -public class LoggingHandler extends ChannelHandlerAdapter { +public class LoggingHandler extends ChannelHandlerAdapter { private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG; - private static final String NEWLINE = String.format("%n"); - private static final String[] BYTE2HEX = new String[256]; - private static final String[] HEXPADDING = new String[16]; - private static final String[] BYTEPADDING = new String[16]; - private static final char[] BYTE2CHAR = new char[256]; + protected final InternalLogger logger; + protected final InternalLogLevel internalLevel; - static { - int i; - - // Generate the lookup table for byte-to-hex-dump conversion - for (i = 0; i < 10; i ++) { - StringBuilder buf = new StringBuilder(3); - buf.append(" 0"); - buf.append(i); - BYTE2HEX[i] = buf.toString(); - } - for (; i < 16; i ++) { - StringBuilder buf = new StringBuilder(3); - buf.append(" 0"); - buf.append((char) ('a' + i - 10)); - BYTE2HEX[i] = buf.toString(); - } - for (; i < BYTE2HEX.length; i ++) { - StringBuilder buf = new StringBuilder(3); - buf.append(' '); - buf.append(Integer.toHexString(i)); - BYTE2HEX[i] = buf.toString(); - } - - // Generate the lookup table for hex dump paddings - for (i = 0; i < HEXPADDING.length; i ++) { - int padding = HEXPADDING.length - i; - StringBuilder buf = new StringBuilder(padding * 3); - for (int j = 0; j < padding; j ++) { - buf.append(" "); - } - HEXPADDING[i] = buf.toString(); - } - - // Generate the lookup table for byte dump paddings - for (i = 0; i < BYTEPADDING.length; i ++) { - int padding = BYTEPADDING.length - i; - StringBuilder buf = new StringBuilder(padding); - for (int j = 0; j < padding; j ++) { - buf.append(' '); - } - BYTEPADDING[i] = buf.toString(); - } - - // Generate the lookup table for byte-to-char conversion - for (i = 0; i < BYTE2CHAR.length; i ++) { - if (i <= 0x1f || i >= 0x7f) { - BYTE2CHAR[i] = '.'; - } else { - BYTE2CHAR[i] = (char) i; - } - } - } - - private final InternalLogger logger; private final LogLevel level; - private final InternalLogLevel internalLevel; /** * Creates a new instance whose logger name is the fully qualified class @@ -178,19 +116,11 @@ public class LoggingHandler extends ChannelHandlerAdapter { internalLevel = level.toInternalLevel(); } - /** - * Returns the {@link InternalLogger} that this handler uses to log - * a {@link ChannelEvent}. - */ - public InternalLogger getLogger() { - return logger; - } - /** * Returns the {@link InternalLogLevel} that this handler uses to log * a {@link ChannelEvent}. */ - public LogLevel getLevel() { + public LogLevel level() { return level; } @@ -203,99 +133,10 @@ public class LoggingHandler extends ChannelHandlerAdapter { return buf.toString(); } - protected String formatBuffer(String bufName, ChannelBufferHolder holder) { - String content; - int size; - String elemType; - if (holder.hasByteBuffer()) { - ChannelBuffer buf = holder.byteBuffer(); - size = buf.readableBytes(); - elemType = "Byte"; - content = hexdump(buf); - } else { - Queue buf = holder.messageBuffer(); - content = buf.toString(); - size = buf.size(); - elemType = "Object"; - } - - StringBuilder buf = new StringBuilder(bufName.length() + elemType.length() + content.length() + 16); - buf.append(bufName); - buf.append('['); - buf.append(elemType); - buf.append("]("); - buf.append(size); - buf.append("): "); - buf.append(content); - return buf.toString(); - } - - private static String hexdump(ChannelBuffer buf) { - int length = buf.readableBytes(); - int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; - StringBuilder dump = new StringBuilder(rows * 80); - - dump.append( - NEWLINE + " +-------------------------------------------------+" + - NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" + - NEWLINE + "+--------+-------------------------------------------------+----------------+"); - - final int startIndex = buf.readerIndex(); - final int endIndex = buf.writerIndex(); - - int i; - for (i = startIndex; i < endIndex; i ++) { - int relIdx = i - startIndex; - int relIdxMod16 = relIdx & 15; - if (relIdxMod16 == 0) { - dump.append(NEWLINE); - dump.append(Long.toHexString(relIdx & 0xFFFFFFFFL | 0x100000000L)); - dump.setCharAt(dump.length() - 9, '|'); - dump.append('|'); - } - dump.append(BYTE2HEX[buf.getUnsignedByte(i)]); - if (relIdxMod16 == 15) { - dump.append(" |"); - for (int j = i - 15; j <= i; j ++) { - dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]); - } - dump.append('|'); - } - } - - if ((i - startIndex & 15) != 0) { - int remainder = length & 15; - dump.append(HEXPADDING[remainder]); - dump.append(" |"); - for (int j = i - remainder; j < i; j ++) { - dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]); - } - dump.append(BYTEPADDING[remainder]); - dump.append('|'); - } - - dump.append( - NEWLINE + "+--------+-------------------------------------------------+----------------+"); - - return dump.toString(); - } - - @Override - public ChannelBufferHolder newOutboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.outboundBypassBuffer(ctx); - } - - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.inboundBypassBuffer(ctx); - } - @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "REGISTERED")); } super.channelRegistered(ctx); @@ -304,7 +145,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "UNREGISTERED")); } super.channelUnregistered(ctx); @@ -313,7 +154,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "ACTIVE")); } super.channelActive(ctx); @@ -322,7 +163,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "INACTIVE")); } super.channelInactive(ctx); @@ -331,7 +172,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "EXCEPTION: " + cause), cause); } super.exceptionCaught(ctx, cause); @@ -340,25 +181,16 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "USER_EVENT: " + evt)); } super.userEventTriggered(ctx, evt); } - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx) - throws Exception { - if (getLogger().isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.inbound()))); - } - ctx.fireInboundBufferUpdated(); - } - @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND(" + localAddress + ')')); } super.bind(ctx, localAddress, future); @@ -368,7 +200,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CONNECT(" + remoteAddress + ", " + localAddress + ')')); } super.connect(ctx, remoteAddress, localAddress, future); @@ -377,7 +209,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "DISCONNECT()")); } super.disconnect(ctx, future); @@ -386,7 +218,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CLOSE()")); } super.close(ctx, future); @@ -395,18 +227,9 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(internalLevel)) { + if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "DEREGISTER()")); } super.deregister(ctx, future); } - - @Override - public void flush(ChannelHandlerContext ctx, - ChannelFuture future) throws Exception { - if (getLogger().isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.outbound()))); - } - ctx.flush(future); - } } diff --git a/handler/src/main/java/io/netty/handler/logging/MessageLoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/MessageLoggingHandler.java new file mode 100644 index 0000000000..68173597c0 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/logging/MessageLoggingHandler.java @@ -0,0 +1,93 @@ +package io.netty.handler.logging; + +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; + +import java.util.Queue; + +public class MessageLoggingHandler + extends LoggingHandler + implements ChannelInboundHandler, ChannelOutboundHandler { + + public MessageLoggingHandler() { + super(); + } + + public MessageLoggingHandler(Class clazz, LogLevel level) { + super(clazz, level); + } + + public MessageLoggingHandler(Class clazz) { + super(clazz); + } + + public MessageLoggingHandler(LogLevel level) { + super(level); + } + + public MessageLoggingHandler(String name, LogLevel level) { + super(name, level); + } + + public MessageLoggingHandler(String name) { + super(name); + } + @Override + public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) + throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + @Override + public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) + throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) + throws Exception { + Queue buf = ctx.inboundMessageBuffer(); + if (logger.isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, formatBuffer("RECEIVED", buf))); + } + + Queue out = ctx.nextInboundMessageBuffer(); + for (;;) { + Object o = buf.poll(); + if (o == null) { + break; + } + out.add(o); + } + ctx.fireInboundBufferUpdated(); + } + + @Override + public void flush(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + Queue buf = ctx.outboundMessageBuffer(); + if (logger.isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, formatBuffer("WRITE", buf))); + } + + Queue out = ctx.nextOutboundMessageBuffer(); + for (;;) { + Object o = buf.poll(); + if (o == null) { + break; + } + out.add(o); + } + ctx.flush(future); + } + + protected String formatBuffer(String message, Queue buf) { + return message + '(' + buf.size() + "): " + buf; + } +} diff --git a/handler/src/main/java/io/netty/handler/logging/StreamLoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/StreamLoggingHandler.java new file mode 100644 index 0000000000..6391a54a68 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/logging/StreamLoggingHandler.java @@ -0,0 +1,183 @@ +package io.netty.handler.logging; + +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; + +public class StreamLoggingHandler + extends LoggingHandler + implements ChannelInboundHandler, ChannelOutboundHandler { + + private static final String NEWLINE = String.format("%n"); + + private static final String[] BYTE2HEX = new String[256]; + private static final String[] HEXPADDING = new String[16]; + private static final String[] BYTEPADDING = new String[16]; + private static final char[] BYTE2CHAR = new char[256]; + + static { + int i; + + // Generate the lookup table for byte-to-hex-dump conversion + for (i = 0; i < 10; i ++) { + StringBuilder buf = new StringBuilder(3); + buf.append(" 0"); + buf.append(i); + BYTE2HEX[i] = buf.toString(); + } + for (; i < 16; i ++) { + StringBuilder buf = new StringBuilder(3); + buf.append(" 0"); + buf.append((char) ('a' + i - 10)); + BYTE2HEX[i] = buf.toString(); + } + for (; i < BYTE2HEX.length; i ++) { + StringBuilder buf = new StringBuilder(3); + buf.append(' '); + buf.append(Integer.toHexString(i)); + BYTE2HEX[i] = buf.toString(); + } + + // Generate the lookup table for hex dump paddings + for (i = 0; i < HEXPADDING.length; i ++) { + int padding = HEXPADDING.length - i; + StringBuilder buf = new StringBuilder(padding * 3); + for (int j = 0; j < padding; j ++) { + buf.append(" "); + } + HEXPADDING[i] = buf.toString(); + } + + // Generate the lookup table for byte dump paddings + for (i = 0; i < BYTEPADDING.length; i ++) { + int padding = BYTEPADDING.length - i; + StringBuilder buf = new StringBuilder(padding); + for (int j = 0; j < padding; j ++) { + buf.append(' '); + } + BYTEPADDING[i] = buf.toString(); + } + + // Generate the lookup table for byte-to-char conversion + for (i = 0; i < BYTE2CHAR.length; i ++) { + if (i <= 0x1f || i >= 0x7f) { + BYTE2CHAR[i] = '.'; + } else { + BYTE2CHAR[i] = (char) i; + } + } + } + + public StreamLoggingHandler() { + super(); + } + + public StreamLoggingHandler(Class clazz, LogLevel level) { + super(clazz, level); + } + + public StreamLoggingHandler(Class clazz) { + super(clazz); + } + + public StreamLoggingHandler(LogLevel level) { + super(level); + } + + public StreamLoggingHandler(String name, LogLevel level) { + super(name, level); + } + + public StreamLoggingHandler(String name) { + super(name); + } + @Override + public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) + throws Exception { + return ChannelBufferHolders.byteBuffer(); + } + + @Override + public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) + throws Exception { + return ChannelBufferHolders.byteBuffer(); + } + + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) + throws Exception { + ChannelBuffer buf = ctx.inboundByteBuffer(); + if (logger.isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, formatBuffer("RECEIVED", buf))); + } + ctx.nextInboundByteBuffer().writeBytes(buf); + ctx.fireInboundBufferUpdated(); + } + + @Override + public void flush(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + ChannelBuffer buf = ctx.outboundByteBuffer(); + if (logger.isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, formatBuffer("WRITE", buf))); + } + ctx.nextOutboundByteBuffer().writeBytes(buf); + ctx.flush(future); + } + + protected String formatBuffer(String message, ChannelBuffer buf) { + int length = buf.readableBytes(); + int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; + StringBuilder dump = new StringBuilder(rows * 80 + message.length() + 16); + + dump.append(message).append('(').append(length).append('B').append(')'); + dump.append( + NEWLINE + " +-------------------------------------------------+" + + NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" + + NEWLINE + "+--------+-------------------------------------------------+----------------+"); + + final int startIndex = buf.readerIndex(); + final int endIndex = buf.writerIndex(); + + int i; + for (i = startIndex; i < endIndex; i ++) { + int relIdx = i - startIndex; + int relIdxMod16 = relIdx & 15; + if (relIdxMod16 == 0) { + dump.append(NEWLINE); + dump.append(Long.toHexString(relIdx & 0xFFFFFFFFL | 0x100000000L)); + dump.setCharAt(dump.length() - 9, '|'); + dump.append('|'); + } + dump.append(BYTE2HEX[buf.getUnsignedByte(i)]); + if (relIdxMod16 == 15) { + dump.append(" |"); + for (int j = i - 15; j <= i; j ++) { + dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]); + } + dump.append('|'); + } + } + + if ((i - startIndex & 15) != 0) { + int remainder = length & 15; + dump.append(HEXPADDING[remainder]); + dump.append(" |"); + for (int j = i - remainder; j < i; j ++) { + dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]); + } + dump.append(BYTEPADDING[remainder]); + dump.append('|'); + } + + dump.append( + NEWLINE + "+--------+-------------------------------------------------+----------------+"); + + return dump.toString(); + } +} diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 311125b287..0c7fc35603 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -23,8 +23,8 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @apiviz.landmark * @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from */ -public class ChunkedWriteHandler extends ChannelHandlerAdapter { +public class ChunkedWriteHandler extends ChannelOutboundHandlerAdapter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); @@ -80,13 +80,6 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter { private final AtomicInteger pendingWrites = new AtomicInteger(); private Object currentEvent; - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - return ChannelBufferHolders.inboundBypassBuffer(ctx); - } - @Override public ChannelBufferHolder newOutboundBuffer( ChannelHandlerContext ctx) throws Exception { @@ -116,7 +109,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter { } else { // let the transfer resume on the next event loop round ctx.executor().execute(new Runnable() { - + @Override public void run() { try { @@ -125,7 +118,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter { if (logger.isWarnEnabled()) { logger.warn("Unexpected exception while sending chunks.", e); } - } + } } }); } @@ -283,8 +276,8 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter { return; } } - - + + } static void closeInput(ChunkedInput chunks) { diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 6e1ff40430..6e3df84265 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -17,8 +17,6 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -114,7 +112,7 @@ import java.util.concurrent.TimeUnit; * @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.has io.netty.handler.timeout.IdleStateEvent oneway - - triggers */ -public class IdleStateHandler extends ChannelHandlerAdapter { +public class IdleStateHandler extends ChannelHandlerAdapter { private final long readerIdleTimeMillis; private final long writerIdleTimeMillis; @@ -202,16 +200,6 @@ public class IdleStateHandler extends ChannelHandlerAdapter { } } - @Override - public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.inboundBypassBuffer(ctx); - } - - @Override - public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.outboundBypassBuffer(ctx); - } - @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive()) { diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java index 486f4d935f..99816a7c9c 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java @@ -16,12 +16,10 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelStateHandlerAdapter; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -69,7 +67,7 @@ import java.util.concurrent.TimeUnit; * @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises */ -public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { +public class ReadTimeoutHandler extends ChannelStateHandlerAdapter { private final long timeoutMillis; @@ -110,12 +108,6 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { } } - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.inboundBypassBuffer(ctx); - } - @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive()) { diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 21238cb15c..4ad5b61791 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -16,12 +16,10 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelOperationHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -67,7 +65,7 @@ import java.util.concurrent.TimeUnit; * @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises */ -public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { +public class WriteTimeoutHandler extends ChannelOperationHandlerAdapter { private final long timeoutMillis; @@ -103,11 +101,6 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { } } - @Override - public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.outboundBypassBuffer(ctx); - } - @Override public void flush(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception { if (timeoutMillis > 0) { diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index 5b522b1fba..df1ac23efc 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -17,13 +17,7 @@ package io.netty.channel; import java.net.SocketAddress; -public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAdapter - implements ChannelOperationHandler, ChannelInboundHandler, ChannelOutboundHandler { - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { - ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx); - } +public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler { @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { @@ -52,6 +46,10 @@ public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAda @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - ChannelOutboundHandlerAdapter.flush0(ctx, future); + if (ctx.type().contains(ChannelHandlerType.OUTBOUND)) { + ChannelOutboundHandlerAdapter.flush0(ctx, future); + } else { + ctx.flush(future); + } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java index 55f46b1b48..32af75baee 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java @@ -16,7 +16,6 @@ package io.netty.channel; -public interface ChannelInboundHandler extends ChannelHandler { +public interface ChannelInboundHandler extends ChannelStateHandler { ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception; - void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java index 8354caaa03..605611f8b6 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java @@ -8,4 +8,5 @@ public interface ChannelOperationHandler extends ChannelHandler { void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; + void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java new file mode 100644 index 0000000000..88d9ea47fd --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java @@ -0,0 +1,74 @@ +package io.netty.channel; + +import java.net.SocketAddress; + +public class ChannelOperationHandlerAdapter implements ChannelOperationHandler { + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + ctx.fireExceptionCaught(cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) + throws Exception { + ctx.fireUserEventTriggered(evt); + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, + ChannelFuture future) throws Exception { + ctx.bind(localAddress, future); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelFuture future) throws Exception { + ctx.connect(remoteAddress, localAddress, future); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + ctx.disconnect(future); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + ctx.close(future); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + ctx.deregister(future); + } + + @Override + public void flush(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + ctx.flush(future); + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java index f327de0847..3b252501ce 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java @@ -15,7 +15,6 @@ */ package io.netty.channel; -public interface ChannelOutboundHandler extends ChannelHandler { +public interface ChannelOutboundHandler extends ChannelOperationHandler { ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception; - void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index ad50931649..4303dae0c6 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -17,36 +17,10 @@ package io.netty.channel; import io.netty.buffer.ChannelBuffer; -import java.net.SocketAddress; import java.util.Queue; -public abstract class ChannelOutboundHandlerAdapter extends ChannelStateHandlerAdapter - implements ChannelOperationHandler, ChannelOutboundHandler { - - @Override - public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { - ctx.bind(localAddress, future); - } - - @Override - public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception { - ctx.connect(remoteAddress, localAddress, future); - } - - @Override - public void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - ctx.disconnect(future); - } - - @Override - public void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - ctx.close(future); - } - - @Override - public void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - ctx.deregister(future); - } +public abstract class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter + implements ChannelOutboundHandler { @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java index cd4181b4b7..1d03b3a8cf 100644 --- a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java @@ -6,4 +6,6 @@ public interface ChannelStateHandler extends ChannelHandler { void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; + + void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java index 981f9a2fb3..fd24571619 100644 --- a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java @@ -75,4 +75,13 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler { public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { + if (ctx.type().contains(ChannelHandlerType.INBOUND)) { + ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx); + } else { + ctx.fireInboundBufferUpdated(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 40c74eb76e..228227bfce 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -105,12 +105,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements }; final Runnable curCtxFireInboundBufferUpdatedTask = new Runnable() { @Override - @SuppressWarnings("unchecked") public void run() { DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; flushBridge(); try { - ((ChannelInboundHandler) ctx.handler).inboundBufferUpdated(ctx); + ((ChannelStateHandler) ctx.handler).inboundBufferUpdated(ctx); } catch (Throwable t) { pipeline.notifyHandlerException(t); } finally { @@ -127,7 +126,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void run() { DefaultChannelHandlerContext next = nextContext( - DefaultChannelHandlerContext.this.next, ChannelHandlerType.INBOUND); + DefaultChannelHandlerContext.this.next, ChannelHandlerType.STATE); if (next != null) { next.fillBridge(); DefaultChannelPipeline.fireInboundBufferUpdated(next); @@ -554,7 +553,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public ChannelFuture flush(final ChannelFuture future) { EventExecutor executor = executor(); if (executor.inEventLoop()) { - DefaultChannelHandlerContext prev = nextContext(this.prev, ChannelHandlerType.OUTBOUND); + DefaultChannelHandlerContext prev = nextContext(this.prev, ChannelHandlerType.OPERATION); prev.fillBridge(); pipeline.flush(prev, future); } else { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 6e748c1902..c63271ae3e 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -1214,7 +1214,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { fireInboundBufferUpdatedOnActivation = true; return; } - DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.INBOUND); + DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE); if (ctx != null) { fireInboundBufferUpdated(ctx); } @@ -1413,7 +1413,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture flush(ChannelFuture future) { - return flush(firstContext(ChannelHandlerType.OUTBOUND), future); + return flush(firstContext(ChannelHandlerType.OPERATION), future); } ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { @@ -1436,7 +1436,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) { try { ctx.flushBridge(); - ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); + ((ChannelOperationHandler) ctx.handler()).flush(ctx, future); } catch (Throwable t) { notifyHandlerException(t); } finally {