Applied send buffer pool to nio datagram transport

This commit is contained in:
Trustin Lee 2010-02-19 03:28:11 +00:00
parent 32ef9e3e52
commit d9de1675d0
2 changed files with 17 additions and 6 deletions

View File

@ -22,7 +22,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -38,6 +37,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.DatagramChannelConfig; import org.jboss.netty.channel.socket.DatagramChannelConfig;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean; import org.jboss.netty.util.internal.ThreadLocalBoolean;
@ -108,7 +108,7 @@ class NioDatagramChannel extends AbstractChannel
* The current write {@link MessageEvent} * The current write {@link MessageEvent}
*/ */
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
ByteBuffer currentWriteBuffer; SendBuffer currentWriteBuffer;
/** /**
* Boolean that indicates that write operation is in progress. * Boolean that indicates that write operation is in progress.

View File

@ -43,6 +43,7 @@ import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor; import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
@ -127,6 +128,8 @@ class NioDatagramWorker implements Runnable {
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
/** /**
* Sole constructor. * Sole constructor.
* *
@ -499,6 +502,7 @@ class NioDatagramWorker implements Runnable {
int writtenBytes = 0; int writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final DatagramChannel ch = channel.getDatagramChannel(); final DatagramChannel ch = channel.getDatagramChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue; final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount(); final int writeSpinCount = channel.getConfig().getWriteSpinCount();
@ -509,7 +513,8 @@ class NioDatagramWorker implements Runnable {
// loop forever... // loop forever...
for (;;) { for (;;) {
MessageEvent evt = channel.currentWriteEvent; MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf; SendBuffer buf;
ByteBuffer bb;
if (evt == null) { if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true; removeOpWrite = true;
@ -518,9 +523,11 @@ class NioDatagramWorker implements Runnable {
} }
ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); channel.currentWriteBuffer = buf = sendBufferPool.acquire(origBuf);
bb = buf.buffer;
} else { } else {
buf = channel.currentWriteBuffer; buf = channel.currentWriteBuffer;
bb = buf.buffer;
} }
try { try {
@ -528,7 +535,7 @@ class NioDatagramWorker implements Runnable {
SocketAddress raddr = evt.getRemoteAddress(); SocketAddress raddr = evt.getRemoteAddress();
if (raddr == null) { if (raddr == null) {
for (int i = writeSpinCount; i > 0; i --) { for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = ch.write(buf); localWrittenBytes = ch.write(bb);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
@ -536,7 +543,7 @@ class NioDatagramWorker implements Runnable {
} }
} else { } else {
for (int i = writeSpinCount; i > 0; i --) { for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = ch.send(buf, raddr); localWrittenBytes = ch.send(bb, raddr);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
@ -546,11 +553,13 @@ class NioDatagramWorker implements Runnable {
if (localWrittenBytes > 0) { if (localWrittenBytes > 0) {
// Successful write - proceed to the next message. // Successful write - proceed to the next message.
buf.release();
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
evt = null; evt = null;
buf = null; buf = null;
bb = null;
future.setSuccess(); future.setSuccess();
} else { } else {
// Not written at all - perhaps the kernel buffer is full. // Not written at all - perhaps the kernel buffer is full.
@ -561,11 +570,13 @@ class NioDatagramWorker implements Runnable {
} catch (final AsynchronousCloseException e) { } catch (final AsynchronousCloseException e) {
// Doesn't need a user attention - ignore. // Doesn't need a user attention - ignore.
} catch (final Throwable t) { } catch (final Throwable t) {
buf.release();
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
buf = null; buf = null;
evt = null; evt = null;
bb = null;
future.setFailure(t); future.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} }