* Added more handler methods to TrafficMonitor for more precise monitoring

This commit is contained in:
Trustin Lee 2009-02-05 04:47:25 +00:00
parent acfa2b1ce0
commit 44ccfa44b0
6 changed files with 82 additions and 31 deletions

View File

@ -94,44 +94,72 @@ public abstract class AbstractChannelFactory implements ChannelFactory {
} }
} }
void notifyState(Channel channel) { void fireChannelOpen(Channel channel) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors; TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) { for (TrafficMonitor m: trafficMonitors) {
try { try {
m.onState(channel); m.channelOpen(channel);
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"An exception was thrown by " + "An exception was thrown by " +
TrafficMonitor.class.getSimpleName() + TrafficMonitor.class.getSimpleName() +
".onState().", e); ".channelOpen().", e);
} }
} }
} }
void notifyInflow(Channel channel, int amount) { void fireChannelClosed(Channel channel) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors; TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) { for (TrafficMonitor m: trafficMonitors) {
try { try {
m.onInflow(channel, amount); m.channelClosed(channel);
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"An exception was thrown by " + "An exception was thrown by " +
TrafficMonitor.class.getSimpleName() + TrafficMonitor.class.getSimpleName() +
".onInflow().", e); ".channelClosed().", e);
} }
} }
} }
void notifyOutflow(Channel channel, int amount) { void fireChannelRead(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors; TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) { for (TrafficMonitor m: trafficMonitors) {
try { try {
m.onOutflow(channel, amount); m.channelRead(channel, amount);
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"An exception was thrown by " + "An exception was thrown by " +
TrafficMonitor.class.getSimpleName() + TrafficMonitor.class.getSimpleName() +
".onOutflow().", e); ".channelRead().", e);
}
}
}
void fireChannelWriteScheduled(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelWriteScheduled(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelWriteScheduled().", e);
}
}
}
void fireChannelWritten(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelWritten(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelWritten().", e);
} }
} }
} }

View File

@ -183,10 +183,17 @@ public class Channels {
* a {@code "childChannelOpen"} event will be sent, too. * a {@code "childChannelOpen"} event will be sent, too.
*/ */
public static void fireChannelOpen(Channel channel) { public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) { if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel); fireChildChannelStateChanged(channel.getParent(), channel);
} }
notifyState(channel);
// Notify traffic monitors
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelOpen(channel);
}
channel.getPipeline().sendUpstream( channel.getPipeline().sendUpstream(
new DefaultChannelStateEvent( new DefaultChannelStateEvent(
channel, succeededFuture(channel), channel, succeededFuture(channel),
@ -548,7 +555,14 @@ public class Channels {
new DefaultChannelStateEvent( new DefaultChannelStateEvent(
channel, succeededFuture(channel), channel, succeededFuture(channel),
ChannelState.OPEN, Boolean.FALSE)); ChannelState.OPEN, Boolean.FALSE));
notifyState(channel);
// Notify traffic monitors
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelOpen(channel);
}
// Notify the parent handler.
if (channel.getParent() != null) { if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel); fireChildChannelStateChanged(channel.getParent(), channel);
} }
@ -1064,33 +1078,35 @@ public class Channels {
close(ctx, future); close(ctx, future);
} }
private static void notifyState(Channel channel) { public static void fireChannelRead(Channel channel, int amount) {
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).notifyState(channel);
}
}
public static void notifyInflow(Channel channel, int amount) {
if (amount <= 0) { if (amount <= 0) {
return; return;
} }
ChannelFactory factory = channel.getFactory(); ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) { if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).notifyInflow(channel, amount); ((AbstractChannelFactory) factory).fireChannelRead(channel, amount);
} }
} }
public static void notifyOutflow(Channel channel, int amount) { public static void fireChannelWriteScheduled(Channel channel, int amount) {
if (amount <= 0) { if (amount <= 0) {
return; return;
} }
ChannelFactory factory = channel.getFactory(); ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) { if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).notifyOutflow(channel, amount); ((AbstractChannelFactory) factory).fireChannelWriteScheduled(channel, amount);
} }
} }
public static void fireChannelWritten(Channel channel, int amount) {
if (amount <= 0) {
return;
}
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelWritten(channel, amount);
}
}
private static void validateInterestOps(int interestOps) { private static void validateInterestOps(int interestOps) {
switch (interestOps) { switch (interestOps) {

View File

@ -28,7 +28,10 @@ package org.jboss.netty.channel;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
public interface TrafficMonitor { public interface TrafficMonitor {
void onState(Channel channel) throws Exception; void channelOpen(Channel channel) throws Exception;
void onInflow(Channel channel, int amount) throws Exception; void channelClosed(Channel channel) throws Exception;
void onOutflow(Channel channel, int amount) throws Exception;
void channelRead(Channel channel, int amount) throws Exception;
void channelWriteScheduled(Channel channel, int amount) throws Exception;
void channelWritten(Channel channel, int amount) throws Exception;
} }

View File

@ -186,6 +186,8 @@ class NioSocketChannel extends AbstractChannel
assert success; assert success;
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes(); int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
fireChannelWriteScheduled(NioSocketChannel.this, messageSize);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark(); int highWaterMark = getConfig().getWriteBufferHighWaterMark();

View File

@ -298,7 +298,7 @@ class NioWorker implements Runnable {
predictor.previousReceiveBufferSize(readBytes); predictor.previousReceiveBufferSize(readBytes);
// Notify the traffic monitors. // Notify the traffic monitors.
notifyInflow(channel, readBytes); fireChannelRead(channel, readBytes);
// Fire the event. // Fire the event.
fireMessageReceived(channel, buffer); fireMessageReceived(channel, buffer);
@ -398,7 +398,7 @@ class NioWorker implements Runnable {
if (bufIdx == buf.writerIndex()) { if (bufIdx == buf.writerIndex()) {
// Successful write: // Successful write:
// Notify the traffic monitors. // Notify the traffic monitors.
notifyOutflow(channel, writtenBytes); fireChannelWritten(channel, writtenBytes);
writtenBytes = 0; writtenBytes = 0;
// Proceed to the next message. // Proceed to the next message.
@ -425,7 +425,7 @@ class NioWorker implements Runnable {
} }
} }
notifyOutflow(channel, writtenBytes); fireChannelWritten(channel, writtenBytes);
if (open) { if (open) {
if (addOpWrite) { if (addOpWrite) {

View File

@ -97,7 +97,7 @@ class OioWorker implements Runnable {
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes); buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
} }
notifyInflow(channel, readBytes); fireChannelRead(channel, readBytes);
fireMessageReceived(channel, buffer); fireMessageReceived(channel, buffer);
} }
@ -115,10 +115,12 @@ class OioWorker implements Runnable {
OutputStream out = channel.getOutputStream(); OutputStream out = channel.getOutputStream();
try { try {
ChannelBuffer a = (ChannelBuffer) message; ChannelBuffer a = (ChannelBuffer) message;
int bytes = a.readableBytes();
fireChannelWriteScheduled(channel, bytes);
synchronized (out) { synchronized (out) {
a.getBytes(a.readerIndex(), out, a.readableBytes()); a.getBytes(a.readerIndex(), out, bytes);
} }
notifyOutflow(channel, a.readableBytes()); fireChannelWritten(channel, bytes);
future.setSuccess(); future.setSuccess();
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);