Merge pull request #506 from jfallows/master
Add support for AIO scattering reads and gathering writes (see #492)
This commit is contained in:
commit
b5aa2108ec
@ -726,6 +726,11 @@ public abstract class AbstractByteBuf implements ByteBuf {
|
||||
return nioBuffer(readerIndex, readableBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
return nioBuffers(readerIndex, readableBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(Charset charset) {
|
||||
return toString(readerIndex, readableBytes(), charset);
|
||||
|
@ -1701,6 +1701,39 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
|
||||
*/
|
||||
ByteBuffer nioBuffer(int index, int length);
|
||||
|
||||
/**
|
||||
* Returns {@code true} if and only if {@link #nioBuffers()} method will not fail.
|
||||
*/
|
||||
boolean hasNioBuffers();
|
||||
|
||||
/**
|
||||
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer
|
||||
* shares the content with this buffer, while changing the position and limit of the returned
|
||||
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
|
||||
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
|
||||
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
|
||||
* buffer and it adjusted its capacity.
|
||||
*
|
||||
*
|
||||
* @throws UnsupportedOperationException
|
||||
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
|
||||
*/
|
||||
ByteBuffer[] nioBuffers();
|
||||
|
||||
/**
|
||||
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
|
||||
* The returned buffer shares the content with this buffer, while changing the position and limit
|
||||
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
|
||||
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
|
||||
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
|
||||
* buffer and it adjusted its capacity.
|
||||
*
|
||||
*
|
||||
* @throws UnsupportedOperationException
|
||||
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
|
||||
*/
|
||||
ByteBuffer[] nioBuffers(int offset, int length);
|
||||
|
||||
/**
|
||||
* Returns {@code true} if and only if this buffer has a backing byte array.
|
||||
* If this method returns true, you can safely call {@link #array()} and
|
||||
@ -1801,6 +1834,13 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
|
||||
*/
|
||||
ByteBuffer nioBuffer();
|
||||
|
||||
/**
|
||||
* Returns the internal NIO buffer array that is reused for I/O.
|
||||
*
|
||||
* @throws UnsupportedOperationException if the buffer has no internal NIO buffer array
|
||||
*/
|
||||
ByteBuffer[] nioBuffers();
|
||||
|
||||
/**
|
||||
* Returns a new buffer whose type is identical to the callee.
|
||||
*
|
||||
|
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package io.netty.buffer;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
|
||||
@ -48,32 +47,4 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
|
||||
* Same with {@link #slice(int, int)} except that this method returns a list.
|
||||
*/
|
||||
List<ByteBuf> decompose(int offset, int length);
|
||||
|
||||
/**
|
||||
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}'s. The returned buffer
|
||||
* shares the content with this buffer, while changing the position and limit of the returned
|
||||
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
|
||||
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
|
||||
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
|
||||
* buffer and it adjusted its capacity.
|
||||
*
|
||||
*
|
||||
* @throws UnsupportedOperationException
|
||||
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
|
||||
*/
|
||||
ByteBuffer[] nioBuffers();
|
||||
|
||||
/**
|
||||
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
|
||||
* The returned buffer shares the content with this buffer, while changing the position and limit
|
||||
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
|
||||
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
|
||||
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
|
||||
* buffer and it adjusted its capacity.
|
||||
*
|
||||
*
|
||||
* @throws UnsupportedOperationException
|
||||
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
|
||||
*/
|
||||
ByteBuffer[] nioBuffers(int offset, int length);
|
||||
}
|
||||
|
@ -1001,6 +1001,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer nioBuffer(int index, int length) {
|
||||
if (components.size() == 1) {
|
||||
@ -1023,6 +1024,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
|
||||
return merged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int index, int length) {
|
||||
int componentId = toComponentIndex(index);
|
||||
@ -1206,6 +1212,16 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
ByteBuffer[] nioBuffers = new ByteBuffer[components.size()];
|
||||
int index = 0;
|
||||
for (Component component : components) {
|
||||
nioBuffers[index++] = component.buf.unsafe().nioBuffer();
|
||||
}
|
||||
return nioBuffers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newBuffer(int initialCapacity) {
|
||||
CompositeByteBuf buf = new DefaultCompositeByteBuf(maxNumComponents);
|
||||
|
@ -380,6 +380,16 @@ public class DirectByteBuf extends AbstractByteBuf {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int offset, int length) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf copy(int index, int length) {
|
||||
ByteBuffer src;
|
||||
@ -408,6 +418,11 @@ public class DirectByteBuf extends AbstractByteBuf {
|
||||
return tmpBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newBuffer(int initialCapacity) {
|
||||
return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));
|
||||
|
@ -211,6 +211,16 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
|
||||
return buffer.nioBuffer(index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return buffer.hasNioBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int offset, int length) {
|
||||
return buffer.nioBuffers(offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Unsafe unsafe() {
|
||||
return unsafe;
|
||||
@ -223,6 +233,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
|
||||
return buffer.unsafe().nioBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
return buffer.unsafe().nioBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newBuffer(int initialCapacity) {
|
||||
return buffer.unsafe().newBuffer(initialCapacity);
|
||||
|
@ -209,6 +209,16 @@ public class HeapByteBuf extends AbstractByteBuf {
|
||||
return ByteBuffer.wrap(array, index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int offset, int length) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getShort(int index) {
|
||||
return (short) (array[index] << 8 | array[index + 1] & 0xFF);
|
||||
@ -297,6 +307,11 @@ public class HeapByteBuf extends AbstractByteBuf {
|
||||
return nioBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newBuffer(int initialCapacity) {
|
||||
return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity()));
|
||||
|
@ -203,6 +203,16 @@ public class ReadOnlyByteBuf extends AbstractByteBuf implements WrappedByteBuf {
|
||||
return buffer.nioBuffer(index, length).asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return buffer.hasNioBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int offset, int length) {
|
||||
return buffer.nioBuffers(offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int capacity() {
|
||||
return buffer.capacity();
|
||||
|
@ -257,6 +257,17 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
|
||||
return buffer.nioBuffer(index + adjustment, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return buffer.hasNioBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int index, int length) {
|
||||
checkIndex(index, length);
|
||||
return buffer.nioBuffers(index, length);
|
||||
}
|
||||
|
||||
private void checkIndex(int index) {
|
||||
if (index < 0 || index >= capacity()) {
|
||||
throw new IndexOutOfBoundsException("Invalid index: " + index
|
||||
@ -290,6 +301,11 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
|
||||
return buffer.nioBuffer(adjustment, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
return buffer.nioBuffers(adjustment, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newBuffer(int initialCapacity) {
|
||||
return buffer.unsafe().newBuffer(initialCapacity);
|
||||
|
@ -657,6 +657,29 @@ public class SwappedByteBuf implements WrappedByteBuf {
|
||||
return buf.nioBuffer(index, length).order(order);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return buf.hasNioBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
||||
for (int i = 0; i < nioBuffers.length; i++) {
|
||||
nioBuffers[i] = nioBuffers[i].order(order);
|
||||
}
|
||||
return nioBuffers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int offset, int length) {
|
||||
ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length);
|
||||
for (int i = 0; i < nioBuffers.length; i++) {
|
||||
nioBuffers[i] = nioBuffers[i].order(order);
|
||||
}
|
||||
return nioBuffers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasArray() {
|
||||
return buf.hasArray();
|
||||
|
@ -672,6 +672,22 @@ class ReplayingDecoderBuffer implements ByteBuf {
|
||||
return buffer.nioBuffer(index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNioBuffers() {
|
||||
return buffer.hasNioBuffers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
throw new UnreplayableOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer[] nioBuffers(int index, int length) {
|
||||
checkIndex(index, length);
|
||||
return buffer.nioBuffers(index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(int index, int length, Charset charset) {
|
||||
checkIndex(index, length);
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ChannelBufType;
|
||||
import io.netty.channel.ChannelException;
|
||||
@ -39,8 +40,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
|
||||
|
||||
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
|
||||
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
|
||||
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
|
||||
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler<Integer>();
|
||||
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler<Integer>();
|
||||
private static final CompletionHandler<Long, AioSocketChannel> GATHERING_WRITE_HANDLER = new WriteHandler<Long>();
|
||||
private static final CompletionHandler<Long, AioSocketChannel> SCATTERING_READ_HANDLER = new ReadHandler<Long>();
|
||||
|
||||
private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) {
|
||||
try {
|
||||
@ -180,7 +183,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
buf.discardReadBytes();
|
||||
|
||||
if (buf.readable()) {
|
||||
if (buf.hasNioBuffers()) {
|
||||
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes());
|
||||
javaChannel().write(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this,
|
||||
GATHERING_WRITE_HANDLER);
|
||||
} else {
|
||||
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
|
||||
}
|
||||
} else {
|
||||
notifyFlushFutures();
|
||||
flushing = false;
|
||||
@ -204,17 +213,23 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
expandReadBuffer(byteBuf);
|
||||
}
|
||||
|
||||
if (byteBuf.hasNioBuffers()) {
|
||||
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
|
||||
javaChannel().read(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this,
|
||||
SCATTERING_READ_HANDLER);
|
||||
} else {
|
||||
// Get a ByteBuffer view on the ByteBuf
|
||||
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
|
||||
javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class WriteHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
|
||||
private static final class WriteHandler<T extends Number> extends AioCompletionHandler<T, AioSocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void completed0(Integer result, AioSocketChannel channel) {
|
||||
protected void completed0(T result, AioSocketChannel channel) {
|
||||
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
|
||||
int writtenBytes = result;
|
||||
int writtenBytes = result.intValue();
|
||||
if (writtenBytes > 0) {
|
||||
// Update the readerIndex with the amount of read bytes
|
||||
buf.readerIndex(buf.readerIndex() + writtenBytes);
|
||||
@ -263,10 +278,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ReadHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
|
||||
private static final class ReadHandler<T extends Number> extends AioCompletionHandler<T, AioSocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void completed0(Integer result, AioSocketChannel channel) {
|
||||
protected void completed0(T result, AioSocketChannel channel) {
|
||||
final ChannelPipeline pipeline = channel.pipeline();
|
||||
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user