Set readPending to false when ever a read is done
Motivation: readPending is currently only set to false if data is delivered to the application, however this may result in duplicate events being received from the selector in the event that the socket was closed. Modifications: - We should set readPending to false before each read attempt for all transports besides NIO. - Based upon the Javadocs it is possible that NIO may have spurious wakeups [1]. In this case we should be more cautious and only set readPending to false if data was actually read. [1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SelectionKey.html That a selection key's ready set indicates that its channel is ready for some operation category is a hint, but not a guarantee, that an operation in such a category may be performed by a thread without causing the thread to block. Result: Notification from the selector (or simulated events from kqueue/epoll ET) in the event of socket closure. Fixes https://github.com/netty/netty/issues/7255
This commit is contained in:
parent
424bb09d24
commit
413c7c2cd8
@ -727,7 +727,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
|||||||
EpollRecvByteAllocatorHandle allocHandle) {
|
EpollRecvByteAllocatorHandle allocHandle) {
|
||||||
if (byteBuf != null) {
|
if (byteBuf != null) {
|
||||||
if (byteBuf.isReadable()) {
|
if (byteBuf.isReadable()) {
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
} else {
|
} else {
|
||||||
byteBuf.release();
|
byteBuf.release();
|
||||||
@ -784,6 +783,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
|||||||
// we use a direct buffer here as the native implementations only be able
|
// we use a direct buffer here as the native implementations only be able
|
||||||
// to handle direct buffers.
|
// to handle direct buffers.
|
||||||
byteBuf = allocHandle.allocate(allocator);
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
|
readPending = false;
|
||||||
allocHandle.lastBytesRead(doReadBytes(byteBuf));
|
allocHandle.lastBytesRead(doReadBytes(byteBuf));
|
||||||
if (allocHandle.lastBytesRead() <= 0) {
|
if (allocHandle.lastBytesRead() <= 0) {
|
||||||
// nothing was read, release the buffer.
|
// nothing was read, release the buffer.
|
||||||
@ -793,7 +793,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
allocHandle.incMessagesRead(1);
|
allocHandle.incMessagesRead(1);
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
byteBuf = null;
|
byteBuf = null;
|
||||||
|
|
||||||
|
@ -526,6 +526,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
|||||||
// we use a direct buffer here as the native implementations only be able
|
// we use a direct buffer here as the native implementations only be able
|
||||||
// to handle direct buffers.
|
// to handle direct buffers.
|
||||||
byteBuf = allocHandle.allocate(allocator);
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
|
readPending = false;
|
||||||
allocHandle.lastBytesRead(doReadBytes(byteBuf));
|
allocHandle.lastBytesRead(doReadBytes(byteBuf));
|
||||||
if (allocHandle.lastBytesRead() <= 0) {
|
if (allocHandle.lastBytesRead() <= 0) {
|
||||||
// nothing was read, release the buffer.
|
// nothing was read, release the buffer.
|
||||||
@ -535,7 +536,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
allocHandle.incMessagesRead(1);
|
allocHandle.incMessagesRead(1);
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
byteBuf = null;
|
byteBuf = null;
|
||||||
|
|
||||||
@ -572,7 +572,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
|||||||
KQueueRecvByteAllocatorHandle allocHandle) {
|
KQueueRecvByteAllocatorHandle allocHandle) {
|
||||||
if (byteBuf != null) {
|
if (byteBuf != null) {
|
||||||
if (byteBuf.isReadable()) {
|
if (byteBuf.isReadable()) {
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
} else {
|
} else {
|
||||||
byteBuf.release();
|
byteBuf.release();
|
||||||
|
@ -126,6 +126,16 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
byteBuf.release();
|
byteBuf.release();
|
||||||
byteBuf = null;
|
byteBuf = null;
|
||||||
close = allocHandle.lastBytesRead() < 0;
|
close = allocHandle.lastBytesRead() < 0;
|
||||||
|
if (close) {
|
||||||
|
// Based upon the Javadocs it is possible that NIO may have spurious wake ups [1]. In this
|
||||||
|
// case we should be more cautious and only set readPending to false if data was actually
|
||||||
|
// read.
|
||||||
|
// [1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SelectionKey.html
|
||||||
|
// That a selection key's ready set indicates that its channel is ready for some operation
|
||||||
|
// category is a hint, but not a guarantee, that an operation in such a category may be
|
||||||
|
// performed by a thread without causing the thread to block.
|
||||||
|
readPending = false;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,7 +80,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
|||||||
RecvByteBufAllocator.Handle allocHandle) {
|
RecvByteBufAllocator.Handle allocHandle) {
|
||||||
if (byteBuf != null) {
|
if (byteBuf != null) {
|
||||||
if (byteBuf.isReadable()) {
|
if (byteBuf.isReadable()) {
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
} else {
|
} else {
|
||||||
byteBuf.release();
|
byteBuf.release();
|
||||||
@ -102,9 +101,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
|||||||
// during the same read loop readPending was set to false.
|
// during the same read loop readPending was set to false.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// In OIO we should set readPending to false even if the read was not successful so we can schedule
|
|
||||||
// another read on the event loop if no reads are done.
|
|
||||||
readPending = false;
|
|
||||||
|
|
||||||
final ChannelPipeline pipeline = pipeline();
|
final ChannelPipeline pipeline = pipeline();
|
||||||
final ByteBufAllocator allocator = config.getAllocator();
|
final ByteBufAllocator allocator = config.getAllocator();
|
||||||
@ -117,6 +113,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
|||||||
try {
|
try {
|
||||||
byteBuf = allocHandle.allocate(allocator);
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
do {
|
do {
|
||||||
|
readPending = false;
|
||||||
allocHandle.lastBytesRead(doReadBytes(byteBuf));
|
allocHandle.lastBytesRead(doReadBytes(byteBuf));
|
||||||
if (allocHandle.lastBytesRead() <= 0) {
|
if (allocHandle.lastBytesRead() <= 0) {
|
||||||
if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
|
if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
|
||||||
@ -140,7 +137,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
|||||||
final int maxCapacity = byteBuf.maxCapacity();
|
final int maxCapacity = byteBuf.maxCapacity();
|
||||||
if (capacity == maxCapacity) {
|
if (capacity == maxCapacity) {
|
||||||
allocHandle.incMessagesRead(1);
|
allocHandle.incMessagesRead(1);
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
byteBuf = allocHandle.allocate(allocator);
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
} else {
|
} else {
|
||||||
@ -158,7 +154,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
|||||||
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use
|
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use
|
||||||
// it because allocHandle.continueReading() returned false.
|
// it because allocHandle.continueReading() returned false.
|
||||||
if (byteBuf.isReadable()) {
|
if (byteBuf.isReadable()) {
|
||||||
readPending = false;
|
|
||||||
pipeline.fireChannelRead(byteBuf);
|
pipeline.fireChannelRead(byteBuf);
|
||||||
} else {
|
} else {
|
||||||
byteBuf.release();
|
byteBuf.release();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user