Merge pull request #338 from fredericBregier/master

Improvement: Allow ObjectSizeEstimator in traffic shaping
This commit is contained in:
Norman Maurer 2012-05-18 05:45:20 -07:00
commit ba8cfa5d33
4 changed files with 250 additions and 8 deletions

View File

@ -28,7 +28,9 @@ import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelHandler; import io.netty.channel.SimpleChannelHandler;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultObjectSizeEstimator;
import io.netty.util.ExternalResourceReleasable; import io.netty.util.ExternalResourceReleasable;
import io.netty.util.ObjectSizeEstimator;
import io.netty.util.internal.ExecutorUtil; import io.netty.util.internal.ExecutorUtil;
/** /**
@ -72,6 +74,11 @@ public abstract class AbstractTrafficShapingHandler extends
*/ */
protected TrafficCounter trafficCounter; protected TrafficCounter trafficCounter;
/**
* ObjectSizeEstimator
*/
private ObjectSizeEstimator objectSizeEstimator;
/** /**
* Executor to associated to any TrafficCounter * Executor to associated to any TrafficCounter
*/ */
@ -99,8 +106,9 @@ public abstract class AbstractTrafficShapingHandler extends
*/ */
final AtomicBoolean release = new AtomicBoolean(false); final AtomicBoolean release = new AtomicBoolean(false);
private void init( private void init(ObjectSizeEstimator newObjectSizeEstimator,
Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) { Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) {
objectSizeEstimator = newObjectSizeEstimator;
executor = newExecutor; executor = newExecutor;
writeLimit = newWriteLimit; writeLimit = newWriteLimit;
readLimit = newReadLimit; readLimit = newReadLimit;
@ -117,6 +125,8 @@ public abstract class AbstractTrafficShapingHandler extends
} }
/** /**
* Constructor using default {@link ObjectSizeEstimator}
*
* @param executor * @param executor
* created for instance like Executors.newCachedThreadPool * created for instance like Executors.newCachedThreadPool
* @param writeLimit * @param writeLimit
@ -129,10 +139,36 @@ public abstract class AbstractTrafficShapingHandler extends
*/ */
public AbstractTrafficShapingHandler(Executor executor, long writeLimit, public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) { long readLimit, long checkInterval) {
init(executor, writeLimit, readLimit, checkInterval); init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
checkInterval);
} }
/** /**
* Constructor using the specified ObjectSizeEstimator
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long writeLimit, long readLimit, long checkInterval) {
init(objectSizeEstimator, executor, writeLimit, readLimit,
checkInterval);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
*
* @param executor * @param executor
* created for instance like Executors.newCachedThreadPool * created for instance like Executors.newCachedThreadPool
* @param writeLimit * @param writeLimit
@ -142,7 +178,84 @@ public abstract class AbstractTrafficShapingHandler extends
*/ */
public AbstractTrafficShapingHandler(Executor executor, long writeLimit, public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) { long readLimit) {
init(executor, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL); init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using the specified ObjectSizeEstimator and using default Check Interval
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long writeLimit, long readLimit) {
init(objectSizeEstimator, executor, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
*
* @param executor
* created for instance like Executors.newCachedThreadPool
*/
public AbstractTrafficShapingHandler(Executor executor) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor) {
init(objectSizeEstimator, executor, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(Executor executor, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0, checkInterval);
}
/**
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long checkInterval) {
init(objectSizeEstimator, executor, 0, 0, checkInterval);
} }
/** /**
@ -255,7 +368,7 @@ public abstract class AbstractTrafficShapingHandler extends
throws Exception { throws Exception {
try { try {
long curtime = System.currentTimeMillis(); long curtime = System.currentTimeMillis();
long size = ((ChannelBuffer) arg1.getMessage()).readableBytes(); long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.bytesRecvFlowControl(arg0, size); trafficCounter.bytesRecvFlowControl(arg0, size);
if (readLimit == 0) { if (readLimit == 0) {
@ -315,7 +428,7 @@ public abstract class AbstractTrafficShapingHandler extends
throws Exception { throws Exception {
try { try {
long curtime = System.currentTimeMillis(); long curtime = System.currentTimeMillis();
long size = ((ChannelBuffer) arg1.getMessage()).readableBytes(); long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.bytesWriteFlowControl(size); trafficCounter.bytesWriteFlowControl(size);
if (writeLimit == 0) { if (writeLimit == 0) {

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.handler.execution.ExecutionHandler; import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor; import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.util.ObjectSizeEstimator;
/** /**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel * This implementation of the {@link AbstractTrafficShapingHandler} is for channel
@ -79,6 +80,69 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
long readLimit) { long readLimit) {
super(executor, writeLimit, readLimit); super(executor, writeLimit, readLimit);
} }
/**
* @param executor
* @param checkInterval
*/
public ChannelTrafficShapingHandler(Executor executor, long checkInterval) {
super(executor, checkInterval);
}
/**
* @param executor
*/
public ChannelTrafficShapingHandler(Executor executor) {
super(executor);
}
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long writeLimit, long readLimit, long checkInterval) {
super(objectSizeEstimator, executor, writeLimit, readLimit,
checkInterval);
}
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
*/
public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long writeLimit, long readLimit) {
super(objectSizeEstimator, executor, writeLimit, readLimit);
}
/**
* @param objectSizeEstimator
* @param executor
* @param checkInterval
*/
public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long checkInterval) {
super(objectSizeEstimator, executor, checkInterval);
}
/**
* @param objectSizeEstimator
* @param executor
*/
public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor) {
super(objectSizeEstimator, executor);
}
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.execution.ExecutionHandler; import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor; import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.util.ObjectSizeEstimator;
/** /**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global * This implementation of the {@link AbstractTrafficShapingHandler} is for global
@ -91,4 +92,68 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
super(executor, writeLimit, readLimit); super(executor, writeLimit, readLimit);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/**
* @param executor
* @param checkInterval
*/
public GlobalTrafficShapingHandler(Executor executor, long checkInterval) {
super(executor, checkInterval);
createGlobalTrafficCounter();
}
/**
* @param executor
*/
public GlobalTrafficShapingHandler(Executor executor) {
super(executor);
createGlobalTrafficCounter();
}
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor, long writeLimit, long readLimit,
long checkInterval) {
super(objectSizeEstimator, executor, writeLimit, readLimit,
checkInterval);
createGlobalTrafficCounter();
}
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor, long writeLimit, long readLimit) {
super(objectSizeEstimator, executor, writeLimit, readLimit);
createGlobalTrafficCounter();
}
/**
* @param objectSizeEstimator
* @param executor
* @param checkInterval
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor, long checkInterval) {
super(objectSizeEstimator, executor, checkInterval);
createGlobalTrafficCounter();
}
/**
* @param objectSizeEstimator
* @param executor
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor) {
super(objectSizeEstimator, executor);
createGlobalTrafficCounter();
}
} }

View File

@ -117,7 +117,7 @@ public class TrafficCounter {
/** /**
* Class to implement monitoring at fix delay * Class to implement monitoring at fix delay
*/ */
private class TrafficMonitoring implements Runnable { private static class TrafficMonitoring implements Runnable {
/** /**
* The associated TrafficShapingHandler * The associated TrafficShapingHandler
*/ */
@ -145,8 +145,8 @@ public class TrafficCounter {
@Override @Override
public void run() { public void run() {
try { try {
Thread.currentThread().setName(name); Thread.currentThread().setName(counter.name);
for (; monitorActive.get();) { for (; counter.monitorActive.get();) {
long check = counter.checkInterval.get(); long check = counter.checkInterval.get();
if (check > 0) { if (check > 0) {
Thread.sleep(check); Thread.sleep(check);