Replace usage of System.currentTimeMillis() with System.nanoTime()
Motivation: Currently we use System.currentTimeMillis() in our timeout handlers this is bad for various reasons like when the clock adjusts etc. Modifications: Replace System.currentTimeMillis() with System.nanoTime() Result: More robust timeout handling
This commit is contained in:
parent
17ba35b6d0
commit
cf2c8b40ae
@ -95,10 +95,11 @@ import java.util.concurrent.TimeUnit;
|
||||
* @see WriteTimeoutHandler
|
||||
*/
|
||||
public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
|
||||
private final long readerIdleTimeMillis;
|
||||
private final long writerIdleTimeMillis;
|
||||
private final long allIdleTimeMillis;
|
||||
private final long readerIdleTimeNanos;
|
||||
private final long writerIdleTimeNanos;
|
||||
private final long allIdleTimeNanos;
|
||||
|
||||
volatile ScheduledFuture<?> readerIdleTimeout;
|
||||
volatile long lastReadTime;
|
||||
@ -165,19 +166,19 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
|
||||
if (readerIdleTime <= 0) {
|
||||
readerIdleTimeMillis = 0;
|
||||
readerIdleTimeNanos = 0;
|
||||
} else {
|
||||
readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
|
||||
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
if (writerIdleTime <= 0) {
|
||||
writerIdleTimeMillis = 0;
|
||||
writerIdleTimeNanos = 0;
|
||||
} else {
|
||||
writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
|
||||
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
if (allIdleTime <= 0) {
|
||||
allIdleTimeMillis = 0;
|
||||
allIdleTimeNanos = 0;
|
||||
} else {
|
||||
allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
|
||||
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -186,7 +187,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
*
|
||||
*/
|
||||
public long getReaderIdleTimeInMillis() {
|
||||
return readerIdleTimeMillis;
|
||||
return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -194,7 +195,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
*
|
||||
*/
|
||||
public long getWriterIdleTimeInMillis() {
|
||||
return writerIdleTimeMillis;
|
||||
return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -202,7 +203,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
*
|
||||
*/
|
||||
public long getAllIdleTimeInMillis() {
|
||||
return allIdleTimeMillis;
|
||||
return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -248,7 +249,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
lastReadTime = System.currentTimeMillis();
|
||||
lastReadTime = System.nanoTime();
|
||||
firstReaderIdleEvent = firstAllIdleEvent = true;
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
@ -258,7 +259,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
lastWriteTime = System.currentTimeMillis();
|
||||
lastWriteTime = System.nanoTime();
|
||||
firstWriterIdleEvent = firstAllIdleEvent = true;
|
||||
}
|
||||
});
|
||||
@ -278,21 +279,21 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
|
||||
EventExecutor loop = ctx.executor();
|
||||
|
||||
lastReadTime = lastWriteTime = System.currentTimeMillis();
|
||||
if (readerIdleTimeMillis > 0) {
|
||||
lastReadTime = lastWriteTime = System.nanoTime();
|
||||
if (readerIdleTimeNanos > 0) {
|
||||
readerIdleTimeout = loop.schedule(
|
||||
new ReaderIdleTimeoutTask(ctx),
|
||||
readerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
||||
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (writerIdleTimeMillis > 0) {
|
||||
if (writerIdleTimeNanos > 0) {
|
||||
writerIdleTimeout = loop.schedule(
|
||||
new WriterIdleTimeoutTask(ctx),
|
||||
writerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
||||
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (allIdleTimeMillis > 0) {
|
||||
if (allIdleTimeNanos > 0) {
|
||||
allIdleTimeout = loop.schedule(
|
||||
new AllIdleTimeoutTask(ctx),
|
||||
allIdleTimeMillis, TimeUnit.MILLISECONDS);
|
||||
allIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -335,13 +336,13 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
return;
|
||||
}
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
long currentTime = System.nanoTime();
|
||||
long lastReadTime = IdleStateHandler.this.lastReadTime;
|
||||
long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
|
||||
long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime);
|
||||
if (nextDelay <= 0) {
|
||||
// Reader is idle - set a new timeout and notify the callback.
|
||||
readerIdleTimeout =
|
||||
ctx.executor().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
||||
ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
try {
|
||||
IdleStateEvent event;
|
||||
if (firstReaderIdleEvent) {
|
||||
@ -356,7 +357,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
} else {
|
||||
// Read occurred before the timeout - set a new timeout with shorter delay.
|
||||
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
||||
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -375,13 +376,13 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
return;
|
||||
}
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
long currentTime = System.nanoTime();
|
||||
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
|
||||
long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
|
||||
long nextDelay = writerIdleTimeNanos - (currentTime - lastWriteTime);
|
||||
if (nextDelay <= 0) {
|
||||
// Writer is idle - set a new timeout and notify the callback.
|
||||
writerIdleTimeout = ctx.executor().schedule(
|
||||
this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
||||
this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
try {
|
||||
IdleStateEvent event;
|
||||
if (firstWriterIdleEvent) {
|
||||
@ -396,7 +397,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
} else {
|
||||
// Write occurred before the timeout - set a new timeout with shorter delay.
|
||||
writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
||||
writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -415,14 +416,14 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
return;
|
||||
}
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
long currentTime = System.nanoTime();
|
||||
long lastIoTime = Math.max(lastReadTime, lastWriteTime);
|
||||
long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
|
||||
long nextDelay = allIdleTimeNanos - (currentTime - lastIoTime);
|
||||
if (nextDelay <= 0) {
|
||||
// Both reader and writer are idle - set a new timeout and
|
||||
// notify the callback.
|
||||
allIdleTimeout = ctx.executor().schedule(
|
||||
this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
|
||||
this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||
try {
|
||||
IdleStateEvent event;
|
||||
if (firstAllIdleEvent) {
|
||||
@ -438,7 +439,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
||||
} else {
|
||||
// Either read or write occurred before the timeout - set a new
|
||||
// timeout with shorter delay.
|
||||
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
||||
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -62,8 +62,9 @@ import java.util.concurrent.TimeUnit;
|
||||
* @see IdleStateHandler
|
||||
*/
|
||||
public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
|
||||
private final long timeoutMillis;
|
||||
private final long timeoutNanos;
|
||||
|
||||
private volatile ScheduledFuture<?> timeout;
|
||||
private volatile long lastReadTime;
|
||||
@ -96,9 +97,9 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||
}
|
||||
|
||||
if (timeout <= 0) {
|
||||
timeoutMillis = 0;
|
||||
timeoutNanos = 0;
|
||||
} else {
|
||||
timeoutMillis = Math.max(unit.toMillis(timeout), 1);
|
||||
timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,7 +146,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
lastReadTime = System.currentTimeMillis();
|
||||
lastReadTime = System.nanoTime();
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
|
||||
@ -160,11 +161,11 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
state = 1;
|
||||
|
||||
lastReadTime = System.currentTimeMillis();
|
||||
if (timeoutMillis > 0) {
|
||||
lastReadTime = System.nanoTime();
|
||||
if (timeoutNanos > 0) {
|
||||
timeout = ctx.executor().schedule(
|
||||
new ReadTimeoutTask(ctx),
|
||||
timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
timeoutNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -202,11 +203,11 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||
return;
|
||||
}
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
long nextDelay = timeoutMillis - (currentTime - lastReadTime);
|
||||
long currentTime = System.nanoTime();
|
||||
long nextDelay = timeoutNanos - (currentTime - lastReadTime);
|
||||
if (nextDelay <= 0) {
|
||||
// Read timed out - set a new timeout and notify the callback.
|
||||
timeout = ctx.executor().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
timeout = ctx.executor().schedule(this, timeoutNanos, TimeUnit.NANOSECONDS);
|
||||
try {
|
||||
readTimedOut(ctx);
|
||||
} catch (Throwable t) {
|
||||
@ -214,7 +215,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||
}
|
||||
} else {
|
||||
// Read occurred before the timeout - set a new timeout with shorter delay.
|
||||
timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
||||
timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -65,8 +65,9 @@ import java.util.concurrent.TimeUnit;
|
||||
* @see IdleStateHandler
|
||||
*/
|
||||
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
||||
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
|
||||
private final long timeoutMillis;
|
||||
private final long timeoutNanos;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
@ -94,9 +95,9 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
||||
}
|
||||
|
||||
if (timeout <= 0) {
|
||||
timeoutMillis = 0;
|
||||
timeoutNanos = 0;
|
||||
} else {
|
||||
timeoutMillis = Math.max(unit.toMillis(timeout), 1);
|
||||
timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,7 +108,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
||||
}
|
||||
|
||||
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
|
||||
if (timeoutMillis > 0) {
|
||||
if (timeoutNanos > 0) {
|
||||
// Schedule a timeout.
|
||||
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
|
||||
@Override
|
||||
@ -123,7 +124,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
}, timeoutNanos, TimeUnit.NANOSECONDS);
|
||||
|
||||
// Cancel the scheduled timeout if the flush future is complete.
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
|
Loading…
Reference in New Issue
Block a user