Resolved issue: NETTY-282 Reduce memory copy between heap buffers and direct buffers in NIO transport

* Replaced JDK's internal direct buffer pool with a custom pool optimized for Netty
* Added ChannelBuffer.isDirect()
* Cleaned up NioWorker.writeNow() and NioDatagramWorker.writeNow()
This commit is contained in:
Trustin Lee 2010-01-27 05:07:32 +00:00
parent 030ece5bf9
commit 36e804bbec
18 changed files with 333 additions and 152 deletions

View File

@ -70,6 +70,10 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
} }
} }
public boolean isDirect() {
return buffer.isDirect();
}
public ByteOrder order() { public ByteOrder order() {
return order; return order;
} }

View File

@ -251,6 +251,12 @@ public interface ChannelBuffer extends Comparable<ChannelBuffer> {
*/ */
ByteOrder order(); ByteOrder order();
/**
* Returns {@code true} if and only if this buffer is backed by an
* NIO direct buffer.
*/
boolean isDirect();
/** /**
* Returns the {@code readerIndex} of this buffer. * Returns the {@code readerIndex} of this buffer.
*/ */

View File

@ -148,6 +148,10 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
return order; return order;
} }
public boolean isDirect() {
return false;
}
public boolean hasArray() { public boolean hasArray() {
return false; return false;
} }

View File

@ -64,6 +64,10 @@ public class DuplicatedChannelBuffer extends AbstractChannelBuffer implements Wr
return buffer.order(); return buffer.order();
} }
public boolean isDirect() {
return buffer.isDirect();
}
public int capacity() { public int capacity() {
return buffer.capacity(); return buffer.capacity();
} }

View File

@ -94,6 +94,10 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
return endianness; return endianness;
} }
public boolean isDirect() {
return buffer.isDirect();
}
public int capacity() { public int capacity() {
return buffer.capacity(); return buffer.capacity();
} }

View File

@ -71,6 +71,10 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
setIndex(readerIndex, writerIndex); setIndex(readerIndex, writerIndex);
} }
public boolean isDirect() {
return false;
}
public int capacity() { public int capacity() {
return array.length; return array.length;
} }

View File

@ -64,6 +64,10 @@ public class ReadOnlyChannelBuffer extends AbstractChannelBuffer implements Wrap
return buffer.order(); return buffer.order();
} }
public boolean isDirect() {
return buffer.isDirect();
}
public boolean hasArray() { public boolean hasArray() {
return false; return false;
} }

View File

@ -69,6 +69,10 @@ public class SlicedChannelBuffer extends AbstractChannelBuffer implements Wrappe
return buffer.order(); return buffer.order();
} }
public boolean isDirect() {
return buffer.isDirect();
}
public int capacity() { public int capacity() {
return length; return length;
} }

View File

@ -63,6 +63,10 @@ public class TruncatedChannelBuffer extends AbstractChannelBuffer implements Wra
return buffer.order(); return buffer.order();
} }
public boolean isDirect() {
return buffer.isDirect();
}
public int capacity() { public int capacity() {
return length; return length;
} }

View File

