Revert "Set readPending to false when ever a read is done"

This reverts commit 413c7c2cd8 as it introduced an regression when edge-triggered mode is used which is true for our native transports by default. With 413c7c2cd8 included it was possible that we set readPending to false by mistake even if we would be interested in read more.
This commit is contained in:
Norman Maurer 2017-11-06 09:21:42 -08:00
parent e0bbff74f7
commit bcad9dbf97
4 changed files with 10 additions and 13 deletions

View File

@ -727,6 +727,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
EpollRecvByteAllocatorHandle allocHandle) { EpollRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();
@ -783,7 +784,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
// we use a direct buffer here as the native implementations only be able // we use a direct buffer here as the native implementations only be able
// to handle direct buffers. // to handle direct buffers.
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
readPending = false;
allocHandle.lastBytesRead(doReadBytes(byteBuf)); allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) { if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer. // nothing was read, release the buffer.
@ -793,6 +793,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
break; break;
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;

View File

@ -526,7 +526,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
// we use a direct buffer here as the native implementations only be able // we use a direct buffer here as the native implementations only be able
// to handle direct buffers. // to handle direct buffers.
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
readPending = false;
allocHandle.lastBytesRead(doReadBytes(byteBuf)); allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) { if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer. // nothing was read, release the buffer.
@ -536,6 +535,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
break; break;
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
@ -572,6 +572,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
KQueueRecvByteAllocatorHandle allocHandle) { KQueueRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();

View File

@ -126,16 +126,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
byteBuf.release(); byteBuf.release();
byteBuf = null; byteBuf = null;
close = allocHandle.lastBytesRead() < 0; close = allocHandle.lastBytesRead() < 0;
if (close) {
// Based upon the Javadocs it is possible that NIO may have spurious wake ups [1]. In this
// case we should be more cautious and only set readPending to false if data was actually
// read.
// [1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SelectionKey.html
// That a selection key's ready set indicates that its channel is ready for some operation
// category is a hint, but not a guarantee, that an operation in such a category may be
// performed by a thread without causing the thread to block.
readPending = false;
}
break; break;
} }

View File

@ -80,6 +80,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
RecvByteBufAllocator.Handle allocHandle) { RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();
@ -101,6 +102,9 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
// during the same read loop readPending was set to false. // during the same read loop readPending was set to false.
return; return;
} }
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
@ -113,7 +117,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
try { try {
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
do { do {
readPending = false;
allocHandle.lastBytesRead(doReadBytes(byteBuf)); allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) { if (allocHandle.lastBytesRead() <= 0) {
if (!byteBuf.isReadable()) { // nothing was read. release the buffer. if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
@ -137,6 +140,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
final int maxCapacity = byteBuf.maxCapacity(); final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) { if (capacity == maxCapacity) {
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
} else { } else {
@ -154,6 +158,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
// it because allocHandle.continueReading() returned false. // it because allocHandle.continueReading() returned false.
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();