Make sure cancelled Timeouts are able to be GC'ed fast.
Motivation: At the moment the HashedWheelTimer will only remove the cancelled Timeouts once the HashedWheelBucket is processed again. Until this the instance will not be able to be GC'ed as there are still strong referenced to it even if the user not reference it by himself/herself. This can cause to waste a lot of memory even if the Timeout was cancelled before. Modification: Add a new queue which holds CancelTasks that will be processed on each tick to remove cancelled Timeouts. Because all of this is done only by the WorkerThread there is no need for synchronization and only one extra object creation is needed when cancel() is executed. For addTimeout(...) no new overhead is introduced. Result: Less memory usage for cancelled Timeouts.
This commit is contained in:
parent
8180f7922f
commit
3d81afb8a5
@ -106,6 +106,7 @@ public class HashedWheelTimer implements Timer {
|
|||||||
private final int mask;
|
private final int mask;
|
||||||
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
|
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
|
||||||
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
|
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
|
||||||
|
|
||||||
private volatile long startTime;
|
private volatile long startTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -388,6 +389,13 @@ public class HashedWheelTimer implements Timer {
|
|||||||
// all processed
|
// all processed
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
|
||||||
|
|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
|
||||||
|
// Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout
|
||||||
|
// in the queue
|
||||||
|
timeout.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
long calculated = timeout.deadline / tickDuration;
|
long calculated = timeout.deadline / tickDuration;
|
||||||
long remainingRounds = (calculated - tick) / wheel.length;
|
long remainingRounds = (calculated - tick) / wheel.length;
|
||||||
timeout.remainingRounds = remainingRounds;
|
timeout.remainingRounds = remainingRounds;
|
||||||
@ -448,8 +456,9 @@ public class HashedWheelTimer implements Timer {
|
|||||||
implements Timeout {
|
implements Timeout {
|
||||||
|
|
||||||
private static final int ST_INIT = 0;
|
private static final int ST_INIT = 0;
|
||||||
private static final int ST_CANCELLED = 1;
|
private static final int ST_IN_BUCKET = 1;
|
||||||
private static final int ST_EXPIRED = 2;
|
private static final int ST_CANCELLED = 2;
|
||||||
|
private static final int ST_EXPIRED = 3;
|
||||||
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;
|
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
@ -477,6 +486,9 @@ public class HashedWheelTimer implements Timer {
|
|||||||
HashedWheelTimeout next;
|
HashedWheelTimeout next;
|
||||||
HashedWheelTimeout prev;
|
HashedWheelTimeout prev;
|
||||||
|
|
||||||
|
// The bucket to which the timeout was added
|
||||||
|
HashedWheelBucket bucket;
|
||||||
|
|
||||||
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
|
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
|
||||||
this.timer = timer;
|
this.timer = timer;
|
||||||
this.task = task;
|
this.task = task;
|
||||||
@ -495,21 +507,50 @@ public class HashedWheelTimer implements Timer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean cancel() {
|
public boolean cancel() {
|
||||||
// only update the state it will be removed from HashedWheelBucket on next tick.
|
int state = state();
|
||||||
if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED)) {
|
if (state >= ST_CANCELLED) {
|
||||||
|
// fail fast if the task was cancelled or expired before.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) {
|
||||||
|
// Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket.
|
||||||
|
// In this case we can just return here as it will be discarded by the WorkerThread when handling
|
||||||
|
// the adding of HashedWheelTimeout to the HashedWheelBuckets.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
// only update the state it will be removed from HashedWheelBucket on next tick.
|
||||||
|
if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick
|
||||||
|
// and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get
|
||||||
|
// GC'ed once the user has no reference to it anymore.
|
||||||
|
timer.timeouts.add(this);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove() {
|
||||||
|
if (bucket != null) {
|
||||||
|
bucket.remove(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean compareAndSetState(int expected, int state) {
|
||||||
|
return STATE_UPDATER.compareAndSet(this, expected, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int state() {
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isCancelled() {
|
public boolean isCancelled() {
|
||||||
return STATE_UPDATER.get(this) == ST_CANCELLED;
|
return state() == ST_CANCELLED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isExpired() {
|
public boolean isExpired() {
|
||||||
return STATE_UPDATER.get(this) != ST_INIT;
|
return state() > ST_IN_BUCKET;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -518,7 +559,8 @@ public class HashedWheelTimer implements Timer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void expire() {
|
public void expire() {
|
||||||
if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) {
|
if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
|
||||||
|
assert state() != ST_INIT;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -577,6 +619,8 @@ public class HashedWheelTimer implements Timer {
|
|||||||
* Add {@link HashedWheelTimeout} to this bucket.
|
* Add {@link HashedWheelTimeout} to this bucket.
|
||||||
*/
|
*/
|
||||||
public void addTimeout(HashedWheelTimeout timeout) {
|
public void addTimeout(HashedWheelTimeout timeout) {
|
||||||
|
assert timeout.bucket == null;
|
||||||
|
timeout.bucket = this;
|
||||||
if (head == null) {
|
if (head == null) {
|
||||||
head = tail = timeout;
|
head = tail = timeout;
|
||||||
} else {
|
} else {
|
||||||
@ -612,31 +656,38 @@ public class HashedWheelTimer implements Timer {
|
|||||||
// store reference to next as we may null out timeout.next in the remove block.
|
// store reference to next as we may null out timeout.next in the remove block.
|
||||||
HashedWheelTimeout next = timeout.next;
|
HashedWheelTimeout next = timeout.next;
|
||||||
if (remove) {
|
if (remove) {
|
||||||
|
remove(timeout);
|
||||||
|
}
|
||||||
|
timeout = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove(HashedWheelTimeout timeout) {
|
||||||
|
HashedWheelTimeout next = timeout.next;
|
||||||
// remove timeout that was either processed or cancelled by updating the linked-list
|
// remove timeout that was either processed or cancelled by updating the linked-list
|
||||||
if (timeout.prev != null) {
|
if (timeout.prev != null) {
|
||||||
timeout.prev.next = timeout.next;
|
timeout.prev.next = next;
|
||||||
}
|
}
|
||||||
if (timeout.next != null) {
|
if (timeout.next != null) {
|
||||||
timeout.next.prev = timeout.prev;
|
timeout.next.prev = timeout.prev;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timeout == head) {
|
if (timeout == head) {
|
||||||
// if timeout is head we need to replace the head with the next entry
|
|
||||||
head = next;
|
|
||||||
if (timeout == tail) {
|
|
||||||
// if timeout is also the tail we need to adjust the entry too
|
// if timeout is also the tail we need to adjust the entry too
|
||||||
tail = timeout.next;
|
if (timeout == tail) {
|
||||||
|
tail = null;
|
||||||
|
head = null;
|
||||||
|
} else {
|
||||||
|
head = next;
|
||||||
}
|
}
|
||||||
} else if (timeout == tail) {
|
} else if (timeout == tail) {
|
||||||
// if the timeout is the tail modify the tail to be the prev node.
|
// if the timeout is the tail modify the tail to be the prev node.
|
||||||
tail = timeout.prev;
|
tail = timeout.prev;
|
||||||
}
|
}
|
||||||
// null out prev and next to allow for GC.
|
// null out prev, next and bucket to allow for GC.
|
||||||
timeout.prev = null;
|
timeout.prev = null;
|
||||||
timeout.next = null;
|
timeout.next = null;
|
||||||
}
|
timeout.bucket = null;
|
||||||
timeout = next;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -671,6 +722,7 @@ public class HashedWheelTimer implements Timer {
|
|||||||
// null out prev and next to allow for GC.
|
// null out prev and next to allow for GC.
|
||||||
head.next = null;
|
head.next = null;
|
||||||
head.prev = null;
|
head.prev = null;
|
||||||
|
head.bucket = null;
|
||||||
return head;
|
return head;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user