diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 299e4c7ec7..ee80dcf7e0 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -129,6 +129,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { private long lastChangeCheckTimeStamp; private int lastMessageHashCode; private long lastPendingWriteBytes; + private long lastFlushProgress; /** * Creates a new instance firing {@link IdleStateEvent}s. @@ -399,6 +400,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { if (buf != null) { lastMessageHashCode = System.identityHashCode(buf.current()); lastPendingWriteBytes = buf.totalPendingWriteBytes(); + lastFlushProgress = buf.currentProgress(); } } } @@ -443,6 +445,15 @@ public class IdleStateHandler extends ChannelDuplexHandler { return true; } } + + long flushProgress = buf.currentProgress(); + if (flushProgress != lastFlushProgress) { + lastFlushProgress = flushProgress; + + if (!first) { + return true; + } + } } } diff --git a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java index a27364f439..668f3f1ea6 100644 --- a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java @@ -236,6 +236,7 @@ public class IdleStateHandlerTest { channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 2 })); channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 3 })); + channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[5 * 1024])); // Establish a baseline. We're not consuming anything and let it idle once. idleStateHandler.tickRun(); @@ -283,6 +284,30 @@ public class IdleStateHandlerTest { assertEquals(0, events.size()); assertEquals(26L, idleStateHandler.tick(TimeUnit.SECONDS)); // 23s + 2s + 1s + // Consume part of the message every 2 seconds, then be idle for 1 seconds, + // then run the task and we should get an IdleStateEvent because the first trigger + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tickRun(1L, TimeUnit.SECONDS); + assertEquals(1, events.size()); + assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 2s + 2s + 1s + events.clear(); + + // Consume part of the message every 2 seconds, then be idle for 1 seconds, + // then consume all the rest of the message, then run the task and we shouldn't + // get an IdleStateEvent because the data is flowing and we haven't been idle for long enough! + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + idleStateHandler.tickRun(1L, TimeUnit.SECONDS); + assertEquals(0, events.size()); + assertEquals(36L, idleStateHandler.tick(TimeUnit.SECONDS)); // 31s + 2s + 2s + 1s + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consumePart(1024)); + // There are no messages left! Advance the ticker by 3 seconds, // attempt a consume() but it will be null, then advance the // ticker by an another 2 seconds and we should get an IdleStateEvent @@ -292,7 +317,7 @@ public class IdleStateHandlerTest { idleStateHandler.tickRun(2L, TimeUnit.SECONDS); assertEquals(1, events.size()); - assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 3s + 2s + assertEquals(43L, idleStateHandler.tick(TimeUnit.SECONDS)); // 36s + 2s + 3s + 2s // q.e.d. } finally { @@ -379,7 +404,7 @@ public class IdleStateHandlerTest { // the messages in the ChannelOutboundBuffer. } - public Object consume() { + private Object consume() { ChannelOutboundBuffer buf = unsafe().outboundBuffer(); if (buf != null) { Object msg = buf.current(); @@ -391,5 +416,24 @@ public class IdleStateHandlerTest { } return null; } + + /** + * Consume the part of a message. + * + * @param byteCount count of byte to be consumed + * @return the message currently being consumed + */ + private Object consumePart(int byteCount) { + ChannelOutboundBuffer buf = unsafe().outboundBuffer(); + if (buf != null) { + Object msg = buf.current(); + if (msg != null) { + ReferenceCountUtil.retain(msg); + buf.removeBytes(byteCount); + return msg; + } + } + return null; + } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index d3a934a829..e042490f30 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -220,6 +220,18 @@ public final class ChannelOutboundBuffer { return entry.msg; } + /** + * Return the current message flush progress. + * @return {@code 0} if nothing was flushed before for the current message or there is no current message + */ + public long currentProgress() { + Entry entry = flushedEntry; + if (entry == null) { + return 0; + } + return entry.progress; + } + /** * Notify the {@link ChannelPromise} of the current message about writing progress. */ @@ -227,9 +239,9 @@ public final class ChannelOutboundBuffer { Entry e = flushedEntry; assert e != null; ChannelPromise p = e.promise; + long progress = e.progress + amount; + e.progress = progress; if (p instanceof ChannelProgressivePromise) { - long progress = e.progress + amount; - e.progress = progress; ((ChannelProgressivePromise) p).tryProgress(progress, e.total); } }