Reverted back recent changes related with TrafficMonitor

This commit is contained in:
Trustin Lee 2009-02-05 05:03:34 +00:00
parent d93d74677e
commit eb522dcd56
12 changed files with 8 additions and 274 deletions

View File

@ -1,166 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public abstract class AbstractChannelFactory implements ChannelFactory {
private final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());
private final Object trafficMonitorLock = new Object();
private volatile TrafficMonitor[] trafficMonitors = new TrafficMonitor[0];
protected AbstractChannelFactory() {
super();
}
public void addTrafficMonitor(TrafficMonitor monitor) {
if (monitor == null) {
throw new NullPointerException("monitor");
}
synchronized (trafficMonitorLock) {
final TrafficMonitor[] oldTrafficMonitors = trafficMonitors;
for (TrafficMonitor m: oldTrafficMonitors) {
if (m == monitor) {
return;
}
}
final TrafficMonitor newTrafficMonitors[] =
new TrafficMonitor[oldTrafficMonitors.length + 1];
System.arraycopy(oldTrafficMonitors, 0, newTrafficMonitors, 0, oldTrafficMonitors.length);
newTrafficMonitors[oldTrafficMonitors.length] = monitor;
trafficMonitors = newTrafficMonitors;
}
}
public void removeTrafficMonitor(TrafficMonitor monitor) {
if (monitor == null) {
throw new NullPointerException("monitor");
}
synchronized (trafficMonitorLock) {
final TrafficMonitor[] oldTrafficMonitors = trafficMonitors;
boolean found = false;
for (TrafficMonitor m: oldTrafficMonitors) {
if (m == monitor) {
found = true;
break;
}
}
if (!found) {
return;
}
final TrafficMonitor newTrafficMonitors[] =
new TrafficMonitor[oldTrafficMonitors.length - 1];
int i = 0;
for (TrafficMonitor m: oldTrafficMonitors) {
if (m != monitor) {
newTrafficMonitors[i ++] = m;
}
}
trafficMonitors = newTrafficMonitors;
}
}
void fireChannelOpen(Channel channel) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelOpen(channel);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelOpen().", e);
}
}
}
void fireChannelClosed(Channel channel) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelClosed(channel);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelClosed().", e);
}
}
}
void fireChannelRead(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelRead(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelRead().", e);
}
}
}
void fireChannelWriteScheduled(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelWriteScheduled(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelWriteScheduled().", e);
}
}
}
void fireChannelWritten(Channel channel, int amount) {
TrafficMonitor[] trafficMonitors = this.trafficMonitors;
for (TrafficMonitor m: trafficMonitors) {
try {
m.channelWritten(channel, amount);
} catch (Exception e) {
logger.warn(
"An exception was thrown by " +
TrafficMonitor.class.getSimpleName() +
".channelWritten().", e);
}
}
}
}

View File

@ -74,10 +74,6 @@ public interface ChannelFactory extends ExternalResourceReleasable {
*/
Channel newChannel(ChannelPipeline pipeline);
void addTrafficMonitor(TrafficMonitor monitor);
void removeTrafficMonitor(TrafficMonitor monitor);
/**
* Releases the external resources that this factory depends on to function.
* An external resource is a resource that this factory didn't create by

View File

@ -188,12 +188,6 @@ public class Channels {
fireChildChannelStateChanged(channel.getParent(), channel);
}
// Notify traffic monitors
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelOpen(channel);
}
channel.getPipeline().sendUpstream(
new DefaultChannelStateEvent(
channel, succeededFuture(channel),
@ -556,12 +550,6 @@ public class Channels {
channel, succeededFuture(channel),
ChannelState.OPEN, Boolean.FALSE));
// Notify traffic monitors
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelOpen(channel);
}
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
@ -1078,36 +1066,6 @@ public class Channels {
close(ctx, future);
}
public static void fireChannelRead(Channel channel, int amount) {
if (amount <= 0) {
return;
}
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelRead(channel, amount);
}
}
public static void fireChannelWriteScheduled(Channel channel, int amount) {
if (amount <= 0) {
return;
}
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelWriteScheduled(channel, amount);
}
}
public static void fireChannelWritten(Channel channel, int amount) {
if (amount <= 0) {
return;
}
ChannelFactory factory = channel.getFactory();
if (factory instanceof AbstractChannelFactory) {
((AbstractChannelFactory) factory).fireChannelWritten(channel, amount);
}
}
private static void validateInterestOps(int interestOps) {
switch (interestOps) {
case Channel.OP_NONE:

View File

@ -1,37 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public interface TrafficMonitor {
void channelOpen(Channel channel) throws Exception;
void channelClosed(Channel channel) throws Exception;
void channelRead(Channel channel, int amount) throws Exception;
void channelWriteScheduled(Channel channel, int amount) throws Exception;
void channelWritten(Channel channel, int amount) throws Exception;
}

View File

@ -27,7 +27,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@ -89,7 +88,7 @@ import org.jboss.netty.util.ExecutorUtil;
*
* @apiviz.landmark
*/
public class NioClientSocketChannelFactory extends AbstractChannelFactory implements ClientSocketChannelFactory {
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
private final Executor bossExecutor;
private final Executor workerExecutor;

View File

@ -27,7 +27,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
@ -93,7 +92,7 @@ import org.jboss.netty.util.ExecutorUtil;
*
* @apiviz.landmark
*/
public class NioServerSocketChannelFactory extends AbstractChannelFactory implements ServerSocketChannelFactory {
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final Executor workerExecutor;

View File

@ -186,8 +186,6 @@ class NioSocketChannel extends AbstractChannel
assert success;
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
fireChannelWriteScheduled(NioSocketChannel.this, messageSize);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();

View File

@ -297,9 +297,6 @@ class NioWorker implements Runnable {
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Notify the traffic monitors.
fireChannelRead(channel, readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
}
@ -396,12 +393,7 @@ class NioWorker implements Runnable {
}
if (bufIdx == buf.writerIndex()) {
// Successful write:
// Notify the traffic monitors.
fireChannelWritten(channel, writtenBytes);
writtenBytes = 0;
// Proceed to the next message.
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
@ -425,7 +417,7 @@ class NioWorker implements Runnable {
}
}
fireChannelWritten(channel, writtenBytes);
//fireChannelWritten(channel, writtenBytes);
if (open) {
if (addOpWrite) {

View File

@ -25,7 +25,6 @@ package org.jboss.netty.channel.socket.oio;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@ -82,7 +81,7 @@ import org.jboss.netty.util.ExecutorUtil;
*
* @apiviz.landmark
*/
public class OioClientSocketChannelFactory extends AbstractChannelFactory implements ClientSocketChannelFactory {
public class OioClientSocketChannelFactory implements ClientSocketChannelFactory {
private final Executor workerExecutor;
final OioClientSocketPipelineSink sink;

View File

@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
@ -99,7 +98,7 @@ import org.jboss.netty.util.ExecutorUtil;
*
* @apiviz.landmark
*/
public class OioServerSocketChannelFactory extends AbstractChannelFactory implements ServerSocketChannelFactory {
public class OioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final Executor workerExecutor;

View File

@ -97,7 +97,6 @@ class OioWorker implements Runnable {
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
}
fireChannelRead(channel, readBytes);
fireMessageReceived(channel, buffer);
}
@ -116,11 +115,10 @@ class OioWorker implements Runnable {
try {
ChannelBuffer a = (ChannelBuffer) message;
int bytes = a.readableBytes();
fireChannelWriteScheduled(channel, bytes);
synchronized (out) {
a.getBytes(a.readerIndex(), out, bytes);
}
fireChannelWritten(channel, bytes);
//fireChannelWritten(channel, bytes);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);

View File

@ -22,7 +22,6 @@
*/
package org.jboss.netty.handler.codec.embedder;
import org.jboss.netty.channel.AbstractChannelFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
@ -32,7 +31,7 @@ import org.jboss.netty.channel.ChannelPipeline;
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
class EmbeddedChannelFactory extends AbstractChannelFactory {
class EmbeddedChannelFactory implements ChannelFactory {
static final ChannelFactory INSTANCE = new EmbeddedChannelFactory();