From 68a941c091f5a5b5d69715327db9ca77b53e1864 Mon Sep 17 00:00:00 2001 From: Roger Kapsi Date: Wed, 28 Dec 2016 11:24:52 -0500 Subject: [PATCH] Detecting actual Channel write idleness vs. slowness Motivation The IdleStateHandler tracks write() idleness on message granularity but does not take into consideration that the client may be just slow and has managed to consume a subset of the message's bytes in the configured period of time. Modifications Adding an optional configuration parameter to IdleStateHandler which tells it to observe ChannelOutboundBuffer's state. Result Fixes https://github.com/netty/netty/issues/6150 --- .../handler/timeout/IdleStateHandler.java | 210 +++++++--- .../handler/timeout/IdleStateHandlerTest.java | 395 ++++++++++++++++++ 2 files changed, 551 insertions(+), 54 deletions(-) create mode 100644 handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 3d2acd1165..6d6229c0e6 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -17,11 +17,13 @@ package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.Channel.Unsafe; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.EventExecutor; @@ -101,11 +103,12 @@ public class IdleStateHandler extends ChannelDuplexHandler { private final ChannelFutureListener writeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - lastWriteTime = System.nanoTime(); + lastWriteTime = ticksInNanos(); firstWriterIdleEvent = firstAllIdleEvent = true; } }; + private final boolean observeOutput; private final long readerIdleTimeNanos; private final long writerIdleTimeNanos; private final long allIdleTimeNanos; @@ -124,6 +127,10 @@ public class IdleStateHandler extends ChannelDuplexHandler { private byte state; // 0 - none, 1 - initialized, 2 - destroyed private boolean reading; + private long lastChangeCheckTimeStamp; + private int lastMessageHashCode; + private long lastPendingWriteBytes; + /** * Creates a new instance firing {@link IdleStateEvent}s. * @@ -149,9 +156,21 @@ public class IdleStateHandler extends ChannelDuplexHandler { TimeUnit.SECONDS); } + /** + * @see #IdleStateHandler(boolean, long, long, long, TimeUnit) + */ + public IdleStateHandler( + long readerIdleTime, long writerIdleTime, long allIdleTime, + TimeUnit unit) { + this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); + } + /** * Creates a new instance firing {@link IdleStateEvent}s. * + * @param observeOutput + * whether or not the consumption of {@code bytes} should be taken into + * consideration when assessing write idleness. The default is {@code false}. * @param readerIdleTime * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} * will be triggered when no read was performed for the specified @@ -168,13 +187,15 @@ public class IdleStateHandler extends ChannelDuplexHandler { * the {@link TimeUnit} of {@code readerIdleTime}, * {@code writeIdleTime}, and {@code allIdleTime} */ - public IdleStateHandler( + public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } + this.observeOutput = observeOutput; + if (readerIdleTime <= 0) { readerIdleTimeNanos = 0; } else { @@ -269,7 +290,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { - lastReadTime = System.nanoTime(); + lastReadTime = ticksInNanos(); reading = false; } ctx.fireChannelReadComplete(); @@ -297,27 +318,37 @@ public class IdleStateHandler extends ChannelDuplexHandler { } state = 1; + initOutputChanged(ctx); - EventExecutor loop = ctx.executor(); - - lastReadTime = lastWriteTime = System.nanoTime(); + lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { - readerIdleTimeout = loop.schedule( - new ReaderIdleTimeoutTask(ctx), + readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { - writerIdleTimeout = loop.schedule( - new WriterIdleTimeoutTask(ctx), + writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { - allIdleTimeout = loop.schedule( - new AllIdleTimeoutTask(ctx), + allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } } + /** + * This method is visible for testing! + */ + long ticksInNanos() { + return System.nanoTime(); + } + + /** + * This method is visible for testing! + */ + ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + return ctx.executor().schedule(task, delay, unit); + } + private void destroy() { state = 2; @@ -355,15 +386,77 @@ public class IdleStateHandler extends ChannelDuplexHandler { case WRITER_IDLE: return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT; default: - throw new Error(); + throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first); } } - private final class ReaderIdleTimeoutTask implements Runnable { + /** + * @see #hasOutputChanged(ChannelHandlerContext, boolean) + */ + private void initOutputChanged(ChannelHandlerContext ctx) { + if (observeOutput) { + Channel channel = ctx.channel(); + Unsafe unsafe = channel.unsafe(); + ChannelOutboundBuffer buf = unsafe.outboundBuffer(); + + if (buf != null) { + lastMessageHashCode = System.identityHashCode(buf.current()); + lastPendingWriteBytes = buf.totalPendingWriteBytes(); + } + } + } + + /** + * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed + * with {@link #observeOutput} enabled and there has been an observed change in the + * {@link ChannelOutboundBuffer} between two consecutive calls of this method. + * + * https://github.com/netty/netty/issues/6150 + */ + private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { + if (observeOutput) { + + // We can take this shortcut if the ChannelPromises that got passed into write() + // appear to complete. It indicates "change" on message level and we simply assume + // that there's change happening on byte level. If the user doesn't observe channel + // writability events then they'll eventually OOME and there's clearly a different + // problem and idleness is least of their concerns. + if (lastChangeCheckTimeStamp != lastWriteTime) { + lastChangeCheckTimeStamp = lastWriteTime; + + // But this applies only if it's the non-first call. + if (!first) { + return true; + } + } + + Channel channel = ctx.channel(); + Unsafe unsafe = channel.unsafe(); + ChannelOutboundBuffer buf = unsafe.outboundBuffer(); + + if (buf != null) { + int messageHashCode = System.identityHashCode(buf.current()); + long pendingWriteBytes = buf.totalPendingWriteBytes(); + + if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { + lastMessageHashCode = messageHashCode; + lastPendingWriteBytes = pendingWriteBytes; + + if (!first) { + return true; + } + } + } + } + + return false; + } + + private abstract static class AbstractIdleTask implements Runnable { private final ChannelHandlerContext ctx; - ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { + AbstractIdleTask(ChannelHandlerContext ctx) { this.ctx = ctx; } @@ -373,98 +466,107 @@ public class IdleStateHandler extends ChannelDuplexHandler { return; } + run(ctx); + } + + protected abstract void run(ChannelHandlerContext ctx); + } + + private final class ReaderIdleTimeoutTask extends AbstractIdleTask { + + ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { + super(ctx); + } + + @Override + protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { - nextDelay -= System.nanoTime() - lastReadTime; + nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. - readerIdleTimeout = - ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); - try { - IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent); - if (firstReaderIdleEvent) { - firstReaderIdleEvent = false; - } + readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); + boolean first = firstReaderIdleEvent; + firstReaderIdleEvent = false; + + try { + IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. - readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS); + readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } } - private final class WriterIdleTimeoutTask implements Runnable { - - private final ChannelHandlerContext ctx; + private final class WriterIdleTimeoutTask extends AbstractIdleTask { WriterIdleTimeoutTask(ChannelHandlerContext ctx) { - this.ctx = ctx; + super(ctx); } @Override - public void run() { - if (!ctx.channel().isOpen()) { - return; - } + protected void run(ChannelHandlerContext ctx) { long lastWriteTime = IdleStateHandler.this.lastWriteTime; - long nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime); + long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime); if (nextDelay <= 0) { // Writer is idle - set a new timeout and notify the callback. - writerIdleTimeout = ctx.executor().schedule( - this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); + writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); + + boolean first = firstWriterIdleEvent; + firstWriterIdleEvent = false; + try { - IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, firstWriterIdleEvent); - if (firstWriterIdleEvent) { - firstWriterIdleEvent = false; + if (hasOutputChanged(ctx, first)) { + return; } + IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Write occurred before the timeout - set a new timeout with shorter delay. - writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS); + writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } } - private final class AllIdleTimeoutTask implements Runnable { - - private final ChannelHandlerContext ctx; + private final class AllIdleTimeoutTask extends AbstractIdleTask { AllIdleTimeoutTask(ChannelHandlerContext ctx) { - this.ctx = ctx; + super(ctx); } @Override - public void run() { - if (!ctx.channel().isOpen()) { - return; - } + protected void run(ChannelHandlerContext ctx) { long nextDelay = allIdleTimeNanos; if (!reading) { - nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime); + nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime); } if (nextDelay <= 0) { // Both reader and writer are idle - set a new timeout and // notify the callback. - allIdleTimeout = ctx.executor().schedule( - this, allIdleTimeNanos, TimeUnit.NANOSECONDS); + allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS); + + boolean first = firstAllIdleEvent; + firstAllIdleEvent = false; + try { - IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent); - if (firstAllIdleEvent) { - firstAllIdleEvent = false; + if (hasOutputChanged(ctx, first)) { + return; } + IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); @@ -472,7 +574,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { } else { // Either read or write occurred before the timeout - set a new // timeout with shorter delay. - allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS); + allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } } diff --git a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java new file mode 100644 index 0000000000..f7e1c42153 --- /dev/null +++ b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java @@ -0,0 +1,395 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.timeout; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.ReferenceCountUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +public class IdleStateHandlerTest { + + @Test + public void testReaderIdle() throws Exception { + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + false, 1L, 0L, 0L, TimeUnit.SECONDS); + + // We start with one FIRST_READER_IDLE_STATE_EVENT, followed by an infinite number of READER_IDLE_STATE_EVENTs + anyIdle(idleStateHandler, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT, + IdleStateEvent.READER_IDLE_STATE_EVENT, IdleStateEvent.READER_IDLE_STATE_EVENT); + } + + @Test + public void testWriterIdle() throws Exception { + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + false, 0L, 1L, 0L, TimeUnit.SECONDS); + + anyIdle(idleStateHandler, IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT, + IdleStateEvent.WRITER_IDLE_STATE_EVENT, IdleStateEvent.WRITER_IDLE_STATE_EVENT); + } + + @Test + public void testAllIdle() throws Exception { + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + false, 0L, 0L, 1L, TimeUnit.SECONDS); + + anyIdle(idleStateHandler, IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT, + IdleStateEvent.ALL_IDLE_STATE_EVENT, IdleStateEvent.ALL_IDLE_STATE_EVENT); + } + + private void anyIdle(TestableIdleStateHandler idleStateHandler, Object... expected) throws Exception { + + assertTrue("The number of expected events must be >= 1", expected.length >= 1); + + final List events = new ArrayList(); + ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + events.add(evt); + } + }; + + EmbeddedChannel channel = new EmbeddedChannel(idleStateHandler, handler); + try { + // For each expected event advance the ticker and run() the task. Each + // step should yield in an IdleStateEvent because we haven't written + // or read anything from the channel. + for (int i = 0; i < expected.length; i++) { + idleStateHandler.tickRun(); + } + + assertEquals(expected.length, events.size()); + + // Compare the expected with the actual IdleStateEvents + for (int i = 0; i < expected.length; i++) { + Object evt = events.get(i); + assertSame("Element " + i + " is not matching", expected[i], evt); + } + } finally { + channel.finishAndReleaseAll(); + } + } + + @Test + public void testReaderNotIdle() throws Exception { + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + false, 1L, 0L, 0L, TimeUnit.SECONDS); + + Action action = new Action() { + @Override + public void run(EmbeddedChannel channel) throws Exception { + channel.writeInbound("Hello, World!"); + } + }; + + anyNotIdle(idleStateHandler, action, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT); + } + + @Test + public void testWriterNotIdle() throws Exception { + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + false, 0L, 1L, 0L, TimeUnit.SECONDS); + + Action action = new Action() { + @Override + public void run(EmbeddedChannel channel) throws Exception { + channel.writeAndFlush("Hello, World!"); + } + }; + + anyNotIdle(idleStateHandler, action, IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT); + } + + @Test + public void testAllNotIdle() throws Exception { + // Reader... + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + false, 0L, 0L, 1L, TimeUnit.SECONDS); + + Action reader = new Action() { + @Override + public void run(EmbeddedChannel channel) throws Exception { + channel.writeInbound("Hello, World!"); + } + }; + + anyNotIdle(idleStateHandler, reader, IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT); + + // Writer... + idleStateHandler = new TestableIdleStateHandler( + false, 0L, 0L, 1L, TimeUnit.SECONDS); + + Action writer = new Action() { + @Override + public void run(EmbeddedChannel channel) throws Exception { + channel.writeAndFlush("Hello, World!"); + } + }; + + anyNotIdle(idleStateHandler, writer, IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT); + } + + private void anyNotIdle(TestableIdleStateHandler idleStateHandler, + Action action, Object expected) throws Exception { + + final List events = new ArrayList(); + ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + events.add(evt); + } + }; + + EmbeddedChannel channel = new EmbeddedChannel(idleStateHandler, handler); + try { + idleStateHandler.tick(1L, TimeUnit.NANOSECONDS); + action.run(channel); + + // Advance the ticker by some fraction and run() the task. + // There shouldn't be an IdleStateEvent getting fired because + // we've just performed an action on the channel that is meant + // to reset the idle task. + long delayInNanos = idleStateHandler.delay(TimeUnit.NANOSECONDS); + assertNotEquals(0L, delayInNanos); + + idleStateHandler.tickRun(delayInNanos / 2L, TimeUnit.NANOSECONDS); + assertEquals(0, events.size()); + + // Advance the ticker by the full amount and it should yield + // in an IdleStateEvent. + idleStateHandler.tickRun(); + assertEquals(1, events.size()); + assertSame(expected, events.get(0)); + } finally { + channel.finishAndReleaseAll(); + } + } + + @Test + public void testObserveWriterIdle() throws Exception { + observeOutputIdle(true); + } + + @Test + public void testObserveAllIdle() throws Exception { + observeOutputIdle(false); + } + + private void observeOutputIdle(boolean writer) throws Exception { + + long writerIdleTime = 0L; + long allIdleTime = 0L; + IdleStateEvent expeced = null; + + if (writer) { + writerIdleTime = 5L; + expeced = IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT; + } else { + allIdleTime = 5L; + expeced = IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT; + } + + TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler( + true, 0L, writerIdleTime, allIdleTime, TimeUnit.SECONDS); + + final List events = new ArrayList(); + ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + events.add(evt); + } + }; + + ObservableChannel channel = new ObservableChannel(idleStateHandler, handler); + try { + // We're writing 3 messages that will be consumed at different rates! + channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 })); + channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 2 })); + channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 3 })); + + // Establish a baseline. We're not consuming anything and let it idle once. + idleStateHandler.tickRun(); + assertEquals(1, events.size()); + assertSame(expeced, events.get(0)); + events.clear(); + + // Our ticker should be at second 5 + assertEquals(5L, idleStateHandler.tick(TimeUnit.SECONDS)); + + // Consume one message in 4 seconds, then be idle for 2 seconds, + // then run the task and we shouldn't get an IdleStateEvent because + // we haven't been idle for long enough! + idleStateHandler.tick(4L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consume()); + + idleStateHandler.tickRun(2L, TimeUnit.SECONDS); + assertEquals(0, events.size()); + assertEquals(11L, idleStateHandler.tick(TimeUnit.SECONDS)); // 5s + 4s + 2s + + // Consume one message in 3 seconds, then be idle for 4 seconds, + // then run the task and we shouldn't get an IdleStateEvent because + // we haven't been idle for long enough! + idleStateHandler.tick(3L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consume()); + + idleStateHandler.tickRun(4L, TimeUnit.SECONDS); + assertEquals(0, events.size()); + assertEquals(18L, idleStateHandler.tick(TimeUnit.SECONDS)); // 11s + 3s + 4s + + // Don't consume a message and be idle for 5 seconds. + // We should get an IdleStateEvent! + idleStateHandler.tickRun(5L, TimeUnit.SECONDS); + assertEquals(1, events.size()); + assertEquals(23L, idleStateHandler.tick(TimeUnit.SECONDS)); // 18s + 5s + events.clear(); + + // Consume one message in 2 seconds, then be idle for 1 seconds, + // then run the task and we shouldn't get an IdleStateEvent because + // we haven't been idle for long enough! + idleStateHandler.tick(2L, TimeUnit.SECONDS); + assertNotNullAndRelease(channel.consume()); + + idleStateHandler.tickRun(1L, TimeUnit.SECONDS); + assertEquals(0, events.size()); + assertEquals(26L, idleStateHandler.tick(TimeUnit.SECONDS)); // 23s + 2s + 1s + + // There are no messages left! Advance the ticker by 3 seconds, + // attempt a consume() but it will be null, then advance the + // ticker by an another 2 seconds and we should get an IdleStateEvent + // because we've been idle for 5 seconds. + idleStateHandler.tick(3L, TimeUnit.SECONDS); + assertNull(channel.consume()); + + idleStateHandler.tickRun(2L, TimeUnit.SECONDS); + assertEquals(1, events.size()); + assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 3s + 2s + + // q.e.d. + } finally { + channel.finishAndReleaseAll(); + } + } + + private static void assertNotNullAndRelease(Object msg) { + assertNotNull(msg); + ReferenceCountUtil.release(msg); + } + + private interface Action { + void run(EmbeddedChannel channel) throws Exception; + } + + private static class TestableIdleStateHandler extends IdleStateHandler { + + private Runnable task; + + private long delayInNanos; + + private long ticksInNanos; + + public TestableIdleStateHandler(boolean observeOutput, + long readerIdleTime, long writerIdleTime, long allIdleTime, + TimeUnit unit) { + super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit); + } + + public long delay(TimeUnit unit) { + return unit.convert(delayInNanos, TimeUnit.NANOSECONDS); + } + + public void run() { + task.run(); + } + + public void tickRun() { + tickRun(delayInNanos, TimeUnit.NANOSECONDS); + } + + public void tickRun(long delay, TimeUnit unit) { + tick(delay, unit); + run(); + } + + /** + * Advances the current ticker by the given amount. + */ + public void tick(long delay, TimeUnit unit) { + ticksInNanos += unit.toNanos(delay); + } + + /** + * Returns {@link #ticksInNanos()} in the given {@link TimeUnit}. + */ + public long tick(TimeUnit unit) { + return unit.convert(ticksInNanos(), TimeUnit.NANOSECONDS); + } + + @Override + long ticksInNanos() { + return ticksInNanos; + } + + @Override + ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + this.task = task; + this.delayInNanos = unit.toNanos(delay); + return null; + } + } + + private static class ObservableChannel extends EmbeddedChannel { + + public ObservableChannel(ChannelHandler... handlers) { + super(handlers); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + // Overridden to change EmbeddedChannel's default behavior. We went to keep + // the messages in the ChannelOutboundBuffer. + } + + public Object consume() { + ChannelOutboundBuffer buf = unsafe().outboundBuffer(); + if (buf != null) { + Object msg = buf.current(); + if (msg != null) { + ReferenceCountUtil.retain(msg); + buf.remove(); + return msg; + } + } + return null; + } + } +}