diff --git a/src/main/java/org/jboss/netty/channel/AbstractChannelFactory.java b/src/main/java/org/jboss/netty/channel/AbstractChannelFactory.java index e577018e15..d54a60a230 100644 --- a/src/main/java/org/jboss/netty/channel/AbstractChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/AbstractChannelFactory.java @@ -94,44 +94,72 @@ public abstract class AbstractChannelFactory implements ChannelFactory { } } - void notifyState(Channel channel) { + void fireChannelOpen(Channel channel) { TrafficMonitor[] trafficMonitors = this.trafficMonitors; for (TrafficMonitor m: trafficMonitors) { try { - m.onState(channel); + m.channelOpen(channel); } catch (Exception e) { logger.warn( "An exception was thrown by " + TrafficMonitor.class.getSimpleName() + - ".onState().", e); + ".channelOpen().", e); } } } - void notifyInflow(Channel channel, int amount) { + void fireChannelClosed(Channel channel) { TrafficMonitor[] trafficMonitors = this.trafficMonitors; for (TrafficMonitor m: trafficMonitors) { try { - m.onInflow(channel, amount); + m.channelClosed(channel); } catch (Exception e) { logger.warn( "An exception was thrown by " + TrafficMonitor.class.getSimpleName() + - ".onInflow().", e); + ".channelClosed().", e); } } } - void notifyOutflow(Channel channel, int amount) { + void fireChannelRead(Channel channel, int amount) { TrafficMonitor[] trafficMonitors = this.trafficMonitors; for (TrafficMonitor m: trafficMonitors) { try { - m.onOutflow(channel, amount); + m.channelRead(channel, amount); } catch (Exception e) { logger.warn( "An exception was thrown by " + TrafficMonitor.class.getSimpleName() + - ".onOutflow().", e); + ".channelRead().", e); + } + } + } + + void fireChannelWriteScheduled(Channel channel, int amount) { + TrafficMonitor[] trafficMonitors = this.trafficMonitors; + for (TrafficMonitor m: trafficMonitors) { + try { + m.channelWriteScheduled(channel, amount); + } catch (Exception e) { + logger.warn( + "An exception was thrown by " + + TrafficMonitor.class.getSimpleName() + + ".channelWriteScheduled().", e); + } + } + } + + void fireChannelWritten(Channel channel, int amount) { + TrafficMonitor[] trafficMonitors = this.trafficMonitors; + for (TrafficMonitor m: trafficMonitors) { + try { + m.channelWritten(channel, amount); + } catch (Exception e) { + logger.warn( + "An exception was thrown by " + + TrafficMonitor.class.getSimpleName() + + ".channelWritten().", e); } } } diff --git a/src/main/java/org/jboss/netty/channel/Channels.java b/src/main/java/org/jboss/netty/channel/Channels.java index fc592db673..ee754446e4 100644 --- a/src/main/java/org/jboss/netty/channel/Channels.java +++ b/src/main/java/org/jboss/netty/channel/Channels.java @@ -183,10 +183,17 @@ public class Channels { * a {@code "childChannelOpen"} event will be sent, too. */ public static void fireChannelOpen(Channel channel) { + // Notify the parent handler. if (channel.getParent() != null) { fireChildChannelStateChanged(channel.getParent(), channel); } - notifyState(channel); + + // Notify traffic monitors + ChannelFactory factory = channel.getFactory(); + if (factory instanceof AbstractChannelFactory) { + ((AbstractChannelFactory) factory).fireChannelOpen(channel); + } + channel.getPipeline().sendUpstream( new DefaultChannelStateEvent( channel, succeededFuture(channel), @@ -548,7 +555,14 @@ public class Channels { new DefaultChannelStateEvent( channel, succeededFuture(channel), ChannelState.OPEN, Boolean.FALSE)); - notifyState(channel); + + // Notify traffic monitors + ChannelFactory factory = channel.getFactory(); + if (factory instanceof AbstractChannelFactory) { + ((AbstractChannelFactory) factory).fireChannelOpen(channel); + } + + // Notify the parent handler. if (channel.getParent() != null) { fireChildChannelStateChanged(channel.getParent(), channel); } @@ -1064,33 +1078,35 @@ public class Channels { close(ctx, future); } - private static void notifyState(Channel channel) { - ChannelFactory factory = channel.getFactory(); - if (factory instanceof AbstractChannelFactory) { - ((AbstractChannelFactory) factory).notifyState(channel); - } - } - - public static void notifyInflow(Channel channel, int amount) { + public static void fireChannelRead(Channel channel, int amount) { if (amount <= 0) { return; } ChannelFactory factory = channel.getFactory(); if (factory instanceof AbstractChannelFactory) { - ((AbstractChannelFactory) factory).notifyInflow(channel, amount); + ((AbstractChannelFactory) factory).fireChannelRead(channel, amount); } } - public static void notifyOutflow(Channel channel, int amount) { + public static void fireChannelWriteScheduled(Channel channel, int amount) { if (amount <= 0) { return; } ChannelFactory factory = channel.getFactory(); if (factory instanceof AbstractChannelFactory) { - ((AbstractChannelFactory) factory).notifyOutflow(channel, amount); + ((AbstractChannelFactory) factory).fireChannelWriteScheduled(channel, amount); } } + public static void fireChannelWritten(Channel channel, int amount) { + if (amount <= 0) { + return; + } + ChannelFactory factory = channel.getFactory(); + if (factory instanceof AbstractChannelFactory) { + ((AbstractChannelFactory) factory).fireChannelWritten(channel, amount); + } + } private static void validateInterestOps(int interestOps) { switch (interestOps) { diff --git a/src/main/java/org/jboss/netty/channel/TrafficMonitor.java b/src/main/java/org/jboss/netty/channel/TrafficMonitor.java index fc3dcd3887..5e46ca905b 100644 --- a/src/main/java/org/jboss/netty/channel/TrafficMonitor.java +++ b/src/main/java/org/jboss/netty/channel/TrafficMonitor.java @@ -28,7 +28,10 @@ package org.jboss.netty.channel; * @version $Rev$, $Date$ */ public interface TrafficMonitor { - void onState(Channel channel) throws Exception; - void onInflow(Channel channel, int amount) throws Exception; - void onOutflow(Channel channel, int amount) throws Exception; + void channelOpen(Channel channel) throws Exception; + void channelClosed(Channel channel) throws Exception; + + void channelRead(Channel channel, int amount) throws Exception; + void channelWriteScheduled(Channel channel, int amount) throws Exception; + void channelWritten(Channel channel, int amount) throws Exception; } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 6cdc3e2a0b..1d32292743 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -186,6 +186,8 @@ class NioSocketChannel extends AbstractChannel assert success; int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes(); + fireChannelWriteScheduled(NioSocketChannel.this, messageSize); + int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); int highWaterMark = getConfig().getWriteBufferHighWaterMark(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 3de164c25b..eefb10808e 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -298,7 +298,7 @@ class NioWorker implements Runnable { predictor.previousReceiveBufferSize(readBytes); // Notify the traffic monitors. - notifyInflow(channel, readBytes); + fireChannelRead(channel, readBytes); // Fire the event. fireMessageReceived(channel, buffer); @@ -398,7 +398,7 @@ class NioWorker implements Runnable { if (bufIdx == buf.writerIndex()) { // Successful write: // Notify the traffic monitors. - notifyOutflow(channel, writtenBytes); + fireChannelWritten(channel, writtenBytes); writtenBytes = 0; // Proceed to the next message. @@ -425,7 +425,7 @@ class NioWorker implements Runnable { } } - notifyOutflow(channel, writtenBytes); + fireChannelWritten(channel, writtenBytes); if (open) { if (addOpWrite) { diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java index 65f243d072..dc54de86ac 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java @@ -97,7 +97,7 @@ class OioWorker implements Runnable { buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes); } - notifyInflow(channel, readBytes); + fireChannelRead(channel, readBytes); fireMessageReceived(channel, buffer); } @@ -115,10 +115,12 @@ class OioWorker implements Runnable { OutputStream out = channel.getOutputStream(); try { ChannelBuffer a = (ChannelBuffer) message; + int bytes = a.readableBytes(); + fireChannelWriteScheduled(channel, bytes); synchronized (out) { - a.getBytes(a.readerIndex(), out, a.readableBytes()); + a.getBytes(a.readerIndex(), out, bytes); } - notifyOutflow(channel, a.readableBytes()); + fireChannelWritten(channel, bytes); future.setSuccess(); } catch (Throwable t) { future.setFailure(t);