Detecting actual Channel write idleness vs. slowness

Motivation

The IdleStateHandler tracks write() idleness on message granularity but does not take into consideration that the client may be just slow and has managed to consume a subset of the message's bytes in the configured period of time.

Modifications

Adding an optional configuration parameter to IdleStateHandler which tells it to observe ChannelOutboundBuffer's state.

Result

Fixes https://github.com/netty/netty/issues/6150
This commit is contained in:
Roger Kapsi 2016-12-28 11:24:52 -05:00 committed by Scott Mitchell
parent 56ddc47f23
commit 68a941c091
2 changed files with 551 additions and 54 deletions

View File

@ -17,11 +17,13 @@ package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.Channel.Unsafe;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
@ -101,11 +103,12 @@ public class IdleStateHandler extends ChannelDuplexHandler {
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = System.nanoTime();
lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
};
private final boolean observeOutput;
private final long readerIdleTimeNanos;
private final long writerIdleTimeNanos;
private final long allIdleTimeNanos;
@ -124,6 +127,10 @@ public class IdleStateHandler extends ChannelDuplexHandler {
private byte state; // 0 - none, 1 - initialized, 2 - destroyed
private boolean reading;
private long lastChangeCheckTimeStamp;
private int lastMessageHashCode;
private long lastPendingWriteBytes;
/**
* Creates a new instance firing {@link IdleStateEvent}s.
*
@ -149,9 +156,21 @@ public class IdleStateHandler extends ChannelDuplexHandler {
TimeUnit.SECONDS);
}
/**
* @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
*/
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
/**
* Creates a new instance firing {@link IdleStateEvent}s.
*
* @param observeOutput
* whether or not the consumption of {@code bytes} should be taken into
* consideration when assessing write idleness. The default is {@code false}.
* @param readerIdleTime
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
* will be triggered when no read was performed for the specified
@ -168,13 +187,15 @@ public class IdleStateHandler extends ChannelDuplexHandler {
* the {@link TimeUnit} of {@code readerIdleTime},
* {@code writeIdleTime}, and {@code allIdleTime}
*/
public IdleStateHandler(
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
this.observeOutput = observeOutput;
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
@ -269,7 +290,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
lastReadTime = System.nanoTime();
lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
@ -297,27 +318,37 @@ public class IdleStateHandler extends ChannelDuplexHandler {
}
state = 1;
initOutputChanged(ctx);
EventExecutor loop = ctx.executor();
lastReadTime = lastWriteTime = System.nanoTime();
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = loop.schedule(
new ReaderIdleTimeoutTask(ctx),
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = loop.schedule(
new WriterIdleTimeoutTask(ctx),
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = loop.schedule(
new AllIdleTimeoutTask(ctx),
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
/**
* This method is visible for testing!
*/
long ticksInNanos() {
return System.nanoTime();
}
/**
* This method is visible for testing!
*/
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
private void destroy() {
state = 2;
@ -355,15 +386,77 @@ public class IdleStateHandler extends ChannelDuplexHandler {
case WRITER_IDLE:
return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
default:
throw new Error();
throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
}
}
private final class ReaderIdleTimeoutTask implements Runnable {
/**
* @see #hasOutputChanged(ChannelHandlerContext, boolean)
*/
private void initOutputChanged(ChannelHandlerContext ctx) {
if (observeOutput) {
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes();
}
}
}
/**
* Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
* with {@link #observeOutput} enabled and there has been an observed change in the
* {@link ChannelOutboundBuffer} between two consecutive calls of this method.
*
* https://github.com/netty/netty/issues/6150
*/
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// But this applies only if it's the non-first call.
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes();
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
if (!first) {
return true;
}
}
}
}
return false;
}
private abstract static class AbstractIdleTask implements Runnable {
private final ChannelHandlerContext ctx;
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@ -373,98 +466,107 @@ public class IdleStateHandler extends ChannelDuplexHandler {
return;
}
run(ctx);
}
protected abstract void run(ChannelHandlerContext ctx);
}
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= System.nanoTime() - lastReadTime;
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout =
ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent);
if (firstReaderIdleEvent) {
firstReaderIdleEvent = false;
}
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
private final class WriterIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx;
private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
super(ctx);
}
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime);
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = ctx.executor().schedule(
this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, firstWriterIdleEvent);
if (firstWriterIdleEvent) {
firstWriterIdleEvent = false;
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
private final class AllIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx;
private final class AllIdleTimeoutTask extends AbstractIdleTask {
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
super(ctx);
}
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime);
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = ctx.executor().schedule(
this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent);
if (firstAllIdleEvent) {
firstAllIdleEvent = false;
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
@ -472,7 +574,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}

View File

@ -0,0 +1,395 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.timeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class IdleStateHandlerTest {
@Test
public void testReaderIdle() throws Exception {
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
false, 1L, 0L, 0L, TimeUnit.SECONDS);
// We start with one FIRST_READER_IDLE_STATE_EVENT, followed by an infinite number of READER_IDLE_STATE_EVENTs
anyIdle(idleStateHandler, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT,
IdleStateEvent.READER_IDLE_STATE_EVENT, IdleStateEvent.READER_IDLE_STATE_EVENT);
}
@Test
public void testWriterIdle() throws Exception {
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
false, 0L, 1L, 0L, TimeUnit.SECONDS);
anyIdle(idleStateHandler, IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT,
IdleStateEvent.WRITER_IDLE_STATE_EVENT, IdleStateEvent.WRITER_IDLE_STATE_EVENT);
}
@Test
public void testAllIdle() throws Exception {
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
false, 0L, 0L, 1L, TimeUnit.SECONDS);
anyIdle(idleStateHandler, IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT,
IdleStateEvent.ALL_IDLE_STATE_EVENT, IdleStateEvent.ALL_IDLE_STATE_EVENT);
}
private void anyIdle(TestableIdleStateHandler idleStateHandler, Object... expected) throws Exception {
assertTrue("The number of expected events must be >= 1", expected.length >= 1);
final List<Object> events = new ArrayList<Object>();
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
events.add(evt);
}
};
EmbeddedChannel channel = new EmbeddedChannel(idleStateHandler, handler);
try {
// For each expected event advance the ticker and run() the task. Each
// step should yield in an IdleStateEvent because we haven't written
// or read anything from the channel.
for (int i = 0; i < expected.length; i++) {
idleStateHandler.tickRun();
}
assertEquals(expected.length, events.size());
// Compare the expected with the actual IdleStateEvents
for (int i = 0; i < expected.length; i++) {
Object evt = events.get(i);
assertSame("Element " + i + " is not matching", expected[i], evt);
}
} finally {
channel.finishAndReleaseAll();
}
}
@Test
public void testReaderNotIdle() throws Exception {
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
false, 1L, 0L, 0L, TimeUnit.SECONDS);
Action action = new Action() {
@Override
public void run(EmbeddedChannel channel) throws Exception {
channel.writeInbound("Hello, World!");
}
};
anyNotIdle(idleStateHandler, action, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT);
}
@Test
public void testWriterNotIdle() throws Exception {
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
false, 0L, 1L, 0L, TimeUnit.SECONDS);
Action action = new Action() {
@Override
public void run(EmbeddedChannel channel) throws Exception {
channel.writeAndFlush("Hello, World!");
}
};
anyNotIdle(idleStateHandler, action, IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT);
}
@Test
public void testAllNotIdle() throws Exception {
// Reader...
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
false, 0L, 0L, 1L, TimeUnit.SECONDS);
Action reader = new Action() {
@Override
public void run(EmbeddedChannel channel) throws Exception {
channel.writeInbound("Hello, World!");
}
};
anyNotIdle(idleStateHandler, reader, IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT);
// Writer...
idleStateHandler = new TestableIdleStateHandler(
false, 0L, 0L, 1L, TimeUnit.SECONDS);
Action writer = new Action() {
@Override
public void run(EmbeddedChannel channel) throws Exception {
channel.writeAndFlush("Hello, World!");
}
};
anyNotIdle(idleStateHandler, writer, IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT);
}
private void anyNotIdle(TestableIdleStateHandler idleStateHandler,
Action action, Object expected) throws Exception {
final List<Object> events = new ArrayList<Object>();
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
events.add(evt);
}
};
EmbeddedChannel channel = new EmbeddedChannel(idleStateHandler, handler);
try {
idleStateHandler.tick(1L, TimeUnit.NANOSECONDS);
action.run(channel);
// Advance the ticker by some fraction and run() the task.
// There shouldn't be an IdleStateEvent getting fired because
// we've just performed an action on the channel that is meant
// to reset the idle task.
long delayInNanos = idleStateHandler.delay(TimeUnit.NANOSECONDS);
assertNotEquals(0L, delayInNanos);
idleStateHandler.tickRun(delayInNanos / 2L, TimeUnit.NANOSECONDS);
assertEquals(0, events.size());
// Advance the ticker by the full amount and it should yield
// in an IdleStateEvent.
idleStateHandler.tickRun();
assertEquals(1, events.size());
assertSame(expected, events.get(0));
} finally {
channel.finishAndReleaseAll();
}
}
@Test
public void testObserveWriterIdle() throws Exception {
observeOutputIdle(true);
}
@Test
public void testObserveAllIdle() throws Exception {
observeOutputIdle(false);
}
private void observeOutputIdle(boolean writer) throws Exception {
long writerIdleTime = 0L;
long allIdleTime = 0L;
IdleStateEvent expeced = null;
if (writer) {
writerIdleTime = 5L;
expeced = IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT;
} else {
allIdleTime = 5L;
expeced = IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT;
}
TestableIdleStateHandler idleStateHandler = new TestableIdleStateHandler(
true, 0L, writerIdleTime, allIdleTime, TimeUnit.SECONDS);
final List<Object> events = new ArrayList<Object>();
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
events.add(evt);
}
};
ObservableChannel channel = new ObservableChannel(idleStateHandler, handler);
try {
// We're writing 3 messages that will be consumed at different rates!
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 }));
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 2 }));
channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 3 }));
// Establish a baseline. We're not consuming anything and let it idle once.
idleStateHandler.tickRun();
assertEquals(1, events.size());
assertSame(expeced, events.get(0));
events.clear();
// Our ticker should be at second 5
assertEquals(5L, idleStateHandler.tick(TimeUnit.SECONDS));
// Consume one message in 4 seconds, then be idle for 2 seconds,
// then run the task and we shouldn't get an IdleStateEvent because
// we haven't been idle for long enough!
idleStateHandler.tick(4L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consume());
idleStateHandler.tickRun(2L, TimeUnit.SECONDS);
assertEquals(0, events.size());
assertEquals(11L, idleStateHandler.tick(TimeUnit.SECONDS)); // 5s + 4s + 2s
// Consume one message in 3 seconds, then be idle for 4 seconds,
// then run the task and we shouldn't get an IdleStateEvent because
// we haven't been idle for long enough!
idleStateHandler.tick(3L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consume());
idleStateHandler.tickRun(4L, TimeUnit.SECONDS);
assertEquals(0, events.size());
assertEquals(18L, idleStateHandler.tick(TimeUnit.SECONDS)); // 11s + 3s + 4s
// Don't consume a message and be idle for 5 seconds.
// We should get an IdleStateEvent!
idleStateHandler.tickRun(5L, TimeUnit.SECONDS);
assertEquals(1, events.size());
assertEquals(23L, idleStateHandler.tick(TimeUnit.SECONDS)); // 18s + 5s
events.clear();
// Consume one message in 2 seconds, then be idle for 1 seconds,
// then run the task and we shouldn't get an IdleStateEvent because
// we haven't been idle for long enough!
idleStateHandler.tick(2L, TimeUnit.SECONDS);
assertNotNullAndRelease(channel.consume());
idleStateHandler.tickRun(1L, TimeUnit.SECONDS);
assertEquals(0, events.size());
assertEquals(26L, idleStateHandler.tick(TimeUnit.SECONDS)); // 23s + 2s + 1s
// There are no messages left! Advance the ticker by 3 seconds,
// attempt a consume() but it will be null, then advance the
// ticker by an another 2 seconds and we should get an IdleStateEvent
// because we've been idle for 5 seconds.
idleStateHandler.tick(3L, TimeUnit.SECONDS);
assertNull(channel.consume());
idleStateHandler.tickRun(2L, TimeUnit.SECONDS);
assertEquals(1, events.size());
assertEquals(31L, idleStateHandler.tick(TimeUnit.SECONDS)); // 26s + 3s + 2s
// q.e.d.
} finally {
channel.finishAndReleaseAll();
}
}
private static void assertNotNullAndRelease(Object msg) {
assertNotNull(msg);
ReferenceCountUtil.release(msg);
}
private interface Action {
void run(EmbeddedChannel channel) throws Exception;
}
private static class TestableIdleStateHandler extends IdleStateHandler {
private Runnable task;
private long delayInNanos;
private long ticksInNanos;
public TestableIdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
public long delay(TimeUnit unit) {
return unit.convert(delayInNanos, TimeUnit.NANOSECONDS);
}
public void run() {
task.run();
}
public void tickRun() {
tickRun(delayInNanos, TimeUnit.NANOSECONDS);
}
public void tickRun(long delay, TimeUnit unit) {
tick(delay, unit);
run();
}
/**
* Advances the current ticker by the given amount.
*/
public void tick(long delay, TimeUnit unit) {
ticksInNanos += unit.toNanos(delay);
}
/**
* Returns {@link #ticksInNanos()} in the given {@link TimeUnit}.
*/
public long tick(TimeUnit unit) {
return unit.convert(ticksInNanos(), TimeUnit.NANOSECONDS);
}
@Override
long ticksInNanos() {
return ticksInNanos;
}
@Override
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
this.task = task;
this.delayInNanos = unit.toNanos(delay);
return null;
}
}
private static class ObservableChannel extends EmbeddedChannel {
public ObservableChannel(ChannelHandler... handlers) {
super(handlers);
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// Overridden to change EmbeddedChannel's default behavior. We went to keep
// the messages in the ChannelOutboundBuffer.
}
public Object consume() {
ChannelOutboundBuffer buf = unsafe().outboundBuffer();
if (buf != null) {
Object msg = buf.current();
if (msg != null) {
ReferenceCountUtil.retain(msg);
buf.remove();
return msg;
}
}
return null;
}
}
}