@ -0,0 +1,112 @@
/*
* Copyright 2010 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class DirectBufferPool {
private final SoftReference<ByteBuffer>[] pool;
DirectBufferPool() {
this(4);
}
@SuppressWarnings("unchecked")
DirectBufferPool(int poolSize) {
pool = new SoftReference[poolSize];
}
final ByteBuffer acquire(ChannelBuffer src) {
ByteBuffer dst = acquire(src.readableBytes());
src.getBytes(src.readerIndex(), dst);
dst.rewind();
return dst;
}
private final ByteBuffer acquire(int size) {
for (int i = 0; i < pool.length; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null) {
continue;
}
ByteBuffer buf = ref.get();
if (buf == null) {
pool[i] = null;
continue;
}
if (buf.capacity() < size) {
continue;
}
pool[i] = null;
buf.rewind();
buf.limit(size);
return buf;
}
ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size));
buf.limit(size);
return buf;
}
final void release(ByteBuffer buffer) {
if (buffer == null) {
return;
}
for (int i = 0; i < pool.length; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null || ref.get() == null) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
}
}
// pool is full - replace one
for (int i = 0; i< pool.length; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
ByteBuffer pooled = ref.get();
if (pooled == null) {
pool[i] = null;
continue;
}
if (pooled.capacity() < buffer.capacity()) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
}
}
}
static final int normalizeCapacity(int capacity) {
// Normalize to multiple of 4096.
// Strictly speaking, 4096 should be normalized to 4096,
// but it becomes 8192 to keep the calculation simplistic.
return (capacity & 0xfffff000) + 0x1000;
}
}

View File

@ -88,25 +88,25 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
NioWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case BOUND: case BOUND:
if (value != null) { if (value != null) {
bind(channel, future, (SocketAddress) value); bind(channel, future, (SocketAddress) value);
} else { } else {
NioWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case CONNECTED: case CONNECTED:
if (value != null) { if (value != null) {
connect(channel, future, (SocketAddress) value); connect(channel, future, (SocketAddress) value);
} else { } else {
NioWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
NioWorker.setInterestOps(channel, future, ((Integer) value).intValue()); channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
@ -114,7 +114,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
NioSocketChannel channel = (NioSocketChannel) event.getChannel(); NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBuffer.offer(event); boolean offered = channel.writeBuffer.offer(event);
assert offered; assert offered;
NioWorker.write(channel, true); channel.worker.write(channel, true);
} }
} }
@ -156,7 +156,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
} catch (Throwable t) { } catch (Throwable t) {
cf.setFailure(t); cf.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
NioWorker.close(channel, succeededFuture(channel)); channel.worker.close(channel, succeededFuture(channel));
} }
} }
@ -373,7 +373,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
ch.connectFuture.setFailure(cause); ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause); fireExceptionCaught(ch, cause);
NioWorker.close(ch, succeededFuture(ch)); ch.worker.close(ch, succeededFuture(ch));
} }
} }
} }
@ -388,13 +388,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
} catch (Throwable t) { } catch (Throwable t) {
ch.connectFuture.setFailure(t); ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t); fireExceptionCaught(ch, t);
NioWorker.close(ch, succeededFuture(ch)); ch.worker.close(ch, succeededFuture(ch));
} }
} }
private void close(SelectionKey k) { private void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
NioWorker.close(ch, succeededFuture(ch)); ch.worker.close(ch, succeededFuture(ch));
} }
} }
@ -412,7 +412,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
channel.socket.register( channel.socket.register(
boss.selector, SelectionKey.OP_CONNECT, channel); boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
NioWorker.close(channel, succeededFuture(channel)); channel.worker.close(channel, succeededFuture(channel));
} }
int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); int connectTimeout = channel.getConfig().getConnectTimeoutMillis();

View File

@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -107,6 +108,8 @@ class NioDatagramChannel extends AbstractChannel
* The current write {@link MessageEvent} * The current write {@link MessageEvent}
*/ */
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
ByteBuffer currentWriteBuffer;
boolean currentWriteBufferIsPooled;
/** /**
* Boolean that indicates that write operation is in progress. * Boolean that indicates that write operation is in progress.
@ -313,7 +316,7 @@ class NioDatagramChannel extends AbstractChannel
public void run() { public void run() {
writeTaskInTaskQueue.set(false); writeTaskInTaskQueue.set(false);
NioDatagramWorker.write(NioDatagramChannel.this, false); worker.write(NioDatagramChannel.this, false);
} }
} }

View File

@ -83,14 +83,14 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
NioDatagramWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case BOUND: case BOUND:
if (value != null) { if (value != null) {
bind(channel, future, (InetSocketAddress) value); bind(channel, future, (InetSocketAddress) value);
} else { } else {
NioDatagramWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case CONNECTED: case CONNECTED:
@ -101,15 +101,14 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
NioDatagramWorker.setInterestOps(channel, future, ((Integer) value) channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
.intValue());
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
final MessageEvent event = (MessageEvent) e; final MessageEvent event = (MessageEvent) e;
final boolean offered = channel.writeBufferQueue.offer(event); final boolean offered = channel.writeBufferQueue.offer(event);
assert offered; assert offered;
NioDatagramWorker.write(channel, true); channel.worker.write(channel, true);
} }
} }
@ -194,7 +193,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} finally { } finally {
if (connected && !workerStarted) { if (connected && !workerStarted) {
NioDatagramWorker.close(channel, future); channel.worker.close(channel, future);
} }
} }
} }

View File

@ -127,6 +127,8 @@ class NioDatagramWorker implements Runnable {
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final DirectBufferPool directBufferPool = new DirectBufferPool();
/** /**
* Sole constructor. * Sole constructor.
* *
@ -369,7 +371,7 @@ class NioDatagramWorker implements Runnable {
return false; return false;
} }
private static void write(SelectionKey k) { private void write(SelectionKey k) {
write((NioDatagramChannel) k.attachment(), false); write((NioDatagramChannel) k.attachment(), false);
} }
@ -380,7 +382,7 @@ class NioDatagramWorker implements Runnable {
* *
* @param key The selection key which contains the Selector registration information. * @param key The selection key which contains the Selector registration information.
*/ */
private static boolean read(final SelectionKey key) { private boolean read(final SelectionKey key) {
final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
ReceiveBufferSizePredictor predictor = ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor(); channel.getConfig().getReceiveBufferSizePredictor();
@ -430,12 +432,12 @@ class NioDatagramWorker implements Runnable {
return true; return true;
} }
private static void close(SelectionKey k) { private void close(SelectionKey k) {
final NioDatagramChannel ch = (NioDatagramChannel) k.attachment(); final NioDatagramChannel ch = (NioDatagramChannel) k.attachment();
close(ch, succeededFuture(ch)); close(ch, succeededFuture(ch));
} }
static void write(final NioDatagramChannel channel, void write(final NioDatagramChannel channel,
final boolean mightNeedWakeup) { final boolean mightNeedWakeup) {
/* /*
* Note that we are not checking if the channel is connected. Connected * Note that we are not checking if the channel is connected. Connected
@ -458,23 +460,19 @@ class NioDatagramWorker implements Runnable {
} }
} }
private static boolean scheduleWriteIfNecessary( private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) {
final NioDatagramChannel channel) { final Thread workerThread = thread;
final NioDatagramWorker worker = channel.worker;
final Thread workerThread = worker.thread;
if (workerThread == null || Thread.currentThread() != workerThread) { if (workerThread == null || Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
// "add" the channels writeTask to the writeTaskQueue. // "add" the channels writeTask to the writeTaskQueue.
boolean offered = worker.writeTaskQueue boolean offered = writeTaskQueue.offer(channel.writeTask);
.offer(channel.writeTask);
assert offered; assert offered;
} }
final Selector workerSelector = worker.selector; final Selector selector = this.selector;
if (workerSelector != null) { if (selector != null) {
if (worker.wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup(); selector.wakeup();
} }
} }
return true; return true;
@ -483,78 +481,92 @@ class NioDatagramWorker implements Runnable {
return false; return false;
} }
private static void writeNow(final NioDatagramChannel channel, private void writeNow(final NioDatagramChannel channel,
final int writeSpinCount) { final int writeSpinCount) {
boolean addOpWrite = false; boolean addOpWrite = false;
boolean removeOpWrite = false; boolean removeOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int writtenBytes = 0; int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue; Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
// inform the channel that write is in-progress // inform the channel that write is in-progress
channel.inWriteNowLoop = true; channel.inWriteNowLoop = true;
// get the write event.
evt = channel.currentWriteEvent;
// loop forever... // loop forever...
for (;;) { for (;;) {
MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf;
if (evt == null) { if (evt == null) {
evt = writeBuffer.poll(); if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
if (evt == null) {
channel.currentWriteEvent = null;
removeOpWrite = true; removeOpWrite = true;
break; break;
} }
evt = NioWorker.consolidateComposite(evt); ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
buf = (ChannelBuffer) evt.getMessage(); if (origBuf.isDirect()) {
channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
channel.currentWriteBufferIsPooled = false;
} else {
channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf);
channel.currentWriteBufferIsPooled = true;
}
} else { } else {
buf = (ChannelBuffer) evt.getMessage(); buf = channel.currentWriteBuffer;
} }
try { try {
int localWrittenBytes = 0; int localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) { java.nio.channels.DatagramChannel dch = channel.getDatagramChannel();
if (evt.getRemoteAddress() == null) { SocketAddress raddr = evt.getRemoteAddress();
localWrittenBytes = if (raddr == null) {
buf.getBytes( for (int i = writeSpinCount; i > 0; i --) {
buf.readerIndex(), localWrittenBytes = dch.write(buf);
channel.getDatagramChannel(), if (localWrittenBytes != 0) {
buf.readableBytes()); writtenBytes += localWrittenBytes;
} else { break;
localWrittenBytes = }
channel.getDatagramChannel().send(
buf.toByteBuffer(),
evt.getRemoteAddress());
} }
} else {
if (localWrittenBytes != 0) { for (int i = writeSpinCount; i > 0; i --) {
writtenBytes += localWrittenBytes; localWrittenBytes = dch.send(buf, raddr);
break; if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
} }
} }
if (localWrittenBytes > 0) { if (localWrittenBytes > 0) {
// Successful write - proceed to the next message. // Successful write - proceed to the next message.
evt.getFuture().setSuccess(); if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null; evt = null;
buf = null;
future.setSuccess();
} else { } else {
// Not written at all - perhaps the kernel buffer is full. // Not written at all - perhaps the kernel buffer is full.
channel.currentWriteEvent = evt;
addOpWrite = true; addOpWrite = true;
break; break;
} }
} catch (final AsynchronousCloseException e) { } catch (final AsynchronousCloseException e) {
// Doesn't need a user attention - ignore. // Doesn't need a user attention - ignore.
channel.currentWriteEvent = evt;
} catch (final Throwable t) { } catch (final Throwable t) {
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
evt.getFuture().setFailure(t); channel.currentWriteBuffer = null;
buf = null;
evt = null; evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} }
} }
@ -570,9 +582,8 @@ class NioDatagramWorker implements Runnable {
} }
} }
private static void setOpWrite(final NioDatagramChannel channel) { private void setOpWrite(final NioDatagramChannel channel) {
NioDatagramWorker worker = channel.worker; Selector selector = this.selector;
Selector selector = worker.selector;
SelectionKey key = channel.getDatagramChannel().keyFor(selector); SelectionKey key = channel.getDatagramChannel().keyFor(selector);
if (key == null) { if (key == null) {
return; return;
@ -600,9 +611,8 @@ class NioDatagramWorker implements Runnable {
} }
} }
private static void clearOpWrite(NioDatagramChannel channel) { private void clearOpWrite(NioDatagramChannel channel) {
NioDatagramWorker worker = channel.worker; Selector selector = this.selector;
Selector selector = worker.selector;
SelectionKey key = channel.getDatagramChannel().keyFor(selector); SelectionKey key = channel.getDatagramChannel().keyFor(selector);
if (key == null) { if (key == null) {
return; return;
@ -644,15 +654,13 @@ class NioDatagramWorker implements Runnable {
} }
} }
static void close(final NioDatagramChannel channel, void close(final NioDatagramChannel channel,
final ChannelFuture future) { final ChannelFuture future) {
NioDatagramWorker worker = channel.worker;
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
channel.getDatagramChannel().close(); channel.getDatagramChannel().close();
worker.cancelledKeys ++; cancelledKeys ++;
if (channel.setClosed()) { if (channel.setClosed()) {
future.setSuccess(); future.setSuccess();
@ -674,16 +682,15 @@ class NioDatagramWorker implements Runnable {
} }
} }
private static void cleanUpWriteBuffer(final NioDatagramChannel channel) { private void cleanUpWriteBuffer(final NioDatagramChannel channel) {
Exception cause = null; Exception cause = null;
boolean fireExceptionCaught = false; boolean fireExceptionCaught = false;
// Clean up the stale messages in the write buffer. // Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent; MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf = channel.currentWriteBuffer;
if (evt != null) { if (evt != null) {
channel.currentWriteEvent = null;
// Create the exception only once to avoid the excessive overhead // Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace. // caused by fillStackTrace.
if (channel.isOpen()) { if (channel.isOpen()) {
@ -691,7 +698,16 @@ class NioDatagramWorker implements Runnable {
} else { } else {
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
evt.getFuture().setFailure(cause); if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(cause);
fireExceptionCaught = true; fireExceptionCaught = true;
} }
@ -723,7 +739,7 @@ class NioDatagramWorker implements Runnable {
} }
} }
static void setInterestOps(final NioDatagramChannel channel, void setInterestOps(final NioDatagramChannel channel,
ChannelFuture future, int interestOps) { ChannelFuture future, int interestOps) {
boolean changed = false; boolean changed = false;
@ -731,8 +747,7 @@ class NioDatagramWorker implements Runnable {
// interestOps can change at any time and by any thread. // interestOps can change at any time and by any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
final NioDatagramWorker worker = channel.worker; final Selector selector = this.selector;
final Selector selector = worker.selector;
final SelectionKey key = channel.getDatagramChannel().keyFor(selector); final SelectionKey key = channel.getDatagramChannel().keyFor(selector);
if (key == null || selector == null) { if (key == null || selector == null) {
@ -754,8 +769,8 @@ class NioDatagramWorker implements Runnable {
// If the worker thread (the one that that might possibly be blocked // If the worker thread (the one that that might possibly be blocked
// in a select() call) is not the thread executing this method wakeup // in a select() call) is not the thread executing this method wakeup
// the select() operation. // the select() operation.
if (Thread.currentThread() != worker.thread && if (Thread.currentThread() != thread &&
worker.wakenUp.compareAndSet(false, true)) { wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
changed = true; changed = true;
@ -764,7 +779,7 @@ class NioDatagramWorker implements Runnable {
case 1: case 1:
case 2: case 2:
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == worker.thread) { if (Thread.currentThread() == thread) {
// Going to set the interestOps from the same thread. // Going to set the interestOps from the same thread.
// Set the interesteOps on the SelectionKey // Set the interesteOps on the SelectionKey
key.interestOps(interestOps); key.interestOps(interestOps);
@ -772,15 +787,15 @@ class NioDatagramWorker implements Runnable {
} else { } else {
// Going to set the interestOps from a different thread // Going to set the interestOps from a different thread
// and some old provides will need synchronization. // and some old provides will need synchronization.
worker.selectorGuard.readLock().lock(); selectorGuard.readLock().lock();
try { try {
if (worker.wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally { } finally {
worker.selectorGuard.readLock().unlock(); selectorGuard.readLock().unlock();
} }
} }
} }

View File

@ -116,17 +116,17 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
NioWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case BOUND: case BOUND:
case CONNECTED: case CONNECTED:
if (value == null) { if (value == null) {
NioWorker.close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
NioWorker.setInterestOps(channel, future, ((Integer) value).intValue()); channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
@ -134,7 +134,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
NioSocketChannel channel = (NioSocketChannel) event.getChannel(); NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBuffer.offer(event); boolean offered = channel.writeBuffer.offer(event);
assert offered; assert offered;
NioWorker.write(channel, true); channel.worker.write(channel, true);
} }
} }

View File

@ -19,6 +19,7 @@ import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -69,7 +70,8 @@ class NioSocketChannel extends AbstractChannel
volatile boolean inWriteNowLoop; volatile boolean inWriteNowLoop;
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
int currentWriteIndex; ByteBuffer currentWriteBuffer;
boolean currentWriteBufferIsPooled;
public NioSocketChannel( public NioSocketChannel(
Channel parent, ChannelFactory factory, Channel parent, ChannelFactory factory,
@ -255,7 +257,7 @@ class NioSocketChannel extends AbstractChannel
public void run() { public void run() {
writeTaskInTaskQueue.set(false); writeTaskInTaskQueue.set(false);
NioWorker.write(NioSocketChannel.this, false); worker.write(NioSocketChannel.this, false);
} }
} }
} }

View File

@ -19,6 +19,7 @@ import static org.jboss.netty.channel.Channels.*;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
@ -79,6 +80,7 @@ class NioWorker implements Runnable {
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>(); private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>(); private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final DirectBufferPool directBufferPool = new DirectBufferPool();
NioWorker(int bossId, int id, Executor executor) { NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId; this.bossId = bossId;
@ -302,7 +304,7 @@ class NioWorker implements Runnable {
return false; return false;
} }
private static boolean read(SelectionKey k) { private boolean read(SelectionKey k) {
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel(); ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
NioSocketChannel channel = (NioSocketChannel) k.attachment(); NioSocketChannel channel = (NioSocketChannel) k.attachment();
@ -347,17 +349,17 @@ class NioWorker implements Runnable {
return true; return true;
} }
private static void write(SelectionKey k) { private void write(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment(); NioSocketChannel ch = (NioSocketChannel) k.attachment();
write(ch, false); write(ch, false);
} }
private static void close(SelectionKey k) { private void close(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment(); NioSocketChannel ch = (NioSocketChannel) k.attachment();
close(ch, succeededFuture(ch)); close(ch, succeededFuture(ch));
} }
static void write(final NioSocketChannel channel, boolean mightNeedWakeup) { void write(final NioSocketChannel channel, boolean mightNeedWakeup) {
if (!channel.isConnected()) { if (!channel.isConnected()) {
cleanUpWriteBuffer(channel); cleanUpWriteBuffer(channel);
return; return;
@ -374,21 +376,20 @@ class NioWorker implements Runnable {
} }
} }
private static boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) {
final NioWorker worker = channel.worker;
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
final Thread workerThread = worker.thread; final Thread workerThread = thread;
if (currentThread != workerThread) { if (currentThread != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = worker.writeTaskQueue.offer(channel.writeTask); boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered; assert offered;
} }
if (!(channel instanceof NioAcceptedSocketChannel) || if (!(channel instanceof NioAcceptedSocketChannel) ||
((NioAcceptedSocketChannel) channel).bossThread != currentThread) { ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
final Selector workerSelector = worker.selector; final Selector workerSelector = selector;
if (workerSelector != null) { if (workerSelector != null) {
if (worker.wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup(); workerSelector.wakeup();
} }
} }
@ -410,72 +411,76 @@ class NioWorker implements Runnable {
return false; return false;
} }
private static void writeNow(NioSocketChannel channel, int writeSpinCount) { private void writeNow(NioSocketChannel channel, int writeSpinCount) {
boolean open = true; boolean open = true;
boolean addOpWrite = false; boolean addOpWrite = false;
boolean removeOpWrite = false; boolean removeOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
int writtenBytes = 0; int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = channel.writeBuffer; Queue<MessageEvent> writeBuffer = channel.writeBuffer;
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
channel.inWriteNowLoop = true; channel.inWriteNowLoop = true;
evt = channel.currentWriteEvent;
for (;;) { for (;;) {
MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf;
if (evt == null) { if (evt == null) {
evt = writeBuffer.poll(); if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
if (evt == null) {
channel.currentWriteEvent = null;
removeOpWrite = true; removeOpWrite = true;
break; break;
} }
evt = consolidateComposite(evt); ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
buf = (ChannelBuffer) evt.getMessage(); if (origBuf.isDirect()) {
bufIdx = buf.readerIndex(); channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
channel.currentWriteBufferIsPooled = false;
} else {
channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf);
channel.currentWriteBufferIsPooled = true;
}
} else { } else {
buf = (ChannelBuffer) evt.getMessage(); buf = channel.currentWriteBuffer;
bufIdx = channel.currentWriteIndex;
} }
try { try {
for (int i = writeSpinCount; i > 0; i --) { for (int i = writeSpinCount; i > 0; i --) {
int localWrittenBytes = buf.getBytes( int localWrittenBytes = channel.socket.write(buf);
bufIdx,
channel.socket,
buf.writerIndex() - bufIdx);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
bufIdx += localWrittenBytes;
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
} }
} }
if (bufIdx == buf.writerIndex()) { if (!buf.hasRemaining()) {
// Successful write - proceed to the next message. // Successful write - proceed to the next message.
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
evt.getFuture().setSuccess(); channel.currentWriteBuffer = null;
evt = null; evt = null;
buf = null;
future.setSuccess();
} else { } else {
// Not written fully - perhaps the kernel buffer is full. // Not written fully - perhaps the kernel buffer is full.
channel.currentWriteEvent = evt;
channel.currentWriteIndex = bufIdx;
addOpWrite = true; addOpWrite = true;
break; break;
} }
} catch (AsynchronousCloseException e) { } catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore. // Doesn't need a user attention - ignore.
channel.currentWriteEvent = evt;
channel.currentWriteIndex = bufIdx;
} catch (Throwable t) { } catch (Throwable t) {
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
evt.getFuture().setFailure(t); channel.currentWriteBuffer = null;
buf = null;
evt = null; evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
if (t instanceof IOException) { if (t instanceof IOException) {
open = false; open = false;
@ -511,9 +516,8 @@ class NioWorker implements Runnable {
return e; return e;
} }
private static void setOpWrite(NioSocketChannel channel) { private void setOpWrite(NioSocketChannel channel) {
NioWorker worker = channel.worker; Selector selector = this.selector;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null) { if (key == null) {
return; return;
@ -541,9 +545,8 @@ class NioWorker implements Runnable {
} }
} }
private static void clearOpWrite(NioSocketChannel channel) { private void clearOpWrite(NioSocketChannel channel) {
NioWorker worker = channel.worker; Selector selector = this.selector;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null) { if (key == null) {
return; return;
@ -571,14 +574,12 @@ class NioWorker implements Runnable {
} }
} }
static void close(NioSocketChannel channel, ChannelFuture future) { void close(NioSocketChannel channel, ChannelFuture future) {
NioWorker worker = channel.worker;
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
channel.socket.close(); channel.socket.close();
worker.cancelledKeys ++; cancelledKeys ++;
if (channel.setClosed()) { if (channel.setClosed()) {
future.setSuccess(); future.setSuccess();
@ -600,17 +601,15 @@ class NioWorker implements Runnable {
} }
} }
private static void cleanUpWriteBuffer(NioSocketChannel channel) { private void cleanUpWriteBuffer(NioSocketChannel channel) {
Exception cause = null; Exception cause = null;
boolean fireExceptionCaught = false; boolean fireExceptionCaught = false;
// Clean up the stale messages in the write buffer. // Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent; MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf = channel.currentWriteBuffer;
if (evt != null) { if (evt != null) {
channel.currentWriteEvent = null;
channel.currentWriteIndex = 0;
// Create the exception only once to avoid the excessive overhead // Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace. // caused by fillStackTrace.
if (channel.isOpen()) { if (channel.isOpen()) {
@ -618,7 +617,17 @@ class NioWorker implements Runnable {
} else { } else {
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
evt.getFuture().setFailure(cause);
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(cause);
fireExceptionCaught = true; fireExceptionCaught = true;
} }
@ -650,15 +659,14 @@ class NioWorker implements Runnable {
} }
} }
static void setInterestOps( void setInterestOps(
NioSocketChannel channel, ChannelFuture future, int interestOps) { NioSocketChannel channel, ChannelFuture future, int interestOps) {
boolean changed = false; boolean changed = false;
try { try {
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
NioWorker worker = channel.worker; Selector selector = this.selector;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null || selector == null) { if (key == null || selector == null) {
@ -676,8 +684,8 @@ class NioWorker implements Runnable {
case 0: case 0:
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread && if (Thread.currentThread() != thread &&
worker.wakenUp.compareAndSet(false, true)) { wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
changed = true; changed = true;
@ -686,19 +694,19 @@ class NioWorker implements Runnable {
case 1: case 1:
case 2: case 2:
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == worker.thread) { if (Thread.currentThread() == thread) {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} else { } else {
worker.selectorGuard.readLock().lock(); selectorGuard.readLock().lock();
try { try {
if (worker.wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally { } finally {
worker.selectorGuard.readLock().unlock(); selectorGuard.readLock().unlock();
} }
} }
} }

View File

@ -58,6 +58,10 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
} }
} }
public boolean isDirect() {
return buffer.isDirect();
}
public boolean hasArray() { public boolean hasArray() {
return false; return false;
} }