Add support for AIO scattering reads and gathering writes.

o Add ByteBuf.hasNioBuffers() method
 o Promote CompositeByteBuf.nioBuffers() methods to ByteBuf
 o Use ByteBuf.nioBuffers() methods from AioSocketChannel
This commit is contained in:
John Fallows 2012-08-12 17:29:07 -07:00
parent 1162f26df5
commit 06fd869711
12 changed files with 197 additions and 40 deletions

View File

@ -726,6 +726,11 @@ public abstract class AbstractByteBuf implements ByteBuf {
return nioBuffer(readerIndex, readableBytes());
}
@Override
public ByteBuffer[] nioBuffers() {
return nioBuffers(readerIndex, readableBytes());
}
@Override
public String toString(Charset charset) {
return toString(readerIndex, readableBytes(), charset);

View File

@ -1701,6 +1701,39 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/
ByteBuffer nioBuffer(int index, int length);
/**
* Returns {@code true} if and only if {@link #nioBuffers()} method will not fail.
*/
boolean hasNioBuffers();
/**
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer
* shares the content with this buffer, while changing the position and limit of the returned
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers();
/**
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
* The returned buffer shares the content with this buffer, while changing the position and limit
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers(int offset, int length);
/**
* Returns {@code true} if and only if this buffer has a backing byte array.
* If this method returns true, you can safely call {@link #array()} and
@ -1801,6 +1834,13 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
*/
ByteBuffer nioBuffer();
/**
* Returns the internal NIO buffer array that is reused for I/O.
*
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer array
*/
ByteBuffer[] nioBuffers();
/**
* Returns a new buffer whose type is identical to the callee.
*

View File

@ -15,7 +15,6 @@
*/
package io.netty.buffer;
import java.nio.ByteBuffer;
import java.util.List;
public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
@ -48,32 +47,4 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* Same with {@link #slice(int, int)} except that this method returns a list.
*/
List<ByteBuf> decompose(int offset, int length);
/**
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer
* shares the content with this buffer, while changing the position and limit of the returned
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers();
/**
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
* The returned buffer shares the content with this buffer, while changing the position and limit
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers(int offset, int length);
}

View File

@ -1001,6 +1001,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
return false;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
if (components.size() == 1) {
@ -1023,6 +1024,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
return merged;
}
@Override
public boolean hasNioBuffers() {
return true;
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
int componentId = toComponentIndex(index);
@ -1206,6 +1212,16 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
throw new UnsupportedOperationException();
}
@Override
public ByteBuffer[] nioBuffers() {
ByteBuffer[] nioBuffers = new ByteBuffer[components.size()];
int index = 0;
for (Component component : components) {
nioBuffers[index++] = component.buf.unsafe().nioBuffer();
}
return nioBuffers;
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
CompositeByteBuf buf = new DefaultCompositeByteBuf(maxNumComponents);

View File

@ -380,6 +380,16 @@ public class DirectByteBuf extends AbstractByteBuf {
}
}
@Override
public boolean hasNioBuffers() {
return false;
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf copy(int index, int length) {
ByteBuffer src;
@ -408,6 +418,11 @@ public class DirectByteBuf extends AbstractByteBuf {
return tmpBuf;
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));

View File

@ -211,6 +211,16 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
return buffer.nioBuffer(index, length);
}
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
return buffer.nioBuffers(offset, length);
}
@Override
public Unsafe unsafe() {
return unsafe;
@ -223,6 +233,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
return buffer.unsafe().nioBuffer();
}
@Override
public ByteBuffer[] nioBuffers() {
return buffer.unsafe().nioBuffers();
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
return buffer.unsafe().newBuffer(initialCapacity);

View File

@ -209,6 +209,16 @@ public class HeapByteBuf extends AbstractByteBuf {
return ByteBuffer.wrap(array, index, length);
}
@Override
public boolean hasNioBuffers() {
return false;
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
throw new UnsupportedOperationException();
}
@Override
public short getShort(int index) {
return (short) (array[index] << 8 | array[index + 1] & 0xFF);
@ -297,6 +307,11 @@ public class HeapByteBuf extends AbstractByteBuf {
return nioBuf;
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));

View File

@ -203,6 +203,16 @@ public class ReadOnlyByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.nioBuffer(index, length).asReadOnlyBuffer();
}
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
return buffer.nioBuffers(offset, length);
}
@Override
public int capacity() {
return buffer.capacity();

View File

@ -257,6 +257,17 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.nioBuffer(index + adjustment, length);
}
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
checkIndex(index, length);
return buffer.nioBuffers(index, length);
}
private void checkIndex(int index) {
if (index < 0 || index >= capacity()) {
throw new IndexOutOfBoundsException("Invalid index: " + index
@ -290,6 +301,11 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
return buffer.nioBuffer(adjustment, length);
}
@Override
public ByteBuffer[] nioBuffers() {
return buffer.nioBuffers(adjustment, length);
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
return buffer.unsafe().newBuffer(initialCapacity);

View File

@ -657,6 +657,29 @@ public class SwappedByteBuf implements WrappedByteBuf {
return buf.nioBuffer(index, length).order(order);
}
@Override
public boolean hasNioBuffers() {
return buf.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers() {
ByteBuffer[] nioBuffers = buf.nioBuffers();
for (int i = 0; i < nioBuffers.length; i++) {
nioBuffers[i] = nioBuffers[i].order(order);
}
return nioBuffers;
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length);
for (int i = 0; i < nioBuffers.length; i++) {
nioBuffers[i] = nioBuffers[i].order(order);
}
return nioBuffers;
}
@Override
public boolean hasArray() {
return buf.hasArray();

View File

@ -672,6 +672,22 @@ class ReplayingDecoderBuffer implements ByteBuf {
return buffer.nioBuffer(index, length);
}
@Override
public boolean hasNioBuffers() {
return buffer.hasNioBuffers();
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnreplayableOperationException();
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
checkIndex(index, length);
return buffer.nioBuffers(index, length);
}
@Override
public String toString(int index, int length, Charset charset) {
checkIndex(index, length);

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.socket.aio;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelException;
@ -39,8 +40,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler<Integer>();
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler<Integer>();
private static final CompletionHandler<Long, AioSocketChannel> GATHERING_WRITE_HANDLER = new WriteHandler<Long>();
private static final CompletionHandler<Long, AioSocketChannel> SCATTERING_READ_HANDLER = new ReadHandler<Long>();
private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
@ -180,7 +183,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
buf.discardReadBytes();
if (buf.readable()) {
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
if (buf.hasNioBuffers()) {
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes());
javaChannel().write(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this,
GATHERING_WRITE_HANDLER);
} else {
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
}
} else {
notifyFlushFutures();
flushing = false;
@ -204,17 +213,23 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
expandReadBuffer(byteBuf);
}
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER);
if (byteBuf.hasNioBuffers()) {
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this,
SCATTERING_READ_HANDLER);
} else {
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER);
}
}
private static final class WriteHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
private static final class WriteHandler<T extends Number> extends AioCompletionHandler<T, AioSocketChannel> {
@Override
protected void completed0(Integer result, AioSocketChannel channel) {
protected void completed0(T result, AioSocketChannel channel) {
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
int writtenBytes = result;
int writtenBytes = result.intValue();
if (writtenBytes > 0) {
// Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + writtenBytes);
@ -263,10 +278,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
}
private static final class ReadHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
private static final class ReadHandler<T extends Number> extends AioCompletionHandler<T, AioSocketChannel> {
@Override
protected void completed0(Integer result, AioSocketChannel channel) {
protected void completed0(T result, AioSocketChannel channel) {
final ChannelPipeline pipeline = channel.pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();