From e0e282770f1ccf49367efd9fe003193ea630f062 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 28 Nov 2008 05:28:50 +0000 Subject: [PATCH] Code cleanup --- .../netty/channel/DefaultChannelPipeline.java | 2 +- .../nio/NioClientSocketPipelineSink.java | 2 +- .../socket/nio/NioProviderMetadata.java | 12 ++--- .../nio/NioServerSocketPipelineSink.java | 2 +- .../channel/socket/nio/NioSocketChannel.java | 54 ++++++++++--------- .../netty/channel/socket/nio/NioWorker.java | 10 ++-- .../oio/OioServerSocketPipelineSink.java | 2 +- .../netty/example/discard/DiscardClient.java | 3 ++ .../example/discard/DiscardClientHandler.java | 25 +++++++-- .../codec/http/DefaultHttpMessage.java | 2 +- .../codec/http/QueryStringEncoder.java | 2 +- .../MemoryAwareThreadPoolExecutor.java | 6 +-- .../OrderedMemoryAwareThreadPoolExecutor.java | 8 +-- .../jboss/netty/handler/ssl/SslHandler.java | 2 +- 14 files changed, 80 insertions(+), 52 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/DefaultChannelPipeline.java b/src/main/java/org/jboss/netty/channel/DefaultChannelPipeline.java index 9263660e4a..eaaa4420f1 100644 --- a/src/main/java/org/jboss/netty/channel/DefaultChannelPipeline.java +++ b/src/main/java/org/jboss/netty/channel/DefaultChannelPipeline.java @@ -521,7 +521,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - private class DefaultChannelHandlerContext implements ChannelHandlerContext { + private final class DefaultChannelHandlerContext implements ChannelHandlerContext { volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext prev; private final String name; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index ef1258945e..10951ab8b8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -165,7 +165,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { workerIndex.getAndIncrement() % workers.length)]; } - private class Boss implements Runnable { + private final class Boss implements Runnable { private final AtomicBoolean started = new AtomicBoolean(); private volatile Selector selector; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioProviderMetadata.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioProviderMetadata.java index aec83c9fcb..8834c70cfa 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioProviderMetadata.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioProviderMetadata.java @@ -57,7 +57,7 @@ class NioProviderMetadata { private static final String CONSTRAINT_LEVEL_PROPERTY = "java.nio.channels.spi.constraintLevel"; - + private static final long AUTODETECTION_TIMEOUT = 7000L; /** @@ -93,7 +93,7 @@ class NioProviderMetadata { "Couldn't get the NIO constraint level from the system properties."); ConstraintLevelAutodetector autodetector = new ConstraintLevelAutodetector(); - + try { constraintLevel = autodetector.autodetectWithTimeout(); } catch (Exception e) { @@ -225,9 +225,9 @@ class NioProviderMetadata { // Others (untested) return -1; } - - private static class ConstraintLevelAutodetector { + + private static final class ConstraintLevelAutodetector { ConstraintLevelAutodetector() { super(); @@ -245,7 +245,7 @@ class NioProviderMetadata { } } }, "NIO constraint level detector"); - + Thread detectorThread = new Thread(detector); detectorThread.start(); @@ -419,7 +419,7 @@ class NioProviderMetadata { } } - private static class SelectorLoop implements Runnable { + private static final class SelectorLoop implements Runnable { final Selector selector; volatile boolean done; volatile boolean selecting; // Just an approximation diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 9c8e6c840a..890d62427e 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -191,7 +191,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { workerIndex.getAndIncrement() % workers.length)]; } - private class Boss implements Runnable { + private final class Boss implements Runnable { private final NioServerSocketChannel channel; Boss(NioServerSocketChannel channel) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 988ceac7c6..5f25d944d6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -58,29 +58,7 @@ abstract class NioSocketChannel extends AbstractChannel final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final Runnable writeTask = new WriteTask(); final AtomicInteger writeBufferSize = new AtomicInteger(); - final Queue writeBuffer = new LinkedTransferQueue() { - @Override - public boolean offer(MessageEvent e) { - boolean success = super.offer(e); - assert success; - writeBufferSize.addAndGet( - ((ChannelBuffer) e.getMessage()).readableBytes()); - return true; - } - - @Override - public MessageEvent poll() { - MessageEvent e = super.poll(); - if (e != null) { - int newWriteBufferSize = writeBufferSize.addAndGet( - -((ChannelBuffer) e.getMessage()).readableBytes()); - if (newWriteBufferSize <= getConfig().getWriteBufferLowWaterMark()) { - mightNeedToNotifyUnwritability = true; - } - } - return e; - } - }; + final Queue writeBuffer = new WriteBuffer(); boolean wasWritable; boolean mightNeedToNotifyUnwritability; MessageEvent currentWriteEvent; @@ -161,7 +139,35 @@ abstract class NioSocketChannel extends AbstractChannel } } - private class WriteTask implements Runnable { + private final class WriteBuffer extends LinkedTransferQueue { + WriteBuffer() { + super(); + } + + @Override + public boolean offer(MessageEvent e) { + boolean success = super.offer(e); + assert success; + writeBufferSize.addAndGet( + ((ChannelBuffer) e.getMessage()).readableBytes()); + return true; + } + + @Override + public MessageEvent poll() { + MessageEvent e = super.poll(); + if (e != null) { + int newWriteBufferSize = writeBufferSize.addAndGet( + -((ChannelBuffer) e.getMessage()).readableBytes()); + if (newWriteBufferSize <= getConfig().getWriteBufferLowWaterMark()) { + mightNeedToNotifyUnwritability = true; + } + } + return e; + } + } + + private final class WriteTask implements Runnable { WriteTask() { super(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 66b70a5fc2..7791ed4ac9 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -410,8 +410,8 @@ class NioWorker implements Runnable { ChannelBuffer buf; int bufIdx; + Queue writeBuffer = channel.writeBuffer; synchronized (channel.writeLock) { - Queue writeBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { @@ -473,15 +473,15 @@ class NioWorker implements Runnable { setOpWrite(channel, false, mightNeedWakeup); } - fireChannelInterestChangedIfNecessary(channel); + fireChannelInterestChangedIfNecessary(channel, open); } } private static void fireChannelInterestChangedIfNecessary( - NioSocketChannel channel) { + NioSocketChannel channel, boolean open) { int interestOps = channel.getRawInterestOps(); boolean wasWritable = channel.wasWritable; - boolean writable = channel.wasWritable = channel.isWritable(); + boolean writable = channel.wasWritable = open? channel.isWritable() : false; if (wasWritable) { if (writable) { if (channel.mightNeedToNotifyUnwritability) { @@ -779,7 +779,7 @@ class NioWorker implements Runnable { } } - private class RegisterTask implements Runnable { + private final class RegisterTask implements Runnable { private final NioSocketChannel channel; private final ChannelFuture future; private final boolean server; diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java index a8cc4d446a..b4ba3b71a9 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java @@ -181,7 +181,7 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { } } - private class Boss implements Runnable { + private final class Boss implements Runnable { private final OioServerSocketChannel channel; Boss(OioServerSocketChannel channel) { diff --git a/src/main/java/org/jboss/netty/example/discard/DiscardClient.java b/src/main/java/org/jboss/netty/example/discard/DiscardClient.java index d4defc27c8..61f188aa87 100644 --- a/src/main/java/org/jboss/netty/example/discard/DiscardClient.java +++ b/src/main/java/org/jboss/netty/example/discard/DiscardClient.java @@ -28,6 +28,8 @@ import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.handler.execution.ExecutionHandler; +import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor; /** * Keeps sending random data to the specified address. @@ -68,6 +70,7 @@ public class DiscardClient { ClientBootstrap bootstrap = new ClientBootstrap(factory); DiscardClientHandler handler = new DiscardClientHandler(firstMessageSize); + bootstrap.getPipeline().addLast("executor", new ExecutionHandler(new MemoryAwareThreadPoolExecutor(16, 0, 0))); bootstrap.getPipeline().addLast("handler", handler); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); diff --git a/src/main/java/org/jboss/netty/example/discard/DiscardClientHandler.java b/src/main/java/org/jboss/netty/example/discard/DiscardClientHandler.java index 12876ea6ae..1c4850c52e 100644 --- a/src/main/java/org/jboss/netty/example/discard/DiscardClientHandler.java +++ b/src/main/java/org/jboss/netty/example/discard/DiscardClientHandler.java @@ -55,6 +55,7 @@ public class DiscardClientHandler extends SimpleChannelHandler { private final Random random = new Random(); private final int messageSize; private final AtomicLong transferredBytes = new AtomicLong(); + private final byte[] content; public DiscardClientHandler(int messageSize) { if (messageSize <= 0) { @@ -62,6 +63,7 @@ public class DiscardClientHandler extends SimpleChannelHandler { "messageSize: " + messageSize); } this.messageSize = messageSize; + content = new byte[messageSize]; } public long getTransferredBytes() { @@ -71,7 +73,7 @@ public class DiscardClientHandler extends SimpleChannelHandler { @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { - logger.info(e.toString()); + //logger.info(e.toString()); } // Let SimpleChannelHandler call actual event handler methods below. @@ -112,18 +114,35 @@ public class DiscardClientHandler extends SimpleChannelHandler { // If you keep writing messages ignoring this property, // you will end up with an OutOfMemoryError. Channel channel = e.getChannel(); + int cnt = 0; while (channel.isWritable()) { ChannelBuffer m = nextMessage(); if (m == null) { break; } channel.write(m); + cnt ++; + if (cnt % 100000 == 0) { + System.out.println(cnt); + } } + +// System.out.println("* " + cnt); + +// if (cnt > 0) { +// for (int i = 0; i < 10; i ++) { +// ChannelBuffer m = nextMessage(); +// if (m == null) { +// break; +// } +// channel.write(m); +// } +// } } private ChannelBuffer nextMessage() { - byte[] content = new byte[messageSize]; - random.nextBytes(content); + //byte[] content = new byte[messageSize]; + //random.nextBytes(content); return ChannelBuffers.wrappedBuffer(content); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java b/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java index 31cfee8e86..141f6ada61 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java @@ -112,7 +112,7 @@ public class DefaultHttpMessage implements HttpMessage { return content; } - private static class CaseIgnoringComparator + private static final class CaseIgnoringComparator implements Comparator, Serializable { private static final long serialVersionUID = 4582133183775373862L; diff --git a/src/main/java/org/jboss/netty/handler/codec/http/QueryStringEncoder.java b/src/main/java/org/jboss/netty/handler/codec/http/QueryStringEncoder.java index 7237ef7193..15cf194a26 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/QueryStringEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/QueryStringEncoder.java @@ -71,7 +71,7 @@ public class QueryStringEncoder { return s.replaceAll(" ", "%20"); } - private static class Param { + private static final class Param { final String name; final String value; diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index 9830c6f6a1..e31213eb17 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -425,7 +425,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { return true; } - private static class Settings { + private static final class Settings { final ObjectSizeEstimator objectSizeEstimator; final long maxChannelMemorySize; final long maxTotalMemorySize; @@ -438,7 +438,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { } } - private static class NewThreadRunsPolicy implements RejectedExecutionHandler { + private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { NewThreadRunsPolicy() { super(); } @@ -454,7 +454,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { } } - private static class MemoryAwareRunnable implements Runnable { + private static final class MemoryAwareRunnable implements Runnable { final Runnable task; volatile int estimatedSize; diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index 062bf22c5f..27f812732f 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -177,7 +177,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends return super.shouldCount(task); } - private class ChildExecutor implements Executor, Runnable { + private final class ChildExecutor implements Executor, Runnable { private final LinkedList tasks = new LinkedList(); ChildExecutor() { @@ -205,14 +205,14 @@ public class OrderedMemoryAwareThreadPoolExecutor extends } boolean ran = false; - OrderedMemoryAwareThreadPoolExecutor.this.beforeExecute(thread, task); + beforeExecute(thread, task); try { task.run(); ran = true; - OrderedMemoryAwareThreadPoolExecutor.this.afterExecute(task, null); + afterExecute(task, null); } catch (RuntimeException e) { if (!ran) { - OrderedMemoryAwareThreadPoolExecutor.this.afterExecute(task, e); + afterExecute(task, e); } throw e; } finally { diff --git a/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java b/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java index d2378ee210..0f0ff32638 100644 --- a/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java +++ b/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java @@ -779,7 +779,7 @@ public class SslHandler extends FrameDecoder { return future; } - private static class PendingWrite { + private static final class PendingWrite { final ChannelFuture future; final ByteBuffer outAppBuf;