diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index d046f06dca..7438ce45a3 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -726,6 +726,11 @@ public abstract class AbstractByteBuf implements ByteBuf { return nioBuffer(readerIndex, readableBytes()); } + @Override + public ByteBuffer[] nioBuffers() { + return nioBuffers(readerIndex, readableBytes()); + } + @Override public String toString(Charset charset) { return toString(readerIndex, readableBytes(), charset); diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index 76a68e881e..09d19c4a59 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1701,6 +1701,39 @@ public interface ByteBuf extends ChannelBuf, Comparable { */ ByteBuffer nioBuffer(int index, int length); + /** + * Returns {@code true} if and only if {@link #nioBuffers()} method will not fail. + */ + boolean hasNioBuffers(); + + /** + * Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer + * shares the content with this buffer, while changing the position and limit of the returned + * NIO buffer does not affect the indexes and marks of this buffer. This method does not + * modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the + * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic + * buffer and it adjusted its capacity. + * + * + * @throws UnsupportedOperationException + * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself + */ + ByteBuffer[] nioBuffers(); + + /** + * Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length + * The returned buffer shares the content with this buffer, while changing the position and limit + * of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does + * not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the + * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic + * buffer and it adjusted its capacity. + * + * + * @throws UnsupportedOperationException + * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself + */ + ByteBuffer[] nioBuffers(int offset, int length); + /** * Returns {@code true} if and only if this buffer has a backing byte array. * If this method returns true, you can safely call {@link #array()} and @@ -1801,6 +1834,13 @@ public interface ByteBuf extends ChannelBuf, Comparable { */ ByteBuffer nioBuffer(); + /** + * Returns the internal NIO buffer array that is reused for I/O. + * + * @throws UnsupportedOperationException if the buffer has no internal NIO buffer array + */ + ByteBuffer[] nioBuffers(); + /** * Returns a new buffer whose type is identical to the callee. * diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java index e9f44435e2..6c5a184c7b 100644 --- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java @@ -15,7 +15,6 @@ */ package io.netty.buffer; -import java.nio.ByteBuffer; import java.util.List; public interface CompositeByteBuf extends ByteBuf, Iterable { @@ -48,32 +47,4 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * Same with {@link #slice(int, int)} except that this method returns a list. */ List decompose(int offset, int length); - - /** - * Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer - * shares the content with this buffer, while changing the position and limit of the returned - * NIO buffer does not affect the indexes and marks of this buffer. This method does not - * modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the - * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic - * buffer and it adjusted its capacity. - * - * - * @throws UnsupportedOperationException - * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself - */ - ByteBuffer[] nioBuffers(); - - /** - * Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length - * The returned buffer shares the content with this buffer, while changing the position and limit - * of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does - * not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the - * returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic - * buffer and it adjusted its capacity. - * - * - * @throws UnsupportedOperationException - * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself - */ - ByteBuffer[] nioBuffers(int offset, int length); } diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index cfbc99d8b8..4074111e43 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -1001,6 +1001,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit } return false; } + @Override public ByteBuffer nioBuffer(int index, int length) { if (components.size() == 1) { @@ -1023,6 +1024,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit return merged; } + @Override + public boolean hasNioBuffers() { + return true; + } + @Override public ByteBuffer[] nioBuffers(int index, int length) { int componentId = toComponentIndex(index); @@ -1206,6 +1212,16 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit throw new UnsupportedOperationException(); } + @Override + public ByteBuffer[] nioBuffers() { + ByteBuffer[] nioBuffers = new ByteBuffer[components.size()]; + int index = 0; + for (Component component : components) { + nioBuffers[index++] = component.buf.unsafe().nioBuffer(); + } + return nioBuffers; + } + @Override public ByteBuf newBuffer(int initialCapacity) { CompositeByteBuf buf = new DefaultCompositeByteBuf(maxNumComponents); diff --git a/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java index 07f9802b03..229baf111f 100644 --- a/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java @@ -380,6 +380,16 @@ public class DirectByteBuf extends AbstractByteBuf { } } + @Override + public boolean hasNioBuffers() { + return false; + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + throw new UnsupportedOperationException(); + } + @Override public ByteBuf copy(int index, int length) { ByteBuffer src; @@ -408,6 +418,11 @@ public class DirectByteBuf extends AbstractByteBuf { return tmpBuf; } + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); diff --git a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java index 7cf17084d0..cb3f1f2b48 100644 --- a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java @@ -211,6 +211,16 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf return buffer.nioBuffer(index, length); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + return buffer.nioBuffers(offset, length); + } + @Override public Unsafe unsafe() { return unsafe; @@ -223,6 +233,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf return buffer.unsafe().nioBuffer(); } + @Override + public ByteBuffer[] nioBuffers() { + return buffer.unsafe().nioBuffers(); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return buffer.unsafe().newBuffer(initialCapacity); diff --git a/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java index 5b19e2185c..00cdbecbcc 100644 --- a/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java @@ -209,6 +209,16 @@ public class HeapByteBuf extends AbstractByteBuf { return ByteBuffer.wrap(array, index, length); } + @Override + public boolean hasNioBuffers() { + return false; + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + throw new UnsupportedOperationException(); + } + @Override public short getShort(int index) { return (short) (array[index] << 8 | array[index + 1] & 0xFF); @@ -297,6 +307,11 @@ public class HeapByteBuf extends AbstractByteBuf { return nioBuf; } + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); diff --git a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java index ba69c906f2..774570d569 100644 --- a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java @@ -203,6 +203,16 @@ public class ReadOnlyByteBuf extends AbstractByteBuf implements WrappedByteBuf { return buffer.nioBuffer(index, length).asReadOnlyBuffer(); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + return buffer.nioBuffers(offset, length); + } + @Override public int capacity() { return buffer.capacity(); diff --git a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java index 4a11e53566..c23b5cff56 100644 --- a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java @@ -257,6 +257,17 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf { return buffer.nioBuffer(index + adjustment, length); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + checkIndex(index, length); + return buffer.nioBuffers(index, length); + } + private void checkIndex(int index) { if (index < 0 || index >= capacity()) { throw new IndexOutOfBoundsException("Invalid index: " + index @@ -290,6 +301,11 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf { return buffer.nioBuffer(adjustment, length); } + @Override + public ByteBuffer[] nioBuffers() { + return buffer.nioBuffers(adjustment, length); + } + @Override public ByteBuf newBuffer(int initialCapacity) { return buffer.unsafe().newBuffer(initialCapacity); diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index 5ae917261f..cb3810116a 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -657,6 +657,29 @@ public class SwappedByteBuf implements WrappedByteBuf { return buf.nioBuffer(index, length).order(order); } + @Override + public boolean hasNioBuffers() { + return buf.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers() { + ByteBuffer[] nioBuffers = buf.nioBuffers(); + for (int i = 0; i < nioBuffers.length; i++) { + nioBuffers[i] = nioBuffers[i].order(order); + } + return nioBuffers; + } + + @Override + public ByteBuffer[] nioBuffers(int offset, int length) { + ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length); + for (int i = 0; i < nioBuffers.length; i++) { + nioBuffers[i] = nioBuffers[i].order(order); + } + return nioBuffers; + } + @Override public boolean hasArray() { return buf.hasArray(); diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index 0e3bf4d99b..d4ce5dc931 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -672,6 +672,22 @@ class ReplayingDecoderBuffer implements ByteBuf { return buffer.nioBuffer(index, length); } + @Override + public boolean hasNioBuffers() { + return buffer.hasNioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnreplayableOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + checkIndex(index, length); + return buffer.nioBuffers(index, length); + } + @Override public String toString(int index, int length, Charset charset) { checkIndex(index, length); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 883b020a26..88eb87c3d3 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.socket.aio; +import static java.util.concurrent.TimeUnit.SECONDS; import io.netty.buffer.ByteBuf; import io.netty.buffer.ChannelBufType; import io.netty.channel.ChannelException; @@ -39,8 +40,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false); private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); - private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); - private static final CompletionHandler READ_HANDLER = new ReadHandler(); + private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); + private static final CompletionHandler READ_HANDLER = new ReadHandler(); + private static final CompletionHandler GATHERING_WRITE_HANDLER = new WriteHandler(); + private static final CompletionHandler SCATTERING_READ_HANDLER = new ReadHandler(); private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) { try { @@ -180,7 +183,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne buf.discardReadBytes(); if (buf.readable()) { - javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); + if (buf.hasNioBuffers()) { + ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes()); + javaChannel().write(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this, + GATHERING_WRITE_HANDLER); + } else { + javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); + } } else { notifyFlushFutures(); flushing = false; @@ -204,17 +213,23 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne expandReadBuffer(byteBuf); } - // Get a ByteBuffer view on the ByteBuf - ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); - javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER); + if (byteBuf.hasNioBuffers()) { + ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); + javaChannel().read(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this, + SCATTERING_READ_HANDLER); + } else { + // Get a ByteBuffer view on the ByteBuf + ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); + javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER); + } } - private static final class WriteHandler extends AioCompletionHandler { + private static final class WriteHandler extends AioCompletionHandler { @Override - protected void completed0(Integer result, AioSocketChannel channel) { + protected void completed0(T result, AioSocketChannel channel) { ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); - int writtenBytes = result; + int writtenBytes = result.intValue(); if (writtenBytes > 0) { // Update the readerIndex with the amount of read bytes buf.readerIndex(buf.readerIndex() + writtenBytes); @@ -263,10 +278,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - private static final class ReadHandler extends AioCompletionHandler { + private static final class ReadHandler extends AioCompletionHandler { @Override - protected void completed0(Integer result, AioSocketChannel channel) { + protected void completed0(T result, AioSocketChannel channel) { final ChannelPipeline pipeline = channel.pipeline(); final ByteBuf byteBuf = pipeline.inboundByteBuffer();