From cbb8654193352a4dd23c0f2da31854fb83919659 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 18 Apr 2012 21:49:28 +0200 Subject: [PATCH 1/4] Use gathering writes if java version >= 7 and the ChannelBuffer is an instanceof CompositeChannelBuffer. See #267 --- .../socket/nio/SocketSendBufferPool.java | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java index 14f5f96c41..6fec6306bc 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java @@ -20,11 +20,14 @@ import java.lang.ref.SoftReference; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.WritableByteChannel; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.CompositeChannelBuffer; import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.FileRegion; +import org.jboss.netty.util.internal.DetectionUtil; final class SocketSendBufferPool { @@ -65,6 +68,9 @@ final class SocketSendBufferPool { return EMPTY_BUFFER; } + if (src instanceof CompositeChannelBuffer && DetectionUtil.javaVersion() >= 7) { + return new GatheringSendBuffer(src.toByteBuffers()); + } if (src.isDirect()) { return new UnpooledSendBuffer(src.toByteBuffer()); } @@ -249,6 +255,80 @@ final class SocketSendBufferPool { } } } + + class GatheringSendBuffer implements SendBuffer { + + private final ByteBuffer[] buffers; + private final int last; + private long written; + private final int total; + + GatheringSendBuffer(ByteBuffer[] buffers) { + this.buffers = buffers; + this.last = buffers.length - 1; + int total = 0; + for (ByteBuffer buf: buffers) { + total += buf.remaining(); + } + this.total = total; + } + + public boolean finished() { + return !buffers[last].hasRemaining(); + } + + public long writtenBytes() { + return written; + } + + public long totalBytes() { + return total; + } + + public long transferTo(WritableByteChannel ch) throws IOException { + if (ch instanceof GatheringByteChannel) { + long w = ((GatheringByteChannel) ch).write(buffers); + written += w; + return w; + } else { + int send = 0; + for (ByteBuffer buf: buffers) { + if (buf.hasRemaining()) { + int w = ch.write(buf); + if (w == 0) { + break; + } else { + send += w; + } + } + } + written += send; + return send; + } + } + + public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { + int send = 0; + for (ByteBuffer buf: buffers) { + if (buf.hasRemaining()) { + int w = ch.send(buf, raddr); + if (w == 0) { + break; + } else { + send += w; + } + } + } + written += send; + + return send; + } + + public void release() { + // nothing todo + } + + } final class FileSendBuffer implements SendBuffer { From c5ab2f5dad7e9eb4d91a8473bdf43d31df6f1bd7 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 19 Apr 2012 13:12:28 +0200 Subject: [PATCH 2/4] Use gathering writes in CompositeChannelBuffer if jdk >= 7. See #267 --- .../java/org/jboss/netty/buffer/CompositeChannelBuffer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java index fc72fd7ea4..66d1cd0da9 100644 --- a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.jboss.netty.util.internal.DetectionUtil; + /** * A virtual buffer which shows multiple buffers as a single merged buffer. It @@ -275,6 +277,9 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + if (DetectionUtil.javaVersion() >= 7) { + return (int) out.write(toByteBuffers(index, length)); + } // XXX Gathering write is not supported because of a known issue. // See http://bugs.sun.com/view_bug.do?bug_id=6210541 // This issue appeared in 2004 and is still unresolved!? From 510692d7e74075bebc75205fda359db3e02076c4 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 19 Apr 2012 17:45:41 +0200 Subject: [PATCH 3/4] Add benchmark for gathering writes. See #269 --- .../socket/nio/SocketSendBufferPool.java | 2 +- .../socket/NioGatheringWriteBenchmark.java | 133 ++++++++++++++++++ 2 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/jboss/netty/channel/socket/NioGatheringWriteBenchmark.java diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java index 6fec6306bc..2d7c3a0ef1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java @@ -69,7 +69,7 @@ final class SocketSendBufferPool { } if (src instanceof CompositeChannelBuffer && DetectionUtil.javaVersion() >= 7) { - return new GatheringSendBuffer(src.toByteBuffers()); + //return new GatheringSendBuffer(src.toByteBuffers()); } if (src.isDirect()) { return new UnpooledSendBuffer(src.toByteBuffer()); diff --git a/src/test/java/org/jboss/netty/channel/socket/NioGatheringWriteBenchmark.java b/src/test/java/org/jboss/netty/channel/socket/NioGatheringWriteBenchmark.java new file mode 100644 index 0000000000..409a045791 --- /dev/null +++ b/src/test/java/org/jboss/netty/channel/socket/NioGatheringWriteBenchmark.java @@ -0,0 +1,133 @@ +package org.jboss.netty.channel.socket; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; + +public class NioGatheringWriteBenchmark { + + final static byte[] first = new byte[1024]; + final static byte[] second = new byte[1024 * 10]; + + final static ChannelBuffer firstDirect = ChannelBuffers.directBuffer(first.length); + final static ChannelBuffer secondDirect = ChannelBuffers.directBuffer(second.length); + + + + static { + for (int i = 0; i < first.length; i++) { + first[i] = (byte) i; + } + + for (int i = 0; i < second.length; i++) { + second[i] = (byte) i; + } + + firstDirect.writeBytes(first); + secondDirect.writeBytes(second); + } + + final static ChannelBuffer firstHeap = ChannelBuffers.wrappedBuffer(second); + final static ChannelBuffer secondHeap = ChannelBuffers.wrappedBuffer(second); + + public static void main(String args[]) throws IOException { + if (args.length != 2) { + System.err.println("Give argument direct|heap|mixed $rounds"); + System.exit(1); + + } + final ServerSocket socket = new ServerSocket(); + socket.bind(new InetSocketAddress(0)); + + Thread serverThread = new Thread(new Runnable() { + public void run() { + while(!Thread.interrupted()) { + try { + final Socket acceptedSocket = socket.accept(); + new Thread(new Runnable() { + + public void run() { + InputStream in = null; + try { + in = acceptedSocket.getInputStream(); + int i = 0; + + while ((i = in.read()) != -1) { + //System.out.print(i); + } + } catch (IOException e) { + if (in != null) { + try { + in.close(); + } catch (IOException e1) { + // ignore + } + } + } + } + }).start(); + } catch (IOException e) { + // ignore + } + + + } + } + }); + serverThread.start(); + + ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory()); + ChannelFuture future = cb.connect(socket.getLocalSocketAddress()); + assertTrue(future.awaitUninterruptibly().isSuccess()); + Channel channel = future.getChannel(); + + + ChannelFuture f = null; + long start = System.currentTimeMillis(); + + ChannelBuffer firstBuf; + ChannelBuffer secondBuf; + + String type = args[0]; + if (type.equalsIgnoreCase("direct")) { + firstBuf = firstDirect; + secondBuf = secondDirect; + } else if (type.equalsIgnoreCase("heap")) { + firstBuf = firstHeap; + secondBuf = secondHeap; + } else if (type.equalsIgnoreCase("mixed")) { + firstBuf = firstDirect; + secondBuf = secondHeap; + } else { + throw new IllegalArgumentException("Use direct|heap|mixed as arguments"); + } + + int rounds = Integer.parseInt(args[1]); + + for (int i = 0; i < rounds; i++) { + f = channel.write(ChannelBuffers.wrappedBuffer(firstBuf.duplicate(), secondBuf.duplicate())); + } + assertTrue(f.awaitUninterruptibly().isSuccess()); + long stop = System.currentTimeMillis() - start; + ChannelFuture cf = channel.close(); + assertTrue(cf.awaitUninterruptibly().isSuccess()); + socket.close(); + serverThread.interrupt(); + + System.out.println("Execute " + rounds + " in " + stop + "ms"); + + + } + +} From 670c4fa42fb46c16f21d87b56825bf558ff2bfde Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 19 Apr 2012 17:52:35 +0200 Subject: [PATCH 4/4] Enable gathering writes by default. See #269 --- .../jboss/netty/channel/socket/nio/SocketSendBufferPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java index 2d7c3a0ef1..6fec6306bc 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java @@ -69,7 +69,7 @@ final class SocketSendBufferPool { } if (src instanceof CompositeChannelBuffer && DetectionUtil.javaVersion() >= 7) { - //return new GatheringSendBuffer(src.toByteBuffers()); + return new GatheringSendBuffer(src.toByteBuffers()); } if (src.isDirect()) { return new UnpooledSendBuffer(src.toByteBuffer());