Introduce bypass buffer and use it in LoggingHandler

- Added ChannelBufferHolders.(inbound|outbound)BypassBuffer()
  - The holder returned by these methods returns the next handler's
    buffer.  When a handler's new(Inbound|Outbound)Buffer returns
    a bypass holder, your inboundBufferUpdated() and flush()
    implementation should check if the buffer is a bypass and should not
    modify the content of the buffer.
- Channel(Inbound|Outbound)?HandlerAdapter is now abstract.
  - A user has to specify the exact inbound/outbound buffer type
  - It's because there's no way to determine the best buffer type
- Implemented LoggingHandler using the new API.
  - It doesn't dump received or sent messages yet.
- Fixed a bug where DefaultUnsafe.close() does not trigger deregister()
- Fixed a bug where NioSocketChannel.isActive() does not return false
  when closed
This commit is contained in:
Trustin Lee 2012-05-10 23:19:59 +09:00
parent 532672deae
commit da9ecadfc0
9 changed files with 265 additions and 117 deletions

View File

@ -27,6 +27,8 @@ import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
@ -47,6 +49,7 @@ public class EchoServer {
// Configure the server.
final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class);
ServerSocketChannel ssc = new NioServerSocketChannel();
ssc.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter<SocketChannel>() {
@Override
@ -66,6 +69,7 @@ public class EchoServer {
if (s == null) {
break;
}
s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter<Byte>() {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelInboundHandlerContext<Byte> ctx) {

View File

@ -15,21 +15,21 @@
*/
package io.netty.handler.logging;
import static io.netty.buffer.ChannelBuffers.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.logging.InternalLogLevel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
/**
* A {@link ChannelHandler} that logs all events via {@link InternalLogger}.
* By default, all events are logged at <tt>DEBUG</tt> level. You can extend
@ -38,7 +38,7 @@ import io.netty.logging.InternalLoggerFactory;
* @apiviz.landmark
*/
@Sharable
public class LoggingHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
public class LoggingHandler extends ChannelHandlerAdapter<Object, Object> {
private static final InternalLogLevel DEFAULT_LEVEL = InternalLogLevel.DEBUG;
@ -192,45 +192,144 @@ public class LoggingHandler implements ChannelUpstreamHandler, ChannelDownstream
return level;
}
/**
* Logs the specified event to the {@link InternalLogger} returned by
* {@link #getLogger()}. If hex dump has been enabled for this handler,
* the hex dump of the {@link ChannelBuffer} in a {@link MessageEvent} will
* be logged together.
*/
public void log(ChannelEvent e) {
protected String message(ChannelHandlerContext ctx, String message) {
String chStr = ctx.channel().toString();
StringBuilder buf = new StringBuilder(chStr.length() + message.length() + 1);
buf.append(chStr);
buf.append(' ');
buf.append(message);
return buf.toString();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelOutboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.outboundBypassBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public void channelRegistered(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
if (getLogger().isEnabled(level)) {
String msg = e.toString();
// Append hex dump if necessary.
if (hexDump && e instanceof MessageEvent) {
MessageEvent me = (MessageEvent) e;
if (me.getMessage() instanceof ChannelBuffer) {
ChannelBuffer buf = (ChannelBuffer) me.getMessage();
msg = msg + " - (HEXDUMP: " + hexDump(buf) + ')';
}
}
// Log the message (and exception if available.)
if (e instanceof ExceptionEvent) {
getLogger().log(level, msg, ((ExceptionEvent) e).cause());
} else {
getLogger().log(level, msg);
}
logger.log(level, message(ctx, "REGISTERED"));
}
super.channelRegistered(ctx);
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
public void channelUnregistered(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
log(e);
ctx.sendUpstream(e);
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "UNREGISTERED"));
}
super.channelUnregistered(ctx);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
public void channelActive(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
log(e);
ctx.sendDownstream(e);
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "ACTIVE"));
}
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "INACTIVE"));
}
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx,
Throwable cause) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, String.format("EXCEPTION: %s", cause)), cause);
}
super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelInboundHandlerContext<Object> ctx,
Object evt) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, String.format("USER_EVENT: %s", evt)));
}
super.userEventTriggered(ctx, evt);
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "INBOUND_UPDATED"));
}
// TODO Auto-generated method stub
super.inboundBufferUpdated(ctx);
}
@Override
public void bind(ChannelOutboundHandlerContext<Object> ctx,
SocketAddress localAddress, ChannelFuture future) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, String.format("bind(%s)", localAddress)));
}
super.bind(ctx, localAddress, future);
}
@Override
public void connect(ChannelOutboundHandlerContext<Object> ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, String.format("connect(%s, %s)", remoteAddress, localAddress)));
}
super.connect(ctx, remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "disconnect()"));
}
super.disconnect(ctx, future);
}
@Override
public void close(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "close()"));
}
super.close(ctx, future);
}
@Override
public void deregister(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "deregister()"));
}
super.deregister(ctx, future);
}
@Override
public void flush(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
if (getLogger().isEnabled(level)) {
logger.log(level, message(ctx, "flush()"));
}
super.flush(ctx, future);
}
}

