From 36e804bbec47711aaa7e7387e207b3fce2641ea4 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 27 Jan 2010 05:07:32 +0000 Subject: [PATCH] Resolved issue: NETTY-282 Reduce memory copy between heap buffers and direct buffers in NIO transport * Replaced JDK's internal direct buffer pool with a custom pool optimized for Netty * Added ChannelBuffer.isDirect() * Cleaned up NioWorker.writeNow() and NioDatagramWorker.writeNow() --- .../buffer/ByteBufferBackedChannelBuffer.java | 4 + .../org/jboss/netty/buffer/ChannelBuffer.java | 6 + .../netty/buffer/CompositeChannelBuffer.java | 4 + .../netty/buffer/DuplicatedChannelBuffer.java | 4 + .../netty/buffer/DynamicChannelBuffer.java | 4 + .../jboss/netty/buffer/HeapChannelBuffer.java | 4 + .../netty/buffer/ReadOnlyChannelBuffer.java | 4 + .../netty/buffer/SlicedChannelBuffer.java | 4 + .../netty/buffer/TruncatedChannelBuffer.java | 4 + .../channel/socket/nio/DirectBufferPool.java | 112 +++++++++++++ .../nio/NioClientSocketPipelineSink.java | 20 +-- .../socket/nio/NioDatagramChannel.java | 5 +- .../socket/nio/NioDatagramPipelineSink.java | 11 +- .../channel/socket/nio/NioDatagramWorker.java | 153 ++++++++++-------- .../nio/NioServerSocketPipelineSink.java | 8 +- .../channel/socket/nio/NioSocketChannel.java | 6 +- .../netty/channel/socket/nio/NioWorker.java | 128 ++++++++------- .../codec/replay/ReplayingDecoderBuffer.java | 4 + 18 files changed, 333 insertions(+), 152 deletions(-) create mode 100644 src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java diff --git a/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java index 1a4bc5678f..0e59a5794a 100644 --- a/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java @@ -70,6 +70,10 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer { } } + public boolean isDirect() { + return buffer.isDirect(); + } + public ByteOrder order() { return order; } diff --git a/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java index 5e43f07ab0..f9b50b1ac8 100644 --- a/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java @@ -251,6 +251,12 @@ public interface ChannelBuffer extends Comparable { */ ByteOrder order(); + /** + * Returns {@code true} if and only if this buffer is backed by an + * NIO direct buffer. + */ + boolean isDirect(); + /** * Returns the {@code readerIndex} of this buffer. */ diff --git a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java index 3bdc9752f9..6b2acfa499 100644 --- a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java @@ -148,6 +148,10 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { return order; } + public boolean isDirect() { + return false; + } + public boolean hasArray() { return false; } diff --git a/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java index 7d1fa4da93..7b4f80e199 100644 --- a/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java @@ -64,6 +64,10 @@ public class DuplicatedChannelBuffer extends AbstractChannelBuffer implements Wr return buffer.order(); } + public boolean isDirect() { + return buffer.isDirect(); + } + public int capacity() { return buffer.capacity(); } diff --git a/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java index d851e30506..460bd631fe 100644 --- a/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java @@ -94,6 +94,10 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer { return endianness; } + public boolean isDirect() { + return buffer.isDirect(); + } + public int capacity() { return buffer.capacity(); } diff --git a/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java index d269b63091..7d88d62bbd 100644 --- a/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java @@ -71,6 +71,10 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer { setIndex(readerIndex, writerIndex); } + public boolean isDirect() { + return false; + } + public int capacity() { return array.length; } diff --git a/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java index 5adc8297b9..21664e48f1 100644 --- a/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java @@ -64,6 +64,10 @@ public class ReadOnlyChannelBuffer extends AbstractChannelBuffer implements Wrap return buffer.order(); } + public boolean isDirect() { + return buffer.isDirect(); + } + public boolean hasArray() { return false; } diff --git a/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java index bd9dee9450..d3039a9b77 100644 --- a/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java @@ -69,6 +69,10 @@ public class SlicedChannelBuffer extends AbstractChannelBuffer implements Wrappe return buffer.order(); } + public boolean isDirect() { + return buffer.isDirect(); + } + public int capacity() { return length; } diff --git a/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java index 1e24bd7d55..d5d140a311 100644 --- a/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java @@ -63,6 +63,10 @@ public class TruncatedChannelBuffer extends AbstractChannelBuffer implements Wra return buffer.order(); } + public boolean isDirect() { + return buffer.isDirect(); + } + public int capacity() { return length; } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java new file mode 100644 index 0000000000..9a92911140 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java @@ -0,0 +1,112 @@ +/* + * Copyright 2010 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; + +import org.jboss.netty.buffer.ChannelBuffer; + +/** + * @author The Netty Project + * @author Trustin Lee + * @version $Rev$, $Date$ + */ +final class DirectBufferPool { + + private final SoftReference[] pool; + + DirectBufferPool() { + this(4); + } + + @SuppressWarnings("unchecked") + DirectBufferPool(int poolSize) { + pool = new SoftReference[poolSize]; + } + + final ByteBuffer acquire(ChannelBuffer src) { + ByteBuffer dst = acquire(src.readableBytes()); + src.getBytes(src.readerIndex(), dst); + dst.rewind(); + return dst; + } + + private final ByteBuffer acquire(int size) { + for (int i = 0; i < pool.length; i ++) { + SoftReference ref = pool[i]; + if (ref == null) { + continue; + } + + ByteBuffer buf = ref.get(); + if (buf == null) { + pool[i] = null; + continue; + } + + if (buf.capacity() < size) { + continue; + } + + pool[i] = null; + + buf.rewind(); + buf.limit(size); + return buf; + } + + ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size)); + buf.limit(size); + return buf; + } + + final void release(ByteBuffer buffer) { + if (buffer == null) { + return; + } + + for (int i = 0; i < pool.length; i ++) { + SoftReference ref = pool[i]; + if (ref == null || ref.get() == null) { + pool[i] = new SoftReference(buffer); + return; + } + } + + // pool is full - replace one + for (int i = 0; i< pool.length; i ++) { + SoftReference ref = pool[i]; + ByteBuffer pooled = ref.get(); + if (pooled == null) { + pool[i] = null; + continue; + } + + if (pooled.capacity() < buffer.capacity()) { + pool[i] = new SoftReference(buffer); + return; + } + } + } + + static final int normalizeCapacity(int capacity) { + // Normalize to multiple of 4096. + // Strictly speaking, 4096 should be normalized to 4096, + // but it becomes 8192 to keep the calculation simplistic. + return (capacity & 0xfffff000) + 0x1000; + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 11348f044a..40866a70f8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -88,25 +88,25 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - NioWorker.close(channel, future); + channel.worker.close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { - NioWorker.close(channel, future); + channel.worker.close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (SocketAddress) value); } else { - NioWorker.close(channel, future); + channel.worker.close(channel, future); } break; case INTEREST_OPS: - NioWorker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { @@ -114,7 +114,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; - NioWorker.write(channel, true); + channel.worker.write(channel, true); } } @@ -156,7 +156,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } catch (Throwable t) { cf.setFailure(t); fireExceptionCaught(channel, t); - NioWorker.close(channel, succeededFuture(channel)); + channel.worker.close(channel, succeededFuture(channel)); } } @@ -373,7 +373,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { ch.connectFuture.setFailure(cause); fireExceptionCaught(ch, cause); - NioWorker.close(ch, succeededFuture(ch)); + ch.worker.close(ch, succeededFuture(ch)); } } } @@ -388,13 +388,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } catch (Throwable t) { ch.connectFuture.setFailure(t); fireExceptionCaught(ch, t); - NioWorker.close(ch, succeededFuture(ch)); + ch.worker.close(ch, succeededFuture(ch)); } } private void close(SelectionKey k) { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - NioWorker.close(ch, succeededFuture(ch)); + ch.worker.close(ch, succeededFuture(ch)); } } @@ -412,7 +412,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { channel.socket.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { - NioWorker.close(channel, succeededFuture(channel)); + channel.worker.close(channel, succeededFuture(channel)); } int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index 9257b7eef4..933c137155 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; @@ -107,6 +108,8 @@ class NioDatagramChannel extends AbstractChannel * The current write {@link MessageEvent} */ MessageEvent currentWriteEvent; + ByteBuffer currentWriteBuffer; + boolean currentWriteBufferIsPooled; /** * Boolean that indicates that write operation is in progress. @@ -313,7 +316,7 @@ class NioDatagramChannel extends AbstractChannel public void run() { writeTaskInTaskQueue.set(false); - NioDatagramWorker.write(NioDatagramChannel.this, false); + worker.write(NioDatagramChannel.this, false); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index 26ec84bd58..55fa3bfbe5 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -83,14 +83,14 @@ class NioDatagramPipelineSink extends AbstractChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - NioDatagramWorker.close(channel, future); + channel.worker.close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (InetSocketAddress) value); } else { - NioDatagramWorker.close(channel, future); + channel.worker.close(channel, future); } break; case CONNECTED: @@ -101,15 +101,14 @@ class NioDatagramPipelineSink extends AbstractChannelSink { } break; case INTEREST_OPS: - NioDatagramWorker.setInterestOps(channel, future, ((Integer) value) - .intValue()); + channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { final MessageEvent event = (MessageEvent) e; final boolean offered = channel.writeBufferQueue.offer(event); assert offered; - NioDatagramWorker.write(channel, true); + channel.worker.write(channel, true); } } @@ -194,7 +193,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink { fireExceptionCaught(channel, t); } finally { if (connected && !workerStarted) { - NioDatagramWorker.close(channel, future); + channel.worker.close(channel, future); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 50d80e5669..decc051f59 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -127,6 +127,8 @@ class NioDatagramWorker implements Runnable { private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + private final DirectBufferPool directBufferPool = new DirectBufferPool(); + /** * Sole constructor. * @@ -369,7 +371,7 @@ class NioDatagramWorker implements Runnable { return false; } - private static void write(SelectionKey k) { + private void write(SelectionKey k) { write((NioDatagramChannel) k.attachment(), false); } @@ -380,7 +382,7 @@ class NioDatagramWorker implements Runnable { * * @param key The selection key which contains the Selector registration information. */ - private static boolean read(final SelectionKey key) { + private boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); @@ -430,12 +432,12 @@ class NioDatagramWorker implements Runnable { return true; } - private static void close(SelectionKey k) { + private void close(SelectionKey k) { final NioDatagramChannel ch = (NioDatagramChannel) k.attachment(); close(ch, succeededFuture(ch)); } - static void write(final NioDatagramChannel channel, + void write(final NioDatagramChannel channel, final boolean mightNeedWakeup) { /* * Note that we are not checking if the channel is connected. Connected @@ -458,23 +460,19 @@ class NioDatagramWorker implements Runnable { } } - private static boolean scheduleWriteIfNecessary( - final NioDatagramChannel channel) { - final NioDatagramWorker worker = channel.worker; - final Thread workerThread = worker.thread; - + private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) { + final Thread workerThread = thread; if (workerThread == null || Thread.currentThread() != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { // "add" the channels writeTask to the writeTaskQueue. - boolean offered = worker.writeTaskQueue - .offer(channel.writeTask); + boolean offered = writeTaskQueue.offer(channel.writeTask); assert offered; } - final Selector workerSelector = worker.selector; - if (workerSelector != null) { - if (worker.wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); + final Selector selector = this.selector; + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); } } return true; @@ -483,78 +481,92 @@ class NioDatagramWorker implements Runnable { return false; } - private static void writeNow(final NioDatagramChannel channel, + private void writeNow(final NioDatagramChannel channel, final int writeSpinCount) { boolean addOpWrite = false; boolean removeOpWrite = false; - MessageEvent evt; - ChannelBuffer buf; int writtenBytes = 0; Queue writeBuffer = channel.writeBufferQueue; synchronized (channel.writeLock) { // inform the channel that write is in-progress channel.inWriteNowLoop = true; - // get the write event. - evt = channel.currentWriteEvent; // loop forever... for (;;) { + MessageEvent evt = channel.currentWriteEvent; + ByteBuffer buf; if (evt == null) { - evt = writeBuffer.poll(); - if (evt == null) { - channel.currentWriteEvent = null; + if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { removeOpWrite = true; break; } - evt = NioWorker.consolidateComposite(evt); - buf = (ChannelBuffer) evt.getMessage(); + ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); + if (origBuf.isDirect()) { + channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); + channel.currentWriteBufferIsPooled = false; + } else { + channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); + channel.currentWriteBufferIsPooled = true; + } } else { - buf = (ChannelBuffer) evt.getMessage(); + buf = channel.currentWriteBuffer; } try { int localWrittenBytes = 0; - for (int i = writeSpinCount; i > 0; i --) { - if (evt.getRemoteAddress() == null) { - localWrittenBytes = - buf.getBytes( - buf.readerIndex(), - channel.getDatagramChannel(), - buf.readableBytes()); - } else { - localWrittenBytes = - channel.getDatagramChannel().send( - buf.toByteBuffer(), - evt.getRemoteAddress()); + java.nio.channels.DatagramChannel dch = channel.getDatagramChannel(); + SocketAddress raddr = evt.getRemoteAddress(); + if (raddr == null) { + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = dch.write(buf); + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + break; + } } - - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; + } else { + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = dch.send(buf, raddr); + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + break; + } } } if (localWrittenBytes > 0) { // Successful write - proceed to the next message. - evt.getFuture().setSuccess(); + if (channel.currentWriteBufferIsPooled) { + directBufferPool.release(buf); + } + + ChannelFuture future = evt.getFuture(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; evt = null; + buf = null; + future.setSuccess(); } else { // Not written at all - perhaps the kernel buffer is full. - channel.currentWriteEvent = evt; addOpWrite = true; break; } } catch (final AsynchronousCloseException e) { // Doesn't need a user attention - ignore. - channel.currentWriteEvent = evt; } catch (final Throwable t) { + if (channel.currentWriteBufferIsPooled) { + directBufferPool.release(buf); + } + ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; - evt.getFuture().setFailure(t); + channel.currentWriteBuffer = null; + buf = null; evt = null; + future.setFailure(t); fireExceptionCaught(channel, t); } } @@ -570,9 +582,8 @@ class NioDatagramWorker implements Runnable { } } - private static void setOpWrite(final NioDatagramChannel channel) { - NioDatagramWorker worker = channel.worker; - Selector selector = worker.selector; + private void setOpWrite(final NioDatagramChannel channel) { + Selector selector = this.selector; SelectionKey key = channel.getDatagramChannel().keyFor(selector); if (key == null) { return; @@ -600,9 +611,8 @@ class NioDatagramWorker implements Runnable { } } - private static void clearOpWrite(NioDatagramChannel channel) { - NioDatagramWorker worker = channel.worker; - Selector selector = worker.selector; + private void clearOpWrite(NioDatagramChannel channel) { + Selector selector = this.selector; SelectionKey key = channel.getDatagramChannel().keyFor(selector); if (key == null) { return; @@ -644,15 +654,13 @@ class NioDatagramWorker implements Runnable { } } - static void close(final NioDatagramChannel channel, + void close(final NioDatagramChannel channel, final ChannelFuture future) { - NioDatagramWorker worker = channel.worker; - boolean connected = channel.isConnected(); boolean bound = channel.isBound(); try { channel.getDatagramChannel().close(); - worker.cancelledKeys ++; + cancelledKeys ++; if (channel.setClosed()) { future.setSuccess(); @@ -674,16 +682,15 @@ class NioDatagramWorker implements Runnable { } } - private static void cleanUpWriteBuffer(final NioDatagramChannel channel) { + private void cleanUpWriteBuffer(final NioDatagramChannel channel) { Exception cause = null; boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; + ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { - channel.currentWriteEvent = null; - // Create the exception only once to avoid the excessive overhead // caused by fillStackTrace. if (channel.isOpen()) { @@ -691,7 +698,16 @@ class NioDatagramWorker implements Runnable { } else { cause = new ClosedChannelException(); } - evt.getFuture().setFailure(cause); + if (channel.currentWriteBufferIsPooled) { + directBufferPool.release(buf); + } + + ChannelFuture future = evt.getFuture(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + buf = null; + evt = null; + future.setFailure(cause); fireExceptionCaught = true; } @@ -723,7 +739,7 @@ class NioDatagramWorker implements Runnable { } } - static void setInterestOps(final NioDatagramChannel channel, + void setInterestOps(final NioDatagramChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; @@ -731,8 +747,7 @@ class NioDatagramWorker implements Runnable { // interestOps can change at any time and by any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - final NioDatagramWorker worker = channel.worker; - final Selector selector = worker.selector; + final Selector selector = this.selector; final SelectionKey key = channel.getDatagramChannel().keyFor(selector); if (key == null || selector == null) { @@ -754,8 +769,8 @@ class NioDatagramWorker implements Runnable { // If the worker thread (the one that that might possibly be blocked // in a select() call) is not the thread executing this method wakeup // the select() operation. - if (Thread.currentThread() != worker.thread && - worker.wakenUp.compareAndSet(false, true)) { + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { selector.wakeup(); } changed = true; @@ -764,7 +779,7 @@ class NioDatagramWorker implements Runnable { case 1: case 2: if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == worker.thread) { + if (Thread.currentThread() == thread) { // Going to set the interestOps from the same thread. // Set the interesteOps on the SelectionKey key.interestOps(interestOps); @@ -772,15 +787,15 @@ class NioDatagramWorker implements Runnable { } else { // Going to set the interestOps from a different thread // and some old provides will need synchronization. - worker.selectorGuard.readLock().lock(); + selectorGuard.readLock().lock(); try { - if (worker.wakenUp.compareAndSet(false, true)) { + if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } key.interestOps(interestOps); changed = true; } finally { - worker.selectorGuard.readLock().unlock(); + selectorGuard.readLock().unlock(); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 2c1bef5c17..bcd8143b6f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -116,17 +116,17 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - NioWorker.close(channel, future); + channel.worker.close(channel, future); } break; case BOUND: case CONNECTED: if (value == null) { - NioWorker.close(channel, future); + channel.worker.close(channel, future); } break; case INTEREST_OPS: - NioWorker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { @@ -134,7 +134,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; - NioWorker.write(channel, true); + channel.worker.write(channel, true); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index db81e0f3c6..8d0e4791c7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -19,6 +19,7 @@ import static org.jboss.netty.channel.Channels.*; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,7 +70,8 @@ class NioSocketChannel extends AbstractChannel volatile boolean inWriteNowLoop; MessageEvent currentWriteEvent; - int currentWriteIndex; + ByteBuffer currentWriteBuffer; + boolean currentWriteBufferIsPooled; public NioSocketChannel( Channel parent, ChannelFactory factory, @@ -255,7 +257,7 @@ class NioSocketChannel extends AbstractChannel public void run() { writeTaskInTaskQueue.set(false); - NioWorker.write(NioSocketChannel.this, false); + worker.write(NioSocketChannel.this, false); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 5c83ad95ea..a2c6f158ce 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -19,6 +19,7 @@ import static org.jboss.netty.channel.Channels.*; import java.io.IOException; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; @@ -79,6 +80,7 @@ class NioWorker implements Runnable { private final Queue registerTaskQueue = new LinkedTransferQueue(); private final Queue writeTaskQueue = new LinkedTransferQueue(); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + private final DirectBufferPool directBufferPool = new DirectBufferPool(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -302,7 +304,7 @@ class NioWorker implements Runnable { return false; } - private static boolean read(SelectionKey k) { + private boolean read(SelectionKey k) { ScatteringByteChannel ch = (ScatteringByteChannel) k.channel(); NioSocketChannel channel = (NioSocketChannel) k.attachment(); @@ -347,17 +349,17 @@ class NioWorker implements Runnable { return true; } - private static void write(SelectionKey k) { + private void write(SelectionKey k) { NioSocketChannel ch = (NioSocketChannel) k.attachment(); write(ch, false); } - private static void close(SelectionKey k) { + private void close(SelectionKey k) { NioSocketChannel ch = (NioSocketChannel) k.attachment(); close(ch, succeededFuture(ch)); } - static void write(final NioSocketChannel channel, boolean mightNeedWakeup) { + void write(final NioSocketChannel channel, boolean mightNeedWakeup) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return; @@ -374,21 +376,20 @@ class NioWorker implements Runnable { } } - private static boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { - final NioWorker worker = channel.worker; + private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { final Thread currentThread = Thread.currentThread(); - final Thread workerThread = worker.thread; + final Thread workerThread = thread; if (currentThread != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - boolean offered = worker.writeTaskQueue.offer(channel.writeTask); + boolean offered = writeTaskQueue.offer(channel.writeTask); assert offered; } if (!(channel instanceof NioAcceptedSocketChannel) || ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { - final Selector workerSelector = worker.selector; + final Selector workerSelector = selector; if (workerSelector != null) { - if (worker.wakenUp.compareAndSet(false, true)) { + if (wakenUp.compareAndSet(false, true)) { workerSelector.wakeup(); } } @@ -410,72 +411,76 @@ class NioWorker implements Runnable { return false; } - private static void writeNow(NioSocketChannel channel, int writeSpinCount) { + private void writeNow(NioSocketChannel channel, int writeSpinCount) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; - MessageEvent evt; - ChannelBuffer buf; - int bufIdx; int writtenBytes = 0; Queue writeBuffer = channel.writeBuffer; synchronized (channel.writeLock) { channel.inWriteNowLoop = true; - evt = channel.currentWriteEvent; for (;;) { + MessageEvent evt = channel.currentWriteEvent; + ByteBuffer buf; if (evt == null) { - evt = writeBuffer.poll(); - if (evt == null) { - channel.currentWriteEvent = null; + if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { removeOpWrite = true; break; } - evt = consolidateComposite(evt); - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = buf.readerIndex(); + ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); + if (origBuf.isDirect()) { + channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); + channel.currentWriteBufferIsPooled = false; + } else { + channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); + channel.currentWriteBufferIsPooled = true; + } } else { - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = channel.currentWriteIndex; + buf = channel.currentWriteBuffer; } try { for (int i = writeSpinCount; i > 0; i --) { - int localWrittenBytes = buf.getBytes( - bufIdx, - channel.socket, - buf.writerIndex() - bufIdx); - + int localWrittenBytes = channel.socket.write(buf); if (localWrittenBytes != 0) { - bufIdx += localWrittenBytes; writtenBytes += localWrittenBytes; break; } } - if (bufIdx == buf.writerIndex()) { + if (!buf.hasRemaining()) { // Successful write - proceed to the next message. + if (channel.currentWriteBufferIsPooled) { + directBufferPool.release(buf); + } + + ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; - evt.getFuture().setSuccess(); + channel.currentWriteBuffer = null; evt = null; + buf = null; + future.setSuccess(); } else { // Not written fully - perhaps the kernel buffer is full. - channel.currentWriteEvent = evt; - channel.currentWriteIndex = bufIdx; addOpWrite = true; break; } } catch (AsynchronousCloseException e) { // Doesn't need a user attention - ignore. - channel.currentWriteEvent = evt; - channel.currentWriteIndex = bufIdx; } catch (Throwable t) { + if (channel.currentWriteBufferIsPooled) { + directBufferPool.release(buf); + } + ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; - evt.getFuture().setFailure(t); + channel.currentWriteBuffer = null; + buf = null; evt = null; + future.setFailure(t); fireExceptionCaught(channel, t); if (t instanceof IOException) { open = false; @@ -511,9 +516,8 @@ class NioWorker implements Runnable { return e; } - private static void setOpWrite(NioSocketChannel channel) { - NioWorker worker = channel.worker; - Selector selector = worker.selector; + private void setOpWrite(NioSocketChannel channel) { + Selector selector = this.selector; SelectionKey key = channel.socket.keyFor(selector); if (key == null) { return; @@ -541,9 +545,8 @@ class NioWorker implements Runnable { } } - private static void clearOpWrite(NioSocketChannel channel) { - NioWorker worker = channel.worker; - Selector selector = worker.selector; + private void clearOpWrite(NioSocketChannel channel) { + Selector selector = this.selector; SelectionKey key = channel.socket.keyFor(selector); if (key == null) { return; @@ -571,14 +574,12 @@ class NioWorker implements Runnable { } } - static void close(NioSocketChannel channel, ChannelFuture future) { - NioWorker worker = channel.worker; - + void close(NioSocketChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); try { channel.socket.close(); - worker.cancelledKeys ++; + cancelledKeys ++; if (channel.setClosed()) { future.setSuccess(); @@ -600,17 +601,15 @@ class NioWorker implements Runnable { } } - private static void cleanUpWriteBuffer(NioSocketChannel channel) { + private void cleanUpWriteBuffer(NioSocketChannel channel) { Exception cause = null; boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; + ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { - channel.currentWriteEvent = null; - channel.currentWriteIndex = 0; - // Create the exception only once to avoid the excessive overhead // caused by fillStackTrace. if (channel.isOpen()) { @@ -618,7 +617,17 @@ class NioWorker implements Runnable { } else { cause = new ClosedChannelException(); } - evt.getFuture().setFailure(cause); + + if (channel.currentWriteBufferIsPooled) { + directBufferPool.release(buf); + } + + ChannelFuture future = evt.getFuture(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + buf = null; + evt = null; + future.setFailure(cause); fireExceptionCaught = true; } @@ -650,15 +659,14 @@ class NioWorker implements Runnable { } } - static void setInterestOps( + void setInterestOps( NioSocketChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - NioWorker worker = channel.worker; - Selector selector = worker.selector; + Selector selector = this.selector; SelectionKey key = channel.socket.keyFor(selector); if (key == null || selector == null) { @@ -676,8 +684,8 @@ class NioWorker implements Runnable { case 0: if (channel.getRawInterestOps() != interestOps) { key.interestOps(interestOps); - if (Thread.currentThread() != worker.thread && - worker.wakenUp.compareAndSet(false, true)) { + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { selector.wakeup(); } changed = true; @@ -686,19 +694,19 @@ class NioWorker implements Runnable { case 1: case 2: if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == worker.thread) { + if (Thread.currentThread() == thread) { key.interestOps(interestOps); changed = true; } else { - worker.selectorGuard.readLock().lock(); + selectorGuard.readLock().lock(); try { - if (worker.wakenUp.compareAndSet(false, true)) { + if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } key.interestOps(interestOps); changed = true; } finally { - worker.selectorGuard.readLock().unlock(); + selectorGuard.readLock().unlock(); } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java index 9ef02732bf..93f1cee66a 100644 --- a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java +++ b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoderBuffer.java @@ -58,6 +58,10 @@ class ReplayingDecoderBuffer implements ChannelBuffer { } } + public boolean isDirect() { + return buffer.isDirect(); + } + public boolean hasArray() { return false; }