diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java index f15ec3c381..009ace0c8a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java @@ -89,9 +89,9 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer { int flushed = flushed(); while (flushed != unflushed && (m = buffer[flushed].msg()) != null) { if (!(m instanceof ByteBuf)) { - this.addressCount = 0; - this.addressSize = 0; - return null; + // Just break out of the loop as we can still use gathering writes for the buffers that we + // found by now. + break; } AddressEntry entry = (AddressEntry) buffer[flushed]; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 45f37ede61..a06d869278 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -138,15 +138,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So return done; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple0(in, 1, nioBuffers, nioBuffers.length, readableBytes); + return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes); } } private boolean writeBytesMultiple( - EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses) throws IOException { - - int addressCnt = in.addressCount(); - long expectedWrittenBytes = in.addressSize(); + EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses, + int addressCnt, int expectedWrittenBytes) throws IOException { boolean done = false; long writtenBytes = 0; int offset = 0; @@ -191,11 +189,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } private boolean writeBytesMultiple( - NioSocketChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException { - return writeBytesMultiple0(in, msgCount, nioBuffers, in.nioBufferCount(), in.nioBufferSize()); - } - - private boolean writeBytesMultiple0( ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes) throws IOException { boolean done = false; @@ -240,7 +233,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So return done; } - private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, + private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, boolean done) { if (done) { // Release all buffers @@ -328,8 +321,9 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in; // Ensure the pending writes are made of memoryaddresses only. AddressEntry[] addresses = epollIn.memoryAddresses(); - if (addresses != null) { - if (!writeBytesMultiple(epollIn, msgCount, addresses)) { + int addressesCnt = epollIn.addressCount(); + if (addressesCnt > 1) { + if (!writeBytesMultiple(epollIn, msgCount, addresses, addressesCnt, epollIn.addressCount())) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. break; @@ -344,8 +338,9 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; // Ensure the pending writes are made of memoryaddresses only. ByteBuffer[] nioBuffers = nioIn.nioBuffers(); - if (nioBuffers != null) { - if (!writeBytesMultiple(nioIn, msgCount, nioBuffers)) { + int nioBufferCnt = nioIn.nioBufferCount(); + if (nioBufferCnt > 1) { + if (!writeBytesMultiple(nioIn, msgCount, nioBuffers, nioBufferCnt, nioIn.nioBufferSize())) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. break; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index f5d547081e..fabcdefbb3 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -246,15 +246,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty super.doWrite(in); return; } - NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; // Ensure the pending writes are made of ByteBufs only. + NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; ByteBuffer[] nioBuffers = nioIn.nioBuffers(); - if (nioBuffers == null) { + int nioBufferCnt = nioIn.nioBufferCount(); + if (nioBufferCnt <= 1) { + // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; } - int nioBufferCnt = nioIn.nioBufferCount(); long expectedWrittenBytes = nioIn.nioBufferSize(); final SocketChannel ch = javaChannel(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java index 31ff82f6c9..bf43e9f1a5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java @@ -96,9 +96,9 @@ public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer int i = flushed(); while (i != unflushed && (m = buffer[i].msg()) != null) { if (!(m instanceof ByteBuf)) { - this.nioBufferCount = 0; - this.nioBufferSize = 0; - return null; + // Just break out of the loop as we can still use gathering writes for the buffers that we + // found by now. + break; } NioEntry entry = (NioEntry) buffer[i];