View File

@ -486,6 +486,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
registered = true;
future.setSuccess();
pipeline().fireChannelRegistered();
if (isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
try {
@ -615,10 +618,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) {
future.setFailure(t);
}
notifyClosureListeners();
if (wasActive && !isActive()) {
pipeline().fireChannelInactive();
}
notifyClosureListeners();
deregister(newVoidFuture());
} else {
// Closed already.
future.setSuccess();
@ -683,12 +689,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
if (closed) {
close(newFuture());
close(newVoidFuture());
}
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(newFuture());
close(newVoidFuture());
}
}
}

View File

@ -6,13 +6,28 @@ import java.util.Queue;
public final class ChannelBufferHolder<E> {
private final ChannelHandlerContext ctx;
/** 0 - not a bypass, 1 - inbound bypass, 2 - outbound bypass */
private final int bypassDirection;
private final Queue<E> msgBuf;
private final ChannelBuffer byteBuf;
ChannelBufferHolder(ChannelHandlerContext ctx, boolean inbound) {
if (ctx == null) {
throw new NullPointerException("ctx");
}
this.ctx = ctx;
bypassDirection = inbound? 1 : 2;
msgBuf = null;
byteBuf = null;
}
ChannelBufferHolder(Queue<E> msgBuf) {
if (msgBuf == null) {
throw new NullPointerException("msgBuf");
}
ctx = null;
bypassDirection = 0;
this.msgBuf = msgBuf;
byteBuf = null;
@ -22,38 +37,91 @@ public final class ChannelBufferHolder<E> {
if (byteBuf == null) {
throw new NullPointerException("byteBuf");
}
ctx = null;
bypassDirection = 0;
msgBuf = null;
this.byteBuf = byteBuf;
}
public boolean isBypass() {
return bypassDirection != 0;
}
public boolean hasMessageBuffer() {
return msgBuf != null;
switch (bypassDirection) {
case 0:
return msgBuf != null;
case 1:
return ctx.nextIn().hasMessageBuffer();
case 2:
return ctx.out().hasMessageBuffer();
default:
throw new Error();
}
}
public boolean hasByteBuffer() {
return byteBuf != null;
switch (bypassDirection) {
case 0:
return byteBuf != null;
case 1:
return ctx.nextIn().hasByteBuffer();
case 2:
return ctx.out().hasByteBuffer();
default:
throw new Error();
}
}
@SuppressWarnings("unchecked")
public Queue<E> messageBuffer() {
if (!hasMessageBuffer()) {
throw new IllegalStateException("does not have a message buffer");
switch (bypassDirection) {
case 0:
if (!hasMessageBuffer()) {
throw new IllegalStateException("does not have a message buffer");
}
return msgBuf;
case 1:
return (Queue<E>) ctx.nextIn().messageBuffer();
case 2:
return (Queue<E>) ctx.out().messageBuffer();
default:
throw new Error();
}
return msgBuf;
}
public ChannelBuffer byteBuffer() {
if (!hasByteBuffer()) {
throw new IllegalStateException("does not have a byte buffer");
switch (bypassDirection) {
case 0:
if (!hasByteBuffer()) {
throw new IllegalStateException("does not have a byte buffer");
}
return byteBuf;
case 1:
return ctx.nextIn().byteBuffer();
case 2:
return ctx.out().byteBuffer();
default:
throw new Error();
}
return byteBuf;
}
@Override
public String toString() {
if (hasMessageBuffer()) {
return messageBuffer().toString();
} else {
return byteBuffer().toString();
switch (bypassDirection) {
case 0:
if (hasMessageBuffer()) {
return messageBuffer().toString();
} else {
return byteBuffer().toString();
}
case 1:
return ctx.nextIn().toString();
case 2:
return ctx.out().toString();
default:
throw new Error();
}
}
}

View File

@ -14,6 +14,14 @@ public final class ChannelBufferHolders {
return new ChannelBufferHolder<Byte>(buffer);
}
public static <E> ChannelBufferHolder<E> inboundBypassBuffer(ChannelHandlerContext ctx) {
return new ChannelBufferHolder<E>(ctx, true);
}
public static <E> ChannelBufferHolder<E> outboundBypassBuffer(ChannelHandlerContext ctx) {
return new ChannelBufferHolder<E>(ctx, false);
}
private ChannelBufferHolders() {
// Utility class
}

View File

@ -1,12 +1,8 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> {
public abstract class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
@ -57,29 +53,9 @@ public class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, Ch
ctx.fireUserEventTriggered(evt);
}
@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelInboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<I>());
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx) throws Exception {
if (ctx.in().hasMessageBuffer()) {
Queue<I> in = ctx.in().messageBuffer();
Queue<Object> nextIn = ctx.nextIn().messageBuffer();
for (;;) {
I msg = in.poll();
if (msg == null) {
break;
}
nextIn.add(msg);
}
} else {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer nextIn = ctx.nextIn().byteBuffer();
nextIn.writeBytes(in);
}
ctx.fireInboundBufferUpdated();
ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx);
}
@Override
@ -107,28 +83,8 @@ public class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, Ch
ctx.deregister(future);
}
@Override
public ChannelBufferHolder<O> newOutboundBuffer(ChannelOutboundHandlerContext<O> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<O>());
}
@Override
public void flush(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
if (ctx.prevOut().hasMessageBuffer()) {
Queue<O> out = ctx.prevOut().messageBuffer();
Queue<Object> nextOut = ctx.out().messageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
break;
}
nextOut.add(msg);
}
} else {
ChannelBuffer out = ctx.prevOut().byteBuffer();
ChannelBuffer nextOut = ctx.out().byteBuffer();
nextOut.writeBytes(out);
}
ctx.flush(future);
ChannelOutboundHandlerAdapter.flush0(ctx, future);
}
}

