javadoc fix and better cleanup for WriteTimeoutHandler

Motivation:

- Javadoc is not correct (#4353)
- WriteTimeoutHandler does not always cancel the timeout task (#2973)

Modifications:

Fix the javadoc and cleanup timeout task in handlerRemoved

Result:

WriteTimeoutHandler's javadoc describes the correct behavior and it will cancel timeout tasks when it's removed.
This commit is contained in:
Xiaoyan Lin 2015-12-28 00:33:19 -08:00 committed by Norman Maurer
parent da17b44fb2
commit 59a7c1288e

View File

@ -30,12 +30,10 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Raises a {@link WriteTimeoutException} when no data was written within a
* certain period of time.
* Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
*
* <pre>
* // The connection is closed when there is no outbound traffic
* // for 30 seconds.
* // The connection is closed when a write operation cannot finish in 30 seconds.
*
* public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
* public void initChannel({@link Channel} channel) {
@ -70,6 +68,11 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
private final long timeoutNanos;
/**
* A doubly-linked list to track all WriteTimeoutTasks
*/
private WriteTimeoutTask lastTask;
private boolean closed;
/**
@ -108,33 +111,62 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
ctx.write(msg, promise);
}
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
if (timeoutNanos > 0) {
// Schedule a timeout.
final ScheduledFuture<?> sf = ctx.executor().schedule(new OneTimeTask() {
@Override
public void run() {
// Was not written yet so issue a write timeout
// The future itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!future.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
WriteTimeoutTask task = lastTask;
lastTask = null;
while (task != null) {
task.scheduledFuture.cancel(false);
WriteTimeoutTask prev = task.prev;
task.prev = null;
task.next = null;
task = prev;
}
}
}
}, timeoutNanos, TimeUnit.NANOSECONDS);
// Cancel the scheduled timeout if the flush future is complete.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
sf.cancel(false);
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
// Schedule a timeout.
final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
if (!task.scheduledFuture.isDone()) {
addWriteTimeoutTask(task);
// Cancel the scheduled timeout if the flush promise is complete.
promise.addListener(task);
}
});
}
private void addWriteTimeoutTask(WriteTimeoutTask task) {
if (lastTask == null) {
lastTask = task;
} else {
lastTask.next = task;
task.prev = lastTask;
lastTask = task;
}
}
private void removeWriteTimeoutTask(WriteTimeoutTask task) {
if (task == lastTask) {
// task is the tail of list
assert task.next == null;
lastTask = lastTask.prev;
if (lastTask != null) {
lastTask.next = null;
}
} else if (task.prev == null && task.next == null) {
// Since task is not lastTask, then it has been removed or not been added.
return;
} else if (task.prev == null) {
// task is the head of list and the list has at least 2 nodes
task.next.prev = null;
} else {
task.prev.next = task.next;
task.next.prev = task.prev;
}
task.prev = null;
task.next = null;
}
/**
@ -147,4 +179,43 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
closed = true;
}
}
private final class WriteTimeoutTask extends OneTimeTask implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
// WriteTimeoutTask is also a node of a doubly-linked list
WriteTimeoutTask prev;
WriteTimeoutTask next;
ScheduledFuture<?> scheduledFuture;
WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
}
@Override
public void run() {
// Was not written yet so issue a write timeout
// The promise itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!promise.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
removeWriteTimeoutTask(this);
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// scheduledFuture has already be set when reaching here
scheduledFuture.cancel(false);
removeWriteTimeoutTask(this);
}
}
}