* Added TrafficMonitor

* Added AbstractChannelFactory that provides support methods for managing and notifying TrafficMonitors
* Added Channels.notifyInflow/Outflow()
* Modified all transports to notify TrafficMonitors
This commit is contained in:
Trustin Lee 2009-02-05 03:05:20 +00:00
parent 4d84b0e4c6
commit b6ecccb493
10 changed files with 206 additions and 5 deletions

View File

@ -0,0 +1,124 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public abstract class AbstractChannelFactory implements ChannelFactory {
private final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());
private final Object trafficMonitorLock = new Object();
private volatile TrafficMonitor[] trafficMonitors = new TrafficMonitor[0];
protected AbstractChannelFactory() {
super();
}
public void addTrafficMonitor(TrafficMonitor monitor) {
if (monitor == null) {
throw new NullPointerException("monitor");
}
synchronized (trafficMonitorLock) {
final TrafficMonitor[] oldTrafficMonitors = trafficMonitors;
for (TrafficMonitor m: oldTrafficMonitors) {
if (m == monitor) {
return;
}
}
final TrafficMonitor newTrafficMonitors[] =
new TrafficMonitor[oldTrafficMonitors.length + 1];
System.arraycopy(oldTrafficMonitors, 0, newTrafficMonitors, 0, oldTrafficMonitors.length);
newTrafficMonitors[oldTrafficMonitors.length] = monitor;
trafficMonitors = newTrafficMonitors;
}
}
public void removeTrafficMonitor(TrafficMonitor monitor) {
if (monitor == null) {
throw new NullPointerException("monitor");
}
synchronized (trafficMonitorLock) {
final TrafficMonitor[] oldTrafficMonitors = trafficMonitors;
boolean found = false;
for (TrafficMonitor m: oldTrafficMonitors) {
if (m == monitor) {
found = true;
break;
}
}
if (!found) {
return;
}
final TrafficMonitor newTrafficMonitors[] =
new TrafficMonitor[oldTrafficMonitors.length - 1];
int i = 0;
for (TrafficMonitor m: oldTrafficMonitors) {
if (m != monitor) {
newTrafficMonitors[i ++] = m;
}
}
trafficMonitors = newTrafficMonitors;
}
}
void notifyInflow(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.onInflow(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".onInflow().", e);
}
}
}
void notifyOutflow(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.onOutflow(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".onOutflow().", e);
}
}
}
}

View File

@ -74,6 +74,10 @@ public interface ChannelFactory extends ExternalResourceReleasable {
*/ */
Channel newChannel(ChannelPipeline pipeline); Channel newChannel(ChannelPipeline pipeline);
void addTrafficMonitor(TrafficMonitor monitor);
void removeTrafficMonitor(TrafficMonitor monitor);
/** /**
* Releases the external resources that this factory depends on to function. * Releases the external resources that this factory depends on to function.
* An external resource is a resource that this factory didn't create by * An external resource is a resource that this factory didn't create by

View File

@ -1062,6 +1062,27 @@ public class Channels {
close(ctx, future); close(ctx, future);
} }
public static void notifyInflow(Channel channel, int amount) {
if (amount <= 0) {
return;
}
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).notifyInflow(channel, amount);
}
}
public static void notifyOutflow(Channel channel, int amount) {
if (amount <= 0) {
return;
}
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).notifyOutflow(channel, amount);
}
}
private static void validateInterestOps(int interestOps) { private static void validateInterestOps(int interestOps) {
switch (interestOps) { switch (interestOps) {
case Channel.OP_NONE: case Channel.OP_NONE:

View File

@ -0,0 +1,33 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public interface TrafficMonitor {
void onInflow(Channel channel, int amount) throws Exception;
void onOutflow(Channel channel, int amount) throws Exception;
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@ -88,7 +89,7 @@ import org.jboss.netty.util.ExecutorUtil;
* *
* @apiviz.landmark * @apiviz.landmark
*/ */
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { public class NioClientSocketChannelFactory extends AbstractChannelFactory implements ClientSocketChannelFactory {
private final Executor bossExecutor; private final Executor bossExecutor;
private final Executor workerExecutor; private final Executor workerExecutor;

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
@ -92,7 +93,7 @@ import org.jboss.netty.util.ExecutorUtil;
* *
* @apiviz.landmark * @apiviz.landmark
*/ */
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { public class NioServerSocketChannelFactory extends AbstractChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor; final Executor bossExecutor;
private final Executor workerExecutor; private final Executor workerExecutor;

View File

@ -297,6 +297,9 @@ class NioWorker implements Runnable {
// Update the predictor. // Update the predictor.
predictor.previousReceiveBufferSize(readBytes); predictor.previousReceiveBufferSize(readBytes);
// Notify the traffic monitors.
notifyInflow(channel, readBytes);
// Fire the event. // Fire the event.
fireMessageReceived(channel, buffer); fireMessageReceived(channel, buffer);
} }
@ -357,6 +360,7 @@ class NioWorker implements Runnable {
MessageEvent evt; MessageEvent evt;
ChannelBuffer buf; ChannelBuffer buf;
int bufIdx; int bufIdx;
int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = channel.writeBuffer; Queue<MessageEvent> writeBuffer = channel.writeBuffer;
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
@ -386,12 +390,18 @@ class NioWorker implements Runnable {
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
bufIdx += localWrittenBytes; bufIdx += localWrittenBytes;
writtenBytes += localWrittenBytes;
break; break;
} }
} }
if (bufIdx == buf.writerIndex()) { if (bufIdx == buf.writerIndex()) {
// Successful write - proceed to the next message. // Successful write:
// Notify the traffic monitors.
notifyOutflow(channel, writtenBytes);
writtenBytes = 0;
// Proceed to the next message.
evt.getFuture().setSuccess(); evt.getFuture().setSuccess();
evt = null; evt = null;
} else { } else {
@ -415,6 +425,8 @@ class NioWorker implements Runnable {
} }
} }
notifyOutflow(channel, writtenBytes);
if (open) { if (open) {
if (addOpWrite) { if (addOpWrite) {
setOpWrite(channel); setOpWrite(channel);

View File

@ -25,6 +25,7 @@ package org.jboss.netty.channel.socket.oio;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@ -81,7 +82,7 @@ import org.jboss.netty.util.ExecutorUtil;
* *
* @apiviz.landmark * @apiviz.landmark
*/ */
public class OioClientSocketChannelFactory implements ClientSocketChannelFactory { public class OioClientSocketChannelFactory extends AbstractChannelFactory implements ClientSocketChannelFactory {
private final Executor workerExecutor; private final Executor workerExecutor;
final OioClientSocketPipelineSink sink; final OioClientSocketPipelineSink sink;

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
@ -98,7 +99,7 @@ import org.jboss.netty.util.ExecutorUtil;
* *
* @apiviz.landmark * @apiviz.landmark
*/ */
public class OioServerSocketChannelFactory implements ServerSocketChannelFactory { public class OioServerSocketChannelFactory extends AbstractChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor; final Executor bossExecutor;
private final Executor workerExecutor; private final Executor workerExecutor;

View File

@ -96,6 +96,8 @@ class OioWorker implements Runnable {
// A rare case, but it sometimes happen. // A rare case, but it sometimes happen.
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes); buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
} }
notifyInflow(channel, readBytes);
fireMessageReceived(channel, buffer); fireMessageReceived(channel, buffer);
} }
@ -116,6 +118,7 @@ class OioWorker implements Runnable {
synchronized (out) { synchronized (out) {
a.getBytes(a.readerIndex(), out, a.readableBytes()); a.getBytes(a.readerIndex(), out, a.readableBytes());
} }
notifyOutflow(channel, a.readableBytes());
future.setSuccess(); future.setSuccess();
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);