Merge pull request #271 from netty/gathering_write_support

Use gathering writes if java version is >= 7 . See #269
This commit is contained in:
Norman Maurer 2012-04-24 11:06:30 -07:00
commit f0f152085a
3 changed files with 218 additions and 0 deletions

View File

@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.jboss.netty.util.internal.DetectionUtil;
/**
* A virtual buffer which shows multiple buffers as a single merged buffer. It
@ -275,6 +277,9 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
public int getBytes(int index, GatheringByteChannel out, int length)
throws IOException {
if (DetectionUtil.javaVersion() >= 7) {
return (int) out.write(toByteBuffers(index, length));
}
// XXX Gathering write is not supported because of a known issue.
// See http://bugs.sun.com/view_bug.do?bug_id=6210541
// This issue appeared in 2004 and is still unresolved!?

View File

@ -20,11 +20,14 @@ import java.lang.ref.SoftReference;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.channel.DefaultFileRegion;
import org.jboss.netty.channel.FileRegion;
import org.jboss.netty.util.internal.DetectionUtil;
final class SocketSendBufferPool {
@ -65,6 +68,9 @@ final class SocketSendBufferPool {
return EMPTY_BUFFER;
}
if (src instanceof CompositeChannelBuffer && DetectionUtil.javaVersion() >= 7) {
return new GatheringSendBuffer(src.toByteBuffers());
}
if (src.isDirect()) {
return new UnpooledSendBuffer(src.toByteBuffer());
}
@ -250,6 +256,80 @@ final class SocketSendBufferPool {
}
}
class GatheringSendBuffer implements SendBuffer {
private final ByteBuffer[] buffers;
private final int last;
private long written;
private final int total;
GatheringSendBuffer(ByteBuffer[] buffers) {
this.buffers = buffers;
this.last = buffers.length - 1;
int total = 0;
for (ByteBuffer buf: buffers) {
total += buf.remaining();
}
this.total = total;
}
public boolean finished() {
return !buffers[last].hasRemaining();
}
public long writtenBytes() {
return written;
}
public long totalBytes() {
return total;
}
public long transferTo(WritableByteChannel ch) throws IOException {
if (ch instanceof GatheringByteChannel) {
long w = ((GatheringByteChannel) ch).write(buffers);
written += w;
return w;
} else {
int send = 0;
for (ByteBuffer buf: buffers) {
if (buf.hasRemaining()) {
int w = ch.write(buf);
if (w == 0) {
break;
} else {
send += w;
}
}
}
written += send;
return send;
}
}
public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
int send = 0;
for (ByteBuffer buf: buffers) {
if (buf.hasRemaining()) {
int w = ch.send(buf, raddr);
if (w == 0) {
break;
} else {
send += w;
}
}
}
written += send;
return send;
}
public void release() {
// nothing todo
}
}
final class FileSendBuffer implements SendBuffer {
private final FileRegion file;

View File

@ -0,0 +1,133 @@
package org.jboss.netty.channel.socket;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
public class NioGatheringWriteBenchmark {
final static byte[] first = new byte[1024];
final static byte[] second = new byte[1024 * 10];
final static ChannelBuffer firstDirect = ChannelBuffers.directBuffer(first.length);
final static ChannelBuffer secondDirect = ChannelBuffers.directBuffer(second.length);
static {
for (int i = 0; i < first.length; i++) {
first[i] = (byte) i;
}
for (int i = 0; i < second.length; i++) {
second[i] = (byte) i;
}
firstDirect.writeBytes(first);
secondDirect.writeBytes(second);
}
final static ChannelBuffer firstHeap = ChannelBuffers.wrappedBuffer(second);
final static ChannelBuffer secondHeap = ChannelBuffers.wrappedBuffer(second);
public static void main(String args[]) throws IOException {
if (args.length != 2) {
System.err.println("Give argument direct|heap|mixed $rounds");
System.exit(1);
}
final ServerSocket socket = new ServerSocket();
socket.bind(new InetSocketAddress(0));
Thread serverThread = new Thread(new Runnable() {
public void run() {
while(!Thread.interrupted()) {
try {
final Socket acceptedSocket = socket.accept();
new Thread(new Runnable() {
public void run() {
InputStream in = null;
try {
in = acceptedSocket.getInputStream();
int i = 0;
while ((i = in.read()) != -1) {
//System.out.print(i);
}
} catch (IOException e) {
if (in != null) {
try {
in.close();
} catch (IOException e1) {
// ignore
}
}
}
}
}).start();
} catch (IOException e) {
// ignore
}
}
}
});
serverThread.start();
ClientBootstrap cb = new ClientBootstrap(new NioClientSocketChannelFactory());
ChannelFuture future = cb.connect(socket.getLocalSocketAddress());
assertTrue(future.awaitUninterruptibly().isSuccess());
Channel channel = future.getChannel();
ChannelFuture f = null;
long start = System.currentTimeMillis();
ChannelBuffer firstBuf;
ChannelBuffer secondBuf;
String type = args[0];
if (type.equalsIgnoreCase("direct")) {
firstBuf = firstDirect;
secondBuf = secondDirect;
} else if (type.equalsIgnoreCase("heap")) {
firstBuf = firstHeap;
secondBuf = secondHeap;
} else if (type.equalsIgnoreCase("mixed")) {
firstBuf = firstDirect;
secondBuf = secondHeap;
} else {
throw new IllegalArgumentException("Use direct|heap|mixed as arguments");
}
int rounds = Integer.parseInt(args[1]);
for (int i = 0; i < rounds; i++) {
f = channel.write(ChannelBuffers.wrappedBuffer(firstBuf.duplicate(), secondBuf.duplicate()));
}
assertTrue(f.awaitUninterruptibly().isSuccess());
long stop = System.currentTimeMillis() - start;
ChannelFuture cf = channel.close();
assertTrue(cf.awaitUninterruptibly().isSuccess());
socket.close();
serverThread.interrupt();
System.out.println("Execute " + rounds + " in " + stop + "ms");
}
}