From 51112e2b36ec5550a73d72bfc59f4523f7b8ec27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A7=A6=E4=B8=96=E6=88=90?= Date: Tue, 9 Apr 2019 22:26:27 +0800 Subject: [PATCH] Avoid IdleStateHandler triggering unexpected idle events when flushing large entries to slow clients (#9020) Motivation: IdleStateHandler may trigger unexpected idle events when flushing large entries to slow clients. Modification: In netty design, we check the identity hash code and total pending write bytes of the current flush entry to determine whether there is a change in output. But if a large entry has been flushing slowly (for some reason, the network speed is slow, or the client processing speed is too slow to cause the TCP sliding window to be zero), the total pending write bytes size and identity hash code would remain unchanged. Avoid this issue by adding checks for the current entry flush progress. Result: Fixes #8912 . --- .../handler/timeout/IdleStateHandler.java | 11 +++++ .../handler/timeout/IdleStateHandlerTest.java | 48 ++++++++++++++++++- .../netty/channel/ChannelOutboundBuffer.java | 16 ++++++- 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 299e4c7ec7..ee80dcf7e0 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -129,6 +129,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { private long lastChangeCheckTimeStamp; private int lastMessageHashCode; private long lastPendingWriteBytes; + private long lastFlushProgress; /** * Creates a new instance firing {@link IdleStateEvent}s. @@ -399,6 +400,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { if (buf != null) { lastMessageHashCode = System.identityHashCode(buf.current()); lastPendingWriteBytes = buf.totalPendingWriteBytes(); + lastFlushProgress = buf.currentProgress(); } } } @@ -443,6 +445,15 @@ public class IdleStateHandler extends ChannelDuplexHandler { return true; } } + + long flushProgress = buf.currentProgress(); + if (flushProgress != lastFlushProgress) { + lastFlushProgress = flushProgress; + + if (!first) { + return true; + } + } } } diff --git a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java index a27364f439..668f3f1ea6 100644 --- a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java @@ -236,6 +236,7 @@ public class IdleStateHandlerTest { channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 2 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 3 })); + channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[5 * 1024])); // Establish a baseline. We're not consuming anything and let it idle once. idleStateHandler.tickRun(); @@ -283,6 +284,30 @@ public class IdleStateHandlerTest { assertEquals(0, events.size()); assertEquals(26L, idleStateHandler.tick(TimeUnit.SECONDS)); // 23s + 2s + 1s + // Consume part of the message every 2 seconds, then be idle for 1 seconds, + // then run the task and we should get an IdleStateEvent because the first trigger + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tickRun(1L, TimeUnit.SECONDS); + assertEquals(1, events.size()); + assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 2s + 2s + 1s + events.clear(); + + // Consume part of the message every 2 seconds, then be idle for 1 seconds, + // then consume all the rest of the message, then run the task and we shouldn't + // get an IdleStateEvent because the data is flowing and we haven't been idle for long enough! + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tickRun(1L, TimeUnit.SECONDS); + assertEquals(0, events.size()); + assertEquals(36L, idleStateHandler.tick(TimeUnit.SECONDS)); // 31s + 2s + 2s + 1s + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + // There are no messages left! Advance the ticker by 3 seconds, // attempt a consume() but it will be null, then advance the // ticker by an another 2 seconds and we should get an IdleStateEvent @@ -292,7 +317,7 @@ public class IdleStateHandlerTest { idleStateHandler.tickRun(2L, TimeUnit.SECONDS); assertEquals(1, events.size()); - assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 3s + 2s + assertEquals(43L, idleStateHandler.tick(TimeUnit.SECONDS)); // 36s + 2s + 3s + 2s // q.e.d. } finally { @@ -379,7 +404,7 @@ public class IdleStateHandlerTest { // the messages in the ChannelOutboundBuffer. } - public Object consume() { + private Object consume() { ChannelOutboundBuffer buf = unsafe().outboundBuffer(); if (buf != null) { Object msg = buf.current(); @@ -391,5 +416,24 @@ public class IdleStateHandlerTest { } return null; } + + /** + * Consume the part of a message. + * + * @param byteCount count of byte to be consumed + * @return the message currently being consumed + */ + private Object consumePart(int byteCount) { + ChannelOutboundBuffer buf = unsafe().outboundBuffer(); + if (buf != null) { + Object msg = buf.current(); + if (msg != null) { + ReferenceCountUtil.retain(msg); + buf.removeBytes(byteCount); + return msg; + } + } + return null; + } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index d3a934a829..e042490f30 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -220,6 +220,18 @@ public final class ChannelOutboundBuffer { return entry.msg; } + /** + * Return the current message flush progress. + * @return {@code 0} if nothing was flushed before for the current message or there is no current message + */ + public long currentProgress() { + Entry entry = flushedEntry; + if (entry == null) { + return 0; + } + return entry.progress; + } + /** * Notify the {@link ChannelPromise} of the current message about writing progress. */ @@ -227,9 +239,9 @@ public final class ChannelOutboundBuffer { Entry e = flushedEntry; assert e != null; ChannelPromise p = e.promise; + long progress = e.progress + amount; + e.progress = progress; if (p instanceof ChannelProgressivePromise) { - long progress = e.progress + amount; - e.progress = progress; ((ChannelProgressivePromise) p).tryProgress(progress, e.total); } }