[#2680] NioSocketChannelOutboundBuffer.nioBuffers() / EpollSocketChannelOutboundBuffer.memoryAddresses() should always return non-null array as stated in javadocs
Motivation: At the moment NioSocketChannelOutboundBuffer.nioBuffers() / EpollSocketChannelOutboundBuffer.memoryAddresses() returns null if something is contained in the ChannelOutboundBuffer which is not a ByteBuf. This is a problem for two reasons: 1 - In the javadocs we state that it will never return null 2 - We may do a not optimal write as there may be things that could be written via gathering writes Modifications: Change NioSocketChannelOutboundBuffer.nioBuffers() / EpollSocketChannelOutboundBuffer.memoryAddresses() to never return null but have it contain all ByteBuffer that were found before the non ByteBuf. This way we can do a gathering write and also conform to the javadocs. Result: Better speed and also correct implementation in terms of the api.
This commit is contained in:
parent
702ebbc19b
commit
faf9ac9a30
@ -89,9 +89,9 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
|
||||
int flushed = flushed();
|
||||
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
|
||||
if (!(m instanceof ByteBuf)) {
|
||||
this.addressCount = 0;
|
||||
this.addressSize = 0;
|
||||
return null;
|
||||
// Just break out of the loop as we can still use gathering writes for the buffers that we
|
||||
// found by now.
|
||||
break;
|
||||
}
|
||||
|
||||
AddressEntry entry = (AddressEntry) buffer[flushed];
|
||||
|
@ -138,15 +138,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
return done;
|
||||
} else {
|
||||
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
||||
return writeBytesMultiple0(in, 1, nioBuffers, nioBuffers.length, readableBytes);
|
||||
return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(
|
||||
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses) throws IOException {
|
||||
|
||||
int addressCnt = in.addressCount();
|
||||
long expectedWrittenBytes = in.addressSize();
|
||||
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses,
|
||||
int addressCnt, int expectedWrittenBytes) throws IOException {
|
||||
boolean done = false;
|
||||
long writtenBytes = 0;
|
||||
int offset = 0;
|
||||
@ -191,11 +189,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(
|
||||
NioSocketChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {
|
||||
return writeBytesMultiple0(in, msgCount, nioBuffers, in.nioBufferCount(), in.nioBufferSize());
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple0(
|
||||
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
|
||||
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
|
||||
boolean done = false;
|
||||
@ -240,7 +233,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
return done;
|
||||
}
|
||||
|
||||
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
|
||||
private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
|
||||
boolean done) {
|
||||
if (done) {
|
||||
// Release all buffers
|
||||
@ -328,8 +321,9 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in;
|
||||
// Ensure the pending writes are made of memoryaddresses only.
|
||||
AddressEntry[] addresses = epollIn.memoryAddresses();
|
||||
if (addresses != null) {
|
||||
if (!writeBytesMultiple(epollIn, msgCount, addresses)) {
|
||||
int addressesCnt = epollIn.addressCount();
|
||||
if (addressesCnt > 1) {
|
||||
if (!writeBytesMultiple(epollIn, msgCount, addresses, addressesCnt, epollIn.addressCount())) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
break;
|
||||
@ -344,8 +338,9 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
|
||||
// Ensure the pending writes are made of memoryaddresses only.
|
||||
ByteBuffer[] nioBuffers = nioIn.nioBuffers();
|
||||
if (nioBuffers != null) {
|
||||
if (!writeBytesMultiple(nioIn, msgCount, nioBuffers)) {
|
||||
int nioBufferCnt = nioIn.nioBufferCount();
|
||||
if (nioBufferCnt > 1) {
|
||||
if (!writeBytesMultiple(nioIn, msgCount, nioBuffers, nioBufferCnt, nioIn.nioBufferSize())) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
break;
|
||||
|
@ -246,15 +246,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
super.doWrite(in);
|
||||
return;
|
||||
}
|
||||
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
|
||||
// Ensure the pending writes are made of ByteBufs only.
|
||||
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
|
||||
ByteBuffer[] nioBuffers = nioIn.nioBuffers();
|
||||
if (nioBuffers == null) {
|
||||
int nioBufferCnt = nioIn.nioBufferCount();
|
||||
if (nioBufferCnt <= 1) {
|
||||
// We have something else beside ByteBuffers to write so fallback to normal writes.
|
||||
super.doWrite(in);
|
||||
return;
|
||||
}
|
||||
|
||||
int nioBufferCnt = nioIn.nioBufferCount();
|
||||
long expectedWrittenBytes = nioIn.nioBufferSize();
|
||||
|
||||
final SocketChannel ch = javaChannel();
|
||||
|
@ -96,9 +96,9 @@ public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer
|
||||
int i = flushed();
|
||||
while (i != unflushed && (m = buffer[i].msg()) != null) {
|
||||
if (!(m instanceof ByteBuf)) {
|
||||
this.nioBufferCount = 0;
|
||||
this.nioBufferSize = 0;
|
||||
return null;
|
||||
// Just break out of the loop as we can still use gathering writes for the buffers that we
|
||||
// found by now.
|
||||
break;
|
||||
}
|
||||
|
||||
NioEntry entry = (NioEntry) buffer[i];
|
||||
|
Loading…
Reference in New Issue
Block a user