Avoid IdleStateHandler triggering unexpected idle events when flushing large entries to slow clients (#9020)

Motivation:

IdleStateHandler may trigger unexpected idle events when flushing large entries to slow clients.

Modification:

In netty design, we check the identity hash code and total pending write bytes of the current flush entry to determine whether there is a change in output. But if a large entry has been flushing slowly (for some reason, the network speed is slow, or the client processing speed is too slow to cause the TCP sliding window to be zero), the total pending write bytes size and identity hash code would remain unchanged.

Avoid this issue by adding checks for the current entry flush progress.

Result:

Fixes #8912 .
This commit is contained in:
秦世成 2019-04-09 22:26:27 +08:00 committed by Norman Maurer
parent b26a61acd1
commit 51112e2b36
3 changed files with 71 additions and 4 deletions

View File

@ -129,6 +129,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
private long lastChangeCheckTimeStamp; private long lastChangeCheckTimeStamp;
private int lastMessageHashCode; private int lastMessageHashCode;
private long lastPendingWriteBytes; private long lastPendingWriteBytes;
private long lastFlushProgress;
/** /**
* Creates a new instance firing {@link IdleStateEvent}s. * Creates a new instance firing {@link IdleStateEvent}s.
@ -399,6 +400,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
if (buf != null) { if (buf != null) {
lastMessageHashCode = System.identityHashCode(buf.current()); lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes(); lastPendingWriteBytes = buf.totalPendingWriteBytes();
lastFlushProgress = buf.currentProgress();
} }
} }
} }
@ -443,6 +445,15 @@ public class IdleStateHandler extends ChannelDuplexHandler {
return true; return true;
} }
} }
long flushProgress = buf.currentProgress();
if (flushProgress != lastFlushProgress) {
lastFlushProgress = flushProgress;
if (!first) {
return true;
}
}
} }
} }

View File

@ -236,6 +236,7 @@ public class IdleStateHandlerTest {
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 }));
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 2 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 2 }));
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 3 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 3 }));
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[5 * 1024]));
// Establish a baseline. We're not consuming anything and let it idle once. // Establish a baseline. We're not consuming anything and let it idle once.
idleStateHandler.tickRun(); idleStateHandler.tickRun();
@ -283,6 +284,30 @@ public class IdleStateHandlerTest {
assertEquals(0, events.size()); assertEquals(0, events.size());
assertEquals(26L, idleStateHandler.tick(TimeUnit.SECONDS)); // 23s + 2s + 1s assertEquals(26L, idleStateHandler.tick(TimeUnit.SECONDS)); // 23s + 2s + 1s
// Consume part of the message every 2 seconds, then be idle for 1 seconds,
// then run the task and we should get an IdleStateEvent because the first trigger
idleStateHandler.tick(2L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consumePart(1024));
idleStateHandler.tick(2L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consumePart(1024));
idleStateHandler.tickRun(1L, TimeUnit.SECONDS);
assertEquals(1, events.size());
assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 2s + 2s + 1s
events.clear();
// Consume part of the message every 2 seconds, then be idle for 1 seconds,
// then consume all the rest of the message, then run the task and we shouldn't
// get an IdleStateEvent because the data is flowing and we haven't been idle for long enough!
idleStateHandler.tick(2L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consumePart(1024));
idleStateHandler.tick(2L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consumePart(1024));
idleStateHandler.tickRun(1L, TimeUnit.SECONDS);
assertEquals(0, events.size());
assertEquals(36L, idleStateHandler.tick(TimeUnit.SECONDS)); // 31s + 2s + 2s + 1s
idleStateHandler.tick(2L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consumePart(1024));
// There are no messages left! Advance the ticker by 3 seconds, // There are no messages left! Advance the ticker by 3 seconds,
// attempt a consume() but it will be null, then advance the // attempt a consume() but it will be null, then advance the
// ticker by an another 2 seconds and we should get an IdleStateEvent // ticker by an another 2 seconds and we should get an IdleStateEvent
@ -292,7 +317,7 @@ public class IdleStateHandlerTest {
idleStateHandler.tickRun(2L, TimeUnit.SECONDS); idleStateHandler.tickRun(2L, TimeUnit.SECONDS);
assertEquals(1, events.size()); assertEquals(1, events.size());
assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 3s + 2s assertEquals(43L, idleStateHandler.tick(TimeUnit.SECONDS)); // 36s + 2s + 3s + 2s
// q.e.d. // q.e.d.
} finally { } finally {
@ -379,7 +404,7 @@ public class IdleStateHandlerTest {
// the messages in the ChannelOutboundBuffer. // the messages in the ChannelOutboundBuffer.
} }
public Object consume() { private Object consume() {
ChannelOutboundBuffer buf = unsafe().outboundBuffer(); ChannelOutboundBuffer buf = unsafe().outboundBuffer();
if (buf != null) { if (buf != null) {
Object msg = buf.current(); Object msg = buf.current();
@ -391,5 +416,24 @@ public class IdleStateHandlerTest {
} }
return null; return null;
} }
/**
* Consume the part of a message.
*
* @param byteCount count of byte to be consumed
* @return the message currently being consumed
*/
private Object consumePart(int byteCount) {
ChannelOutboundBuffer buf = unsafe().outboundBuffer();
if (buf != null) {
Object msg = buf.current();
if (msg != null) {
ReferenceCountUtil.retain(msg);
buf.removeBytes(byteCount);
return msg;
}
}
return null;
}
} }
} }

View File

@ -220,6 +220,18 @@ public final class ChannelOutboundBuffer {
return entry.msg; return entry.msg;
} }
/**
* Return the current message flush progress.
* @return {@code 0} if nothing was flushed before for the current message or there is no current message
*/
public long currentProgress() {
Entry entry = flushedEntry;
if (entry == null) {
return 0;
}
return entry.progress;
}
/** /**
* Notify the {@link ChannelPromise} of the current message about writing progress. * Notify the {@link ChannelPromise} of the current message about writing progress.
*/ */
@ -227,9 +239,9 @@ public final class ChannelOutboundBuffer {
Entry e = flushedEntry; Entry e = flushedEntry;
assert e != null; assert e != null;
ChannelPromise p = e.promise; ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) {
long progress = e.progress + amount; long progress = e.progress + amount;
e.progress = progress; e.progress = progress;
if (p instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) p).tryProgress(progress, e.total); ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
} }
} }