View File

@ -2,10 +2,9 @@ package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelInboundHandlerAdapter<I> implements ChannelInboundHandler<I> {
public abstract class ChannelInboundHandlerAdapter<I> implements ChannelInboundHandler<I> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
@ -57,12 +56,16 @@ public class ChannelInboundHandlerAdapter<I> implements ChannelInboundHandler<I>
}
@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelInboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<I>());
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx) throws Exception {
inboundBufferUpdated0(ctx);
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx) throws Exception {
static <I> void inboundBufferUpdated0(ChannelInboundHandlerContext<I> ctx) {
if (ctx.in().isBypass()) {
ctx.fireInboundBufferUpdated();
return;
}
if (ctx.in().hasMessageBuffer()) {
Queue<I> in = ctx.in().messageBuffer();
Queue<Object> nextIn = ctx.nextIn().messageBuffer();
@ -77,8 +80,8 @@ public class ChannelInboundHandlerAdapter<I> implements ChannelInboundHandler<I>
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer nextIn = ctx.nextIn().byteBuffer();
nextIn.writeBytes(in);
in.discardReadBytes();
}
ctx.fireInboundBufferUpdated();
}
}

View File

@ -3,10 +3,9 @@ package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<O> {
public abstract class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<O> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
@ -53,12 +52,16 @@ public class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<
}
@Override
public ChannelBufferHolder<O> newOutboundBuffer(ChannelOutboundHandlerContext<O> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<O>());
public void flush(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
flush0(ctx, future);
}
@Override
public void flush(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
static <O> void flush0(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) {
if (ctx.prevOut().isBypass()) {
ctx.flush(future);
return;
}
if (ctx.prevOut().hasMessageBuffer()) {
Queue<O> out = ctx.prevOut().messageBuffer();
Queue<Object> nextOut = ctx.out().messageBuffer();
@ -73,6 +76,7 @@ public class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<
ChannelBuffer out = ctx.prevOut().byteBuffer();
ChannelBuffer nextOut = ctx.out().byteBuffer();
nextOut.writeBytes(out);
out.discardReadBytes();
}
ctx.flush(future);
}

View File

@ -82,10 +82,10 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
@Override
public boolean isActive() {
return javaChannel().isConnected();
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {