Post-overhaul fixes / Split LoggingHandler into three

- LoggingHandler now only logs state and operations
- StreamLoggingHandler and MessageLoggingHandler log the buffer content
- Added ChannelOperationHandlerAdapter
  - Used by WriteTimeoutHandler
This commit is contained in:
Trustin Lee 2012-06-07 16:56:21 +09:00
parent 5e93d206ff
commit ea0c9cfe79
21 changed files with 422 additions and 288 deletions

View File

@ -21,6 +21,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
@ -29,7 +31,9 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Manages streams within a SPDY session.
*/
public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
public class SpdySessionHandler
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");

View File

@ -19,9 +19,12 @@ import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN, OUTBOUND_OUT>
extends ChannelHandlerAdapter<INBOUND_IN, OUTBOUND_IN> {
extends ChannelHandlerAdapter
implements ChannelInboundHandler<INBOUND_IN>, ChannelOutboundHandler<OUTBOUND_IN> {
private final MessageToMessageEncoder<OUTBOUND_IN, OUTBOUND_OUT> encoder =
new MessageToMessageEncoder<OUTBOUND_IN, OUTBOUND_OUT>() {

View File

@ -20,9 +20,12 @@ import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
public abstract class StreamToMessageCodec<INBOUND_OUT, OUTBOUND_IN>
extends ChannelHandlerAdapter<Byte, OUTBOUND_IN> {
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<OUTBOUND_IN> {
private final MessageToStreamEncoder<OUTBOUND_IN> encoder =
new MessageToStreamEncoder<OUTBOUND_IN>() {

View File

@ -20,8 +20,12 @@ import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
public abstract class StreamToStreamCodec extends ChannelHandlerAdapter<Byte, Byte> {
public abstract class StreamToStreamCodec
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
private final StreamToStreamEncoder encoder = new StreamToStreamEncoder() {
@Override

View File

@ -15,9 +15,6 @@
*/
package io.netty.handler.logging;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
@ -28,7 +25,6 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.util.Queue;
/**
* A {@link ChannelHandler} that logs all events via {@link InternalLogger}.
@ -38,72 +34,14 @@ import java.util.Queue;
* @apiviz.landmark
*/
@Sharable
public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
public class LoggingHandler extends ChannelHandlerAdapter {
private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG;
private static final String NEWLINE = String.format("%n");
private static final String[] BYTE2HEX = new String[256];
private static final String[] HEXPADDING = new String[16];
private static final String[] BYTEPADDING = new String[16];
private static final char[] BYTE2CHAR = new char[256];
protected final InternalLogger logger;
protected final InternalLogLevel internalLevel;
static {
int i;
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < 10; i ++) {
StringBuilder buf = new StringBuilder(3);
buf.append(" 0");
buf.append(i);
BYTE2HEX[i] = buf.toString();
}
for (; i < 16; i ++) {
StringBuilder buf = new StringBuilder(3);
buf.append(" 0");
buf.append((char) ('a' + i - 10));
BYTE2HEX[i] = buf.toString();
}
for (; i < BYTE2HEX.length; i ++) {
StringBuilder buf = new StringBuilder(3);
buf.append(' ');
buf.append(Integer.toHexString(i));
BYTE2HEX[i] = buf.toString();
}
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i ++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j ++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i ++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j ++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i ++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
private final InternalLogger logger;
private final LogLevel level;
private final InternalLogLevel internalLevel;
/**
* Creates a new instance whose logger name is the fully qualified class
@ -178,19 +116,11 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
internalLevel = level.toInternalLevel();
}
/**
* Returns the {@link InternalLogger} that this handler uses to log
* a {@link ChannelEvent}.
*/
public InternalLogger getLogger() {
return logger;
}
/**
* Returns the {@link InternalLogLevel} that this handler uses to log
* a {@link ChannelEvent}.
*/
public LogLevel getLevel() {
public LogLevel level() {
return level;
}
@ -203,99 +133,10 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
return buf.toString();
}
protected String formatBuffer(String bufName, ChannelBufferHolder<Object> holder) {
String content;
int size;
String elemType;
if (holder.hasByteBuffer()) {
ChannelBuffer buf = holder.byteBuffer();
size = buf.readableBytes();
elemType = "Byte";
content = hexdump(buf);
} else {
Queue<Object> buf = holder.messageBuffer();
content = buf.toString();
size = buf.size();
elemType = "Object";
}
StringBuilder buf = new StringBuilder(bufName.length() + elemType.length() + content.length() + 16);
buf.append(bufName);
buf.append('[');
buf.append(elemType);
buf.append("](");
buf.append(size);
buf.append("): ");
buf.append(content);
return buf.toString();
}
private static String hexdump(ChannelBuffer buf) {
int length = buf.readableBytes();
int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4;
StringBuilder dump = new StringBuilder(rows * 80);
dump.append(
NEWLINE + " +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = buf.readerIndex();
final int endIndex = buf.writerIndex();
int i;
for (i = startIndex; i < endIndex; i ++) {
int relIdx = i - startIndex;
int relIdxMod16 = relIdx & 15;
if (relIdxMod16 == 0) {
dump.append(NEWLINE);
dump.append(Long.toHexString(relIdx & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
dump.append(BYTE2HEX[buf.getUnsignedByte(i)]);
if (relIdxMod16 == 15) {
dump.append(" |");
for (int j = i - 15; j <= i; j ++) {
dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]);
}
dump.append('|');
}
}
if ((i - startIndex & 15) != 0) {
int remainder = length & 15;
dump.append(HEXPADDING[remainder]);
dump.append(" |");
for (int j = i - remainder; j < i; j ++) {
dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(
NEWLINE + "+--------+-------------------------------------------------+----------------+");
return dump.toString();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.outboundBypassBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "REGISTERED"));
}
super.channelRegistered(ctx);
@ -304,7 +145,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "UNREGISTERED"));
}
super.channelUnregistered(ctx);
@ -313,7 +154,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "ACTIVE"));
}
super.channelActive(ctx);
@ -322,7 +163,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "INACTIVE"));
}
super.channelInactive(ctx);
@ -331,7 +172,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "EXCEPTION: " + cause), cause);
}
super.exceptionCaught(ctx, cause);
@ -340,25 +181,16 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void userEventTriggered(ChannelHandlerContext ctx,
Object evt) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "USER_EVENT: " + evt));
}
super.userEventTriggered(ctx, evt);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {
if (getLogger().isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.inbound())));
}
ctx.fireInboundBufferUpdated();
}
@Override
public void bind(ChannelHandlerContext ctx,
SocketAddress localAddress, ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND(" + localAddress + ')'));
}
super.bind(ctx, localAddress, future);
@ -368,7 +200,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
public void connect(ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "CONNECT(" + remoteAddress + ", " + localAddress + ')'));
}
super.connect(ctx, remoteAddress, localAddress, future);
@ -377,7 +209,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void disconnect(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "DISCONNECT()"));
}
super.disconnect(ctx, future);
@ -386,7 +218,7 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void close(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "CLOSE()"));
}
super.close(ctx, future);
@ -395,18 +227,9 @@ public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
@Override
public void deregister(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "DEREGISTER()"));
}
super.deregister(ctx, future);
}
@Override
public void flush(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.outbound())));
}
ctx.flush(future);
}
}

View File

@ -0,0 +1,93 @@
package io.netty.handler.logging;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import java.util.Queue;
public class MessageLoggingHandler
extends LoggingHandler
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
public MessageLoggingHandler() {
super();
}
public MessageLoggingHandler(Class<?> clazz, LogLevel level) {
super(clazz, level);
}
public MessageLoggingHandler(Class<?> clazz) {
super(clazz);
}
public MessageLoggingHandler(LogLevel level) {
super(level);
}
public MessageLoggingHandler(String name, LogLevel level) {
super(name, level);
}
public MessageLoggingHandler(String name) {
super(name);
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {
Queue<Object> buf = ctx.inboundMessageBuffer();
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("RECEIVED", buf)));
}
Queue<Object> out = ctx.nextInboundMessageBuffer();
for (;;) {
Object o = buf.poll();
if (o == null) {
break;
}
out.add(o);
}
ctx.fireInboundBufferUpdated();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
Queue<Object> buf = ctx.outboundMessageBuffer();
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("WRITE", buf)));
}
Queue<Object> out = ctx.nextOutboundMessageBuffer();
for (;;) {
Object o = buf.poll();
if (o == null) {
break;
}
out.add(o);
}
ctx.flush(future);
}
protected String formatBuffer(String message, Queue<Object> buf) {
return message + '(' + buf.size() + "): " + buf;
}
}

View File

@ -0,0 +1,183 @@
package io.netty.handler.logging;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
public class StreamLoggingHandler
extends LoggingHandler
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
private static final String NEWLINE = String.format("%n");
private static final String[] BYTE2HEX = new String[256];
private static final String[] HEXPADDING = new String[16];
private static final String[] BYTEPADDING = new String[16];
private static final char[] BYTE2CHAR = new char[256];
static {
int i;
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < 10; i ++) {
StringBuilder buf = new StringBuilder(3);
buf.append(" 0");
buf.append(i);
BYTE2HEX[i] = buf.toString();
}
for (; i < 16; i ++) {
StringBuilder buf = new StringBuilder(3);
buf.append(" 0");
buf.append((char) ('a' + i - 10));
BYTE2HEX[i] = buf.toString();
}
for (; i < BYTE2HEX.length; i ++) {
StringBuilder buf = new StringBuilder(3);
buf.append(' ');
buf.append(Integer.toHexString(i));
BYTE2HEX[i] = buf.toString();
}
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i ++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j ++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i ++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j ++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i ++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
public StreamLoggingHandler() {
super();
}
public StreamLoggingHandler(Class<?> clazz, LogLevel level) {
super(clazz, level);
}
public StreamLoggingHandler(Class<?> clazz) {
super(clazz);
}
public StreamLoggingHandler(LogLevel level) {
super(level);
}
public StreamLoggingHandler(String name, LogLevel level) {
super(name, level);
}
public StreamLoggingHandler(String name) {
super(name);
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {
ChannelBuffer buf = ctx.inboundByteBuffer();
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("RECEIVED", buf)));
}
ctx.nextInboundByteBuffer().writeBytes(buf);
ctx.fireInboundBufferUpdated();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ChannelBuffer buf = ctx.outboundByteBuffer();
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("WRITE", buf)));
}
ctx.nextOutboundByteBuffer().writeBytes(buf);
ctx.flush(future);
}
protected String formatBuffer(String message, ChannelBuffer buf) {
int length = buf.readableBytes();
int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4;
StringBuilder dump = new StringBuilder(rows * 80 + message.length() + 16);
dump.append(message).append('(').append(length).append('B').append(')');
dump.append(
NEWLINE + " +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = buf.readerIndex();
final int endIndex = buf.writerIndex();
int i;
for (i = startIndex; i < endIndex; i ++) {
int relIdx = i - startIndex;
int relIdxMod16 = relIdx & 15;
if (relIdxMod16 == 0) {
dump.append(NEWLINE);
dump.append(Long.toHexString(relIdx & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
dump.append(BYTE2HEX[buf.getUnsignedByte(i)]);
if (relIdxMod16 == 15) {
dump.append(" |");
for (int j = i - 15; j <= i; j ++) {
dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]);
}
dump.append('|');
}
}
if ((i - startIndex & 15) != 0) {
int remainder = length & 15;
dump.append(HEXPADDING[remainder]);
dump.append(" |");
for (int j = i - remainder; j < i; j ++) {
dump.append(BYTE2CHAR[buf.getUnsignedByte(j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(
NEWLINE + "+--------+-------------------------------------------------+----------------+");
return dump.toString();
}
}

View File

@ -23,8 +23,8 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @apiviz.landmark
* @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from
*/
public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
public class ChunkedWriteHandler extends ChannelOutboundHandlerAdapter<Object> {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
@ -80,13 +80,6 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
private final AtomicInteger pendingWrites = new AtomicInteger();
private Object currentEvent;
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {

View File

@ -17,8 +17,6 @@ package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@ -114,7 +112,7 @@ import java.util.concurrent.TimeUnit;
* @apiviz.uses io.netty.util.HashedWheelTimer
* @apiviz.has io.netty.handler.timeout.IdleStateEvent oneway - - triggers
*/
public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
public class IdleStateHandler extends ChannelHandlerAdapter {
private final long readerIdleTimeMillis;
private final long writerIdleTimeMillis;
@ -202,16 +200,6 @@ public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
}
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.outboundBypassBuffer(ctx);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive()) {

View File

@ -16,12 +16,10 @@
package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateHandlerAdapter;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
@ -69,7 +67,7 @@ import java.util.concurrent.TimeUnit;
* @apiviz.uses io.netty.util.HashedWheelTimer
* @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises
*/
public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
public class ReadTimeoutHandler extends ChannelStateHandlerAdapter {
private final long timeoutMillis;
@ -110,12 +108,6 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
}
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive()) {

View File

@ -16,12 +16,10 @@
package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelOperationHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
@ -67,7 +65,7 @@ import java.util.concurrent.TimeUnit;
* @apiviz.uses io.netty.util.HashedWheelTimer
* @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises
*/
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter<Object> {
public class WriteTimeoutHandler extends ChannelOperationHandlerAdapter {
private final long timeoutMillis;
@ -103,11 +101,6 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter<Object> {
}
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.outboundBypassBuffer(ctx);
}
@Override
public void flush(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception {
if (timeoutMillis > 0) {

View File

@ -17,13 +17,7 @@ package io.netty.channel;
import java.net.SocketAddress;
public abstract class ChannelHandlerAdapter<I, O> extends ChannelStateHandlerAdapter
implements ChannelOperationHandler, ChannelInboundHandler<I>, ChannelOutboundHandler<O> {
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx);
}
public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
@ -52,6 +46,10 @@ public abstract class ChannelHandlerAdapter<I, O> extends ChannelStateHandlerAda
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
if (ctx.type().contains(ChannelHandlerType.OUTBOUND)) {
ChannelOutboundHandlerAdapter.flush0(ctx, future);
} else {
ctx.flush(future);
}
}
}

View File

@ -16,7 +16,6 @@
package io.netty.channel;
public interface ChannelInboundHandler<T> extends ChannelHandler {
public interface ChannelInboundHandler<T> extends ChannelStateHandler {
ChannelBufferHolder<T> newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -8,4 +8,5 @@ public interface ChannelOperationHandler extends ChannelHandler {
void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
}

View File

@ -0,0 +1,74 @@
package io.netty.channel;
import java.net.SocketAddress;
public class ChannelOperationHandlerAdapter implements ChannelOperationHandler {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelFuture future) throws Exception {
ctx.bind(localAddress, future);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.disconnect(future);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.close(future);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.deregister(future);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ctx.flush(future);
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel;
public interface ChannelOutboundHandler<T> extends ChannelHandler {
public interface ChannelOutboundHandler<T> extends ChannelOperationHandler {
ChannelBufferHolder<T> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
}

View File

@ -17,36 +17,10 @@ package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.Queue;
public abstract class ChannelOutboundHandlerAdapter<O> extends ChannelStateHandlerAdapter
implements ChannelOperationHandler, ChannelOutboundHandler<O> {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.bind(localAddress, future);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ctx.disconnect(future);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ctx.close(future);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ctx.deregister(future);
}
public abstract class ChannelOutboundHandlerAdapter<O> extends ChannelHandlerAdapter
implements ChannelOutboundHandler<O> {
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {

View File

@ -6,4 +6,6 @@ public interface ChannelStateHandler extends ChannelHandler {
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -75,4 +75,13 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
if (ctx.type().contains(ChannelHandlerType.INBOUND)) {
ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx);
} else {
ctx.fireInboundBufferUpdated();
}
}
}

View File

@ -105,12 +105,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
};
final Runnable curCtxFireInboundBufferUpdatedTask = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
flushBridge();
try {
((ChannelInboundHandler<Object>) ctx.handler).inboundBufferUpdated(ctx);
((ChannelStateHandler) ctx.handler).inboundBufferUpdated(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
} finally {
@ -127,7 +126,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void run() {
DefaultChannelHandlerContext next = nextContext(
DefaultChannelHandlerContext.this.next, ChannelHandlerType.INBOUND);
DefaultChannelHandlerContext.this.next, ChannelHandlerType.STATE);
if (next != null) {
next.fillBridge();
DefaultChannelPipeline.fireInboundBufferUpdated(next);
@ -554,7 +553,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public ChannelFuture flush(final ChannelFuture future) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
DefaultChannelHandlerContext prev = nextContext(this.prev, ChannelHandlerType.OUTBOUND);
DefaultChannelHandlerContext prev = nextContext(this.prev, ChannelHandlerType.OPERATION);
prev.fillBridge();
pipeline.flush(prev, future);
} else {

View File

@ -1214,7 +1214,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
fireInboundBufferUpdatedOnActivation = true;
return;
}
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.INBOUND);
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE);
if (ctx != null) {
fireInboundBufferUpdated(ctx);
}
@ -1413,7 +1413,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture flush(ChannelFuture future) {
return flush(firstContext(ChannelHandlerType.OUTBOUND), future);
return flush(firstContext(ChannelHandlerType.OPERATION), future);
}
ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1436,7 +1436,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) {
try {
ctx.flushBridge();
((ChannelOutboundHandler<Object>) ctx.handler()).flush(ctx, future);
((ChannelOperationHandler) ctx.handler()).flush(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {