Code cleanup

This commit is contained in:
Trustin Lee 2008-11-28 05:28:50 +00:00
parent 893cab5ce8
commit e0e282770f
14 changed files with 80 additions and 52 deletions

View File

@ -521,7 +521,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private class DefaultChannelHandlerContext implements ChannelHandlerContext { private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev; volatile DefaultChannelHandlerContext prev;
private final String name; private final String name;

View File

@ -165,7 +165,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
workerIndex.getAndIncrement() % workers.length)]; workerIndex.getAndIncrement() % workers.length)];
} }
private class Boss implements Runnable { private final class Boss implements Runnable {
private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
private volatile Selector selector; private volatile Selector selector;

View File

@ -57,7 +57,7 @@ class NioProviderMetadata {
private static final String CONSTRAINT_LEVEL_PROPERTY = private static final String CONSTRAINT_LEVEL_PROPERTY =
"java.nio.channels.spi.constraintLevel"; "java.nio.channels.spi.constraintLevel";
private static final long AUTODETECTION_TIMEOUT = 7000L; private static final long AUTODETECTION_TIMEOUT = 7000L;
/** /**
@ -93,7 +93,7 @@ class NioProviderMetadata {
"Couldn't get the NIO constraint level from the system properties."); "Couldn't get the NIO constraint level from the system properties.");
ConstraintLevelAutodetector autodetector = ConstraintLevelAutodetector autodetector =
new ConstraintLevelAutodetector(); new ConstraintLevelAutodetector();
try { try {
constraintLevel = autodetector.autodetectWithTimeout(); constraintLevel = autodetector.autodetectWithTimeout();
} catch (Exception e) { } catch (Exception e) {
@ -225,9 +225,9 @@ class NioProviderMetadata {
// Others (untested) // Others (untested)
return -1; return -1;
} }
private static class ConstraintLevelAutodetector {
private static final class ConstraintLevelAutodetector {
ConstraintLevelAutodetector() { ConstraintLevelAutodetector() {
super(); super();
@ -245,7 +245,7 @@ class NioProviderMetadata {
} }
} }
}, "NIO constraint level detector"); }, "NIO constraint level detector");
Thread detectorThread = new Thread(detector); Thread detectorThread = new Thread(detector);
detectorThread.start(); detectorThread.start();
@ -419,7 +419,7 @@ class NioProviderMetadata {
} }
} }
private static class SelectorLoop implements Runnable { private static final class SelectorLoop implements Runnable {
final Selector selector; final Selector selector;
volatile boolean done; volatile boolean done;
volatile boolean selecting; // Just an approximation volatile boolean selecting; // Just an approximation

View File

@ -191,7 +191,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
workerIndex.getAndIncrement() % workers.length)]; workerIndex.getAndIncrement() % workers.length)];
} }
private class Boss implements Runnable { private final class Boss implements Runnable {
private final NioServerSocketChannel channel; private final NioServerSocketChannel channel;
Boss(NioServerSocketChannel channel) { Boss(NioServerSocketChannel channel) {

View File

@ -58,29 +58,7 @@ abstract class NioSocketChannel extends AbstractChannel
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
final Runnable writeTask = new WriteTask(); final Runnable writeTask = new WriteTask();
final AtomicInteger writeBufferSize = new AtomicInteger(); final AtomicInteger writeBufferSize = new AtomicInteger();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>() { final Queue<MessageEvent> writeBuffer = new WriteBuffer();
@Override
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
assert success;
writeBufferSize.addAndGet(
((ChannelBuffer) e.getMessage()).readableBytes());
return true;
}
@Override
public MessageEvent poll() {
MessageEvent e = super.poll();
if (e != null) {
int newWriteBufferSize = writeBufferSize.addAndGet(
-((ChannelBuffer) e.getMessage()).readableBytes());
if (newWriteBufferSize <= getConfig().getWriteBufferLowWaterMark()) {
mightNeedToNotifyUnwritability = true;
}
}
return e;
}
};
boolean wasWritable; boolean wasWritable;
boolean mightNeedToNotifyUnwritability; boolean mightNeedToNotifyUnwritability;
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
@ -161,7 +139,35 @@ abstract class NioSocketChannel extends AbstractChannel
} }
} }
private class WriteTask implements Runnable { private final class WriteBuffer extends LinkedTransferQueue<MessageEvent> {
WriteBuffer() {
super();
}
@Override
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
assert success;
writeBufferSize.addAndGet(
((ChannelBuffer) e.getMessage()).readableBytes());
return true;
}
@Override
public MessageEvent poll() {
MessageEvent e = super.poll();
if (e != null) {
int newWriteBufferSize = writeBufferSize.addAndGet(
-((ChannelBuffer) e.getMessage()).readableBytes());
if (newWriteBufferSize <= getConfig().getWriteBufferLowWaterMark()) {
mightNeedToNotifyUnwritability = true;
}
}
return e;
}
}
private final class WriteTask implements Runnable {
WriteTask() { WriteTask() {
super(); super();

View File

@ -410,8 +410,8 @@ class NioWorker implements Runnable {
ChannelBuffer buf; ChannelBuffer buf;
int bufIdx; int bufIdx;
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
evt = channel.currentWriteEvent; evt = channel.currentWriteEvent;
for (;;) { for (;;) {
if (evt == null) { if (evt == null) {
@ -473,15 +473,15 @@ class NioWorker implements Runnable {
setOpWrite(channel, false, mightNeedWakeup); setOpWrite(channel, false, mightNeedWakeup);
} }
fireChannelInterestChangedIfNecessary(channel); fireChannelInterestChangedIfNecessary(channel, open);
} }
} }
private static void fireChannelInterestChangedIfNecessary( private static void fireChannelInterestChangedIfNecessary(
NioSocketChannel channel) { NioSocketChannel channel, boolean open) {
int interestOps = channel.getRawInterestOps(); int interestOps = channel.getRawInterestOps();
boolean wasWritable = channel.wasWritable; boolean wasWritable = channel.wasWritable;
boolean writable = channel.wasWritable = channel.isWritable(); boolean writable = channel.wasWritable = open? channel.isWritable() : false;
if (wasWritable) { if (wasWritable) {
if (writable) { if (writable) {
if (channel.mightNeedToNotifyUnwritability) { if (channel.mightNeedToNotifyUnwritability) {
@ -779,7 +779,7 @@ class NioWorker implements Runnable {
} }
} }
private class RegisterTask implements Runnable { private final class RegisterTask implements Runnable {
private final NioSocketChannel channel; private final NioSocketChannel channel;
private final ChannelFuture future; private final ChannelFuture future;
private final boolean server; private final boolean server;

View File

@ -181,7 +181,7 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
} }
} }
private class Boss implements Runnable { private final class Boss implements Runnable {
private final OioServerSocketChannel channel; private final OioServerSocketChannel channel;
Boss(OioServerSocketChannel channel) { Boss(OioServerSocketChannel channel) {

View File

@ -28,6 +28,8 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
/** /**
* Keeps sending random data to the specified address. * Keeps sending random data to the specified address.
@ -68,6 +70,7 @@ public class DiscardClient {
ClientBootstrap bootstrap = new ClientBootstrap(factory); ClientBootstrap bootstrap = new ClientBootstrap(factory);
DiscardClientHandler handler = new DiscardClientHandler(firstMessageSize); DiscardClientHandler handler = new DiscardClientHandler(firstMessageSize);
bootstrap.getPipeline().addLast("executor", new ExecutionHandler(new MemoryAwareThreadPoolExecutor(16, 0, 0)));
bootstrap.getPipeline().addLast("handler", handler); bootstrap.getPipeline().addLast("handler", handler);
bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true); bootstrap.setOption("keepAlive", true);

View File

@ -55,6 +55,7 @@ public class DiscardClientHandler extends SimpleChannelHandler {
private final Random random = new Random(); private final Random random = new Random();
private final int messageSize; private final int messageSize;
private final AtomicLong transferredBytes = new AtomicLong(); private final AtomicLong transferredBytes = new AtomicLong();
private final byte[] content;
public DiscardClientHandler(int messageSize) { public DiscardClientHandler(int messageSize) {
if (messageSize <= 0) { if (messageSize <= 0) {
@ -62,6 +63,7 @@ public class DiscardClientHandler extends SimpleChannelHandler {
"messageSize: " + messageSize); "messageSize: " + messageSize);
} }
this.messageSize = messageSize; this.messageSize = messageSize;
content = new byte[messageSize];
} }
public long getTransferredBytes() { public long getTransferredBytes() {
@ -71,7 +73,7 @@ public class DiscardClientHandler extends SimpleChannelHandler {
@Override @Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
logger.info(e.toString()); //logger.info(e.toString());
} }
// Let SimpleChannelHandler call actual event handler methods below. // Let SimpleChannelHandler call actual event handler methods below.
@ -112,18 +114,35 @@ public class DiscardClientHandler extends SimpleChannelHandler {
// If you keep writing messages ignoring this property, // If you keep writing messages ignoring this property,
// you will end up with an OutOfMemoryError. // you will end up with an OutOfMemoryError.
Channel channel = e.getChannel(); Channel channel = e.getChannel();
int cnt = 0;
while (channel.isWritable()) { while (channel.isWritable()) {
ChannelBuffer m = nextMessage(); ChannelBuffer m = nextMessage();
if (m == null) { if (m == null) {
break; break;
} }
channel.write(m); channel.write(m);
cnt ++;
if (cnt % 100000 == 0) {
System.out.println(cnt);
}
} }
// System.out.println("* " + cnt);
// if (cnt > 0) {
// for (int i = 0; i < 10; i ++) {
// ChannelBuffer m = nextMessage();
// if (m == null) {
// break;
// }
// channel.write(m);
// }
// }
} }
private ChannelBuffer nextMessage() { private ChannelBuffer nextMessage() {
byte[] content = new byte[messageSize]; //byte[] content = new byte[messageSize];
random.nextBytes(content); //random.nextBytes(content);
return ChannelBuffers.wrappedBuffer(content); return ChannelBuffers.wrappedBuffer(content);
} }
} }

View File

@ -112,7 +112,7 @@ public class DefaultHttpMessage implements HttpMessage {
return content; return content;
} }
private static class CaseIgnoringComparator private static final class CaseIgnoringComparator
implements Comparator<String>, Serializable { implements Comparator<String>, Serializable {
private static final long serialVersionUID = 4582133183775373862L; private static final long serialVersionUID = 4582133183775373862L;

View File

@ -71,7 +71,7 @@ public class QueryStringEncoder {
return s.replaceAll(" ", "%20"); return s.replaceAll(" ", "%20");
} }
private static class Param { private static final class Param {
final String name; final String name;
final String value; final String value;

View File

@ -425,7 +425,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
return true; return true;
} }
private static class Settings { private static final class Settings {
final ObjectSizeEstimator objectSizeEstimator; final ObjectSizeEstimator objectSizeEstimator;
final long maxChannelMemorySize; final long maxChannelMemorySize;
final long maxTotalMemorySize; final long maxTotalMemorySize;
@ -438,7 +438,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
} }
} }
private static class NewThreadRunsPolicy implements RejectedExecutionHandler { private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() { NewThreadRunsPolicy() {
super(); super();
} }
@ -454,7 +454,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
} }
} }
private static class MemoryAwareRunnable implements Runnable { private static final class MemoryAwareRunnable implements Runnable {
final Runnable task; final Runnable task;
volatile int estimatedSize; volatile int estimatedSize;

View File

@ -177,7 +177,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
return super.shouldCount(task); return super.shouldCount(task);
} }
private class ChildExecutor implements Executor, Runnable { private final class ChildExecutor implements Executor, Runnable {
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>(); private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
ChildExecutor() { ChildExecutor() {
@ -205,14 +205,14 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
} }
boolean ran = false; boolean ran = false;
OrderedMemoryAwareThreadPoolExecutor.this.beforeExecute(thread, task); beforeExecute(thread, task);
try { try {
task.run(); task.run();
ran = true; ran = true;
OrderedMemoryAwareThreadPoolExecutor.this.afterExecute(task, null); afterExecute(task, null);
} catch (RuntimeException e) { } catch (RuntimeException e) {
if (!ran) { if (!ran) {
OrderedMemoryAwareThreadPoolExecutor.this.afterExecute(task, e); afterExecute(task, e);
} }
throw e; throw e;
} finally { } finally {

View File

@ -779,7 +779,7 @@ public class SslHandler extends FrameDecoder {
return future; return future;
} }
private static class PendingWrite { private static final class PendingWrite {
final ChannelFuture future; final ChannelFuture future;
final ByteBuffer outAppBuf; final ByteBuffer outAppBuf;