Replace usage of System.currentTimeMillis() with System.nanoTime()
Motivation: Currently we use System.currentTimeMillis() in our timeout handlers this is bad for various reasons like when the clock adjusts etc. Modifications: Replace System.currentTimeMillis() with System.nanoTime() Result: More robust timeout handling
This commit is contained in:
parent
1bce46dbb3
commit
9856563144
@ -95,10 +95,11 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @see WriteTimeoutHandler
|
* @see WriteTimeoutHandler
|
||||||
*/
|
*/
|
||||||
public class IdleStateHandler extends ChannelDuplexHandler {
|
public class IdleStateHandler extends ChannelDuplexHandler {
|
||||||
|
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||||
|
|
||||||
private final long readerIdleTimeMillis;
|
private final long readerIdleTimeNanos;
|
||||||
private final long writerIdleTimeMillis;
|
private final long writerIdleTimeNanos;
|
||||||
private final long allIdleTimeMillis;
|
private final long allIdleTimeNanos;
|
||||||
|
|
||||||
volatile ScheduledFuture<?> readerIdleTimeout;
|
volatile ScheduledFuture<?> readerIdleTimeout;
|
||||||
volatile long lastReadTime;
|
volatile long lastReadTime;
|
||||||
@ -165,19 +166,19 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (readerIdleTime <= 0) {
|
if (readerIdleTime <= 0) {
|
||||||
readerIdleTimeMillis = 0;
|
readerIdleTimeNanos = 0;
|
||||||
} else {
|
} else {
|
||||||
readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
|
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
|
||||||
}
|
}
|
||||||
if (writerIdleTime <= 0) {
|
if (writerIdleTime <= 0) {
|
||||||
writerIdleTimeMillis = 0;
|
writerIdleTimeNanos = 0;
|
||||||
} else {
|
} else {
|
||||||
writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
|
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
|
||||||
}
|
}
|
||||||
if (allIdleTime <= 0) {
|
if (allIdleTime <= 0) {
|
||||||
allIdleTimeMillis = 0;
|
allIdleTimeNanos = 0;
|
||||||
} else {
|
} else {
|
||||||
allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
|
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -186,7 +187,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public long getReaderIdleTimeInMillis() {
|
public long getReaderIdleTimeInMillis() {
|
||||||
return readerIdleTimeMillis;
|
return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -194,7 +195,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public long getWriterIdleTimeInMillis() {
|
public long getWriterIdleTimeInMillis() {
|
||||||
return writerIdleTimeMillis;
|
return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -202,7 +203,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public long getAllIdleTimeInMillis() {
|
public long getAllIdleTimeInMillis() {
|
||||||
return allIdleTimeMillis;
|
return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -248,7 +249,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
lastReadTime = System.currentTimeMillis();
|
lastReadTime = System.nanoTime();
|
||||||
firstReaderIdleEvent = firstAllIdleEvent = true;
|
firstReaderIdleEvent = firstAllIdleEvent = true;
|
||||||
ctx.fireChannelRead(msg);
|
ctx.fireChannelRead(msg);
|
||||||
}
|
}
|
||||||
@ -258,7 +259,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
promise.addListener(new ChannelFutureListener() {
|
promise.addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
lastWriteTime = System.currentTimeMillis();
|
lastWriteTime = System.nanoTime();
|
||||||
firstWriterIdleEvent = firstAllIdleEvent = true;
|
firstWriterIdleEvent = firstAllIdleEvent = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -278,21 +279,21 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
EventExecutor loop = ctx.executor();
|
EventExecutor loop = ctx.executor();
|
||||||
|
|
||||||
lastReadTime = lastWriteTime = System.currentTimeMillis();
|
lastReadTime = lastWriteTime = System.nanoTime();
|
||||||
if (readerIdleTimeMillis > 0) {
|
if (readerIdleTimeNanos > 0) {
|
||||||
readerIdleTimeout = loop.schedule(
|
readerIdleTimeout = loop.schedule(
|
||||||
new ReaderIdleTimeoutTask(ctx),
|
new ReaderIdleTimeoutTask(ctx),
|
||||||
readerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
if (writerIdleTimeMillis > 0) {
|
if (writerIdleTimeNanos > 0) {
|
||||||
writerIdleTimeout = loop.schedule(
|
writerIdleTimeout = loop.schedule(
|
||||||
new WriterIdleTimeoutTask(ctx),
|
new WriterIdleTimeoutTask(ctx),
|
||||||
writerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
if (allIdleTimeMillis > 0) {
|
if (allIdleTimeNanos > 0) {
|
||||||
allIdleTimeout = loop.schedule(
|
allIdleTimeout = loop.schedule(
|
||||||
new AllIdleTimeoutTask(ctx),
|
new AllIdleTimeoutTask(ctx),
|
||||||
allIdleTimeMillis, TimeUnit.MILLISECONDS);
|
allIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -335,13 +336,13 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.nanoTime();
|
||||||
long lastReadTime = IdleStateHandler.this.lastReadTime;
|
long lastReadTime = IdleStateHandler.this.lastReadTime;
|
||||||
long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
|
long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime);
|
||||||
if (nextDelay <= 0) {
|
if (nextDelay <= 0) {
|
||||||
// Reader is idle - set a new timeout and notify the callback.
|
// Reader is idle - set a new timeout and notify the callback.
|
||||||
readerIdleTimeout =
|
readerIdleTimeout =
|
||||||
ctx.executor().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
try {
|
try {
|
||||||
IdleStateEvent event;
|
IdleStateEvent event;
|
||||||
if (firstReaderIdleEvent) {
|
if (firstReaderIdleEvent) {
|
||||||
@ -356,7 +357,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Read occurred before the timeout - set a new timeout with shorter delay.
|
// Read occurred before the timeout - set a new timeout with shorter delay.
|
||||||
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -375,13 +376,13 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.nanoTime();
|
||||||
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
|
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
|
||||||
long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
|
long nextDelay = writerIdleTimeNanos - (currentTime - lastWriteTime);
|
||||||
if (nextDelay <= 0) {
|
if (nextDelay <= 0) {
|
||||||
// Writer is idle - set a new timeout and notify the callback.
|
// Writer is idle - set a new timeout and notify the callback.
|
||||||
writerIdleTimeout = ctx.executor().schedule(
|
writerIdleTimeout = ctx.executor().schedule(
|
||||||
this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
|
this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
try {
|
try {
|
||||||
IdleStateEvent event;
|
IdleStateEvent event;
|
||||||
if (firstWriterIdleEvent) {
|
if (firstWriterIdleEvent) {
|
||||||
@ -396,7 +397,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Write occurred before the timeout - set a new timeout with shorter delay.
|
// Write occurred before the timeout - set a new timeout with shorter delay.
|
||||||
writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -415,14 +416,14 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.nanoTime();
|
||||||
long lastIoTime = Math.max(lastReadTime, lastWriteTime);
|
long lastIoTime = Math.max(lastReadTime, lastWriteTime);
|
||||||
long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
|
long nextDelay = allIdleTimeNanos - (currentTime - lastIoTime);
|
||||||
if (nextDelay <= 0) {
|
if (nextDelay <= 0) {
|
||||||
// Both reader and writer are idle - set a new timeout and
|
// Both reader and writer are idle - set a new timeout and
|
||||||
// notify the callback.
|
// notify the callback.
|
||||||
allIdleTimeout = ctx.executor().schedule(
|
allIdleTimeout = ctx.executor().schedule(
|
||||||
this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
|
this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
try {
|
try {
|
||||||
IdleStateEvent event;
|
IdleStateEvent event;
|
||||||
if (firstAllIdleEvent) {
|
if (firstAllIdleEvent) {
|
||||||
@ -438,7 +439,7 @@ public class IdleStateHandler extends ChannelDuplexHandler {
|
|||||||
} else {
|
} else {
|
||||||
// Either read or write occurred before the timeout - set a new
|
// Either read or write occurred before the timeout - set a new
|
||||||
// timeout with shorter delay.
|
// timeout with shorter delay.
|
||||||
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -62,8 +62,9 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @see IdleStateHandler
|
* @see IdleStateHandler
|
||||||
*/
|
*/
|
||||||
public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||||
|
|
||||||
private final long timeoutMillis;
|
private final long timeoutNanos;
|
||||||
|
|
||||||
private volatile ScheduledFuture<?> timeout;
|
private volatile ScheduledFuture<?> timeout;
|
||||||
private volatile long lastReadTime;
|
private volatile long lastReadTime;
|
||||||
@ -96,9 +97,9 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (timeout <= 0) {
|
if (timeout <= 0) {
|
||||||
timeoutMillis = 0;
|
timeoutNanos = 0;
|
||||||
} else {
|
} else {
|
||||||
timeoutMillis = Math.max(unit.toMillis(timeout), 1);
|
timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +146,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
lastReadTime = System.currentTimeMillis();
|
lastReadTime = System.nanoTime();
|
||||||
ctx.fireChannelRead(msg);
|
ctx.fireChannelRead(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,11 +161,11 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
|||||||
|
|
||||||
state = 1;
|
state = 1;
|
||||||
|
|
||||||
lastReadTime = System.currentTimeMillis();
|
lastReadTime = System.nanoTime();
|
||||||
if (timeoutMillis > 0) {
|
if (timeoutNanos > 0) {
|
||||||
timeout = ctx.executor().schedule(
|
timeout = ctx.executor().schedule(
|
||||||
new ReadTimeoutTask(ctx),
|
new ReadTimeoutTask(ctx),
|
||||||
timeoutMillis, TimeUnit.MILLISECONDS);
|
timeoutNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,11 +203,11 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.nanoTime();
|
||||||
long nextDelay = timeoutMillis - (currentTime - lastReadTime);
|
long nextDelay = timeoutNanos - (currentTime - lastReadTime);
|
||||||
if (nextDelay <= 0) {
|
if (nextDelay <= 0) {
|
||||||
// Read timed out - set a new timeout and notify the callback.
|
// Read timed out - set a new timeout and notify the callback.
|
||||||
timeout = ctx.executor().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
|
timeout = ctx.executor().schedule(this, timeoutNanos, TimeUnit.NANOSECONDS);
|
||||||
try {
|
try {
|
||||||
readTimedOut(ctx);
|
readTimedOut(ctx);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
@ -214,7 +215,7 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Read occurred before the timeout - set a new timeout with shorter delay.
|
// Read occurred before the timeout - set a new timeout with shorter delay.
|
||||||
timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
|
timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,8 +65,9 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @see IdleStateHandler
|
* @see IdleStateHandler
|
||||||
*/
|
*/
|
||||||
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
||||||
|
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||||
|
|
||||||
private final long timeoutMillis;
|
private final long timeoutNanos;
|
||||||
|
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
|
|
||||||
@ -94,9 +95,9 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (timeout <= 0) {
|
if (timeout <= 0) {
|
||||||
timeoutMillis = 0;
|
timeoutNanos = 0;
|
||||||
} else {
|
} else {
|
||||||
timeoutMillis = Math.max(unit.toMillis(timeout), 1);
|
timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,7 +108,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
|
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
|
||||||
if (timeoutMillis > 0) {
|
if (timeoutNanos > 0) {
|
||||||
// Schedule a timeout.
|
// Schedule a timeout.
|
||||||
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
|
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
@ -123,7 +124,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, timeoutMillis, TimeUnit.MILLISECONDS);
|
}, timeoutNanos, TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
// Cancel the scheduled timeout if the flush future is complete.
|
// Cancel the scheduled timeout if the flush future is complete.
|
||||||
future.addListener(new ChannelFutureListener() {
|
future.addListener(new ChannelFutureListener() {
|
||||||
|
Loading…
Reference in New Issue
Block a user