From ef9ee90b1a0eb1789222273b6565b49af50eb0d9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 11 May 2014 15:52:48 +0200 Subject: [PATCH] Minimize memory footprint of HashedWheelTimer and context-switching Motivation: At the moment there are two issues with HashedWheelTimer: * the memory footprint of it is pretty heavy (250kb fon an empty instance) * the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen. Modification: Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a queue to handover the timeouts. So memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance. Result: Lower memory-footprint and better performance --- .../jboss/netty/util/HashedWheelTimer.java | 378 +++++++++++------- 1 file changed, 228 insertions(+), 150 deletions(-) diff --git a/src/main/java/org/jboss/netty/util/HashedWheelTimer.java b/src/main/java/org/jboss/netty/util/HashedWheelTimer.java index a1d3514b70..cc9b10762c 100644 --- a/src/main/java/org/jboss/netty/util/HashedWheelTimer.java +++ b/src/main/java/org/jboss/netty/util/HashedWheelTimer.java @@ -18,23 +18,20 @@ package org.jboss.netty.util; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.util.internal.DetectionUtil; -import org.jboss.netty.util.internal.ReusableIterator; import org.jboss.netty.util.internal.SharedResourceMisuseDetector; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * A {@link Timer} optimized for approximated I/O timeout scheduling. @@ -88,22 +85,24 @@ public class HashedWheelTimer implements Timer { private static final SharedResourceMisuseDetector misuseDetector = new SharedResourceMisuseDetector(HashedWheelTimer.class); + private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); + private final Worker worker = new Worker(); - final Thread workerThread; + private final Thread workerThread; public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; - final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down + @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) + private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down - final long tickDuration; - final Set[] wheel; - final ReusableIterator[] iterators; - final int mask; - final ReadWriteLock lock = new ReentrantReadWriteLock(); - final CountDownLatch startTimeInitialized = new CountDownLatch(1); - volatile long startTime; - volatile long tick; + private final long tickDuration; + private final HashedWheelBucket[] wheel; + private final int mask; + private final CountDownLatch startTimeInitialized = new CountDownLatch(1); + private final Queue timeouts = new ConcurrentLinkedQueue(); + private volatile long startTime; /** * Creates a new timer with the default thread factory @@ -213,7 +212,6 @@ public class HashedWheelTimer implements Timer { // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); - iterators = createIterators(wheel); mask = wheel.length - 1; // Convert tickDuration to nanos. @@ -235,7 +233,7 @@ public class HashedWheelTimer implements Timer { } @SuppressWarnings("unchecked") - private static Set[] createWheel(int ticksPerWheel) { + private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); @@ -246,23 +244,13 @@ public class HashedWheelTimer implements Timer { } ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); - Set[] wheel = new Set[ticksPerWheel]; + HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { - wheel[i] = new MapBackedSet( - new ConcurrentIdentityHashMap(16, 0.95f, 4)); + wheel[i] = new HashedWheelBucket(); } return wheel; } - @SuppressWarnings("unchecked") - private static ReusableIterator[] createIterators(Set[] wheel) { - ReusableIterator[] iterators = new ReusableIterator[wheel.length]; - for (int i = 0; i < wheel.length; i ++) { - iterators[i] = (ReusableIterator) wheel[i].iterator(); - } - return iterators; - } - private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { @@ -279,18 +267,18 @@ public class HashedWheelTimer implements Timer { * {@linkplain #stop() stopped} already */ public void start() { - switch (workerState.get()) { - case WORKER_STATE_INIT: - if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { - workerThread.start(); - } - break; - case WORKER_STATE_STARTED: - break; - case WORKER_STATE_SHUTDOWN: - throw new IllegalStateException("cannot be started once stopped"); - default: - throw new Error("Invalid WorkerState"); + switch (WORKER_STATE_UPDATER.get(this)) { + case WORKER_STATE_INIT: + if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { + workerThread.start(); + } + break; + case WORKER_STATE_STARTED: + break; + case WORKER_STATE_SHUTDOWN: + throw new IllegalStateException("cannot be started once stopped"); + default: + throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. @@ -307,13 +295,16 @@ public class HashedWheelTimer implements Timer { if (Thread.currentThread() == workerThread) { throw new IllegalStateException( HashedWheelTimer.class.getSimpleName() + - ".stop() cannot be called from " + - TimerTask.class.getSimpleName()); + ".stop() cannot be called from " + + TimerTask.class.getSimpleName()); } - if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { + if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. - workerState.set(WORKER_STATE_SHUTDOWN); + WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN); + + misuseDetector.decrease(); + return Collections.emptySet(); } @@ -333,47 +324,30 @@ public class HashedWheelTimer implements Timer { misuseDetector.decrease(); - Set unprocessedTimeouts = new HashSet(); - for (Set bucket: wheel) { - unprocessedTimeouts.addAll(bucket); - bucket.clear(); - } - - return Collections.unmodifiableSet(unprocessedTimeouts); + return worker.unprocessedTimeouts(); } public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { - start(); - if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } + start(); + // Add the timeout to the timeout queue which will be processed on the next tick. + // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; - - // Add the timeout to the wheel. - HashedWheelTimeout timeout; - lock.readLock().lock(); - try { - timeout = new HashedWheelTimeout(task, deadline); - if (workerState.get() == WORKER_STATE_SHUTDOWN) { - throw new IllegalStateException("Cannot enqueue after shutdown"); - } - wheel[timeout.stopIndex].add(timeout); - } finally { - lock.readLock().unlock(); - } - + HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); + timeouts.add(timeout); return timeout; } private final class Worker implements Runnable { + private final Set unprocessedTimeouts = new HashSet(); - Worker() { - } + private long tick; public void run() { // Initialize the startTime. @@ -386,71 +360,51 @@ public class HashedWheelTimer implements Timer { // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); - List expiredTimeouts = new ArrayList(); - do { final long deadline = waitForNextTick(); if (deadline > 0) { - fetchExpiredTimeouts(expiredTimeouts, deadline); - notifyExpiredTimeouts(expiredTimeouts); + transferTimeoutsToBuckets(); + HashedWheelBucket bucket = + wheel[(int) (tick & mask)]; + bucket.expireTimeouts(deadline); + tick++; } - } while (workerState.get() == WORKER_STATE_STARTED); - } + } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); - private void fetchExpiredTimeouts( - List expiredTimeouts, long deadline) { - - // Find the expired timeouts and decrease the round counter - // if necessary. Note that we don't send the notification - // immediately to make sure the listeners are called without - // an exclusive lock. - lock.writeLock().lock(); - try { - ReusableIterator i = iterators[(int) (tick & mask)]; - fetchExpiredTimeouts(expiredTimeouts, i, deadline); - } finally { - // Note that the tick is updated only while the writer lock is held, - // so that newTimeout() and consequently new HashedWheelTimeout() never see an old value - // while the reader lock is held. - tick ++; - lock.writeLock().unlock(); + // Fill the unprocessedTimeouts so we can return them from stop() method. + for (HashedWheelBucket bucket: wheel) { + bucket.clearTimeouts(unprocessedTimeouts); } - } - - private void fetchExpiredTimeouts( - List expiredTimeouts, - ReusableIterator i, long deadline) { - - i.rewind(); - while (i.hasNext()) { - HashedWheelTimeout timeout = i.next(); - if (timeout.remainingRounds <= 0) { - i.remove(); - if (timeout.deadline <= deadline) { - expiredTimeouts.add(timeout); - } else { - // The timeout was placed into a wrong slot. This should never happen. - throw new Error(String.format( - "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); - } - } else { - timeout.remainingRounds --; + for (;;) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + break; } + unprocessedTimeouts.add(timeout); } } - private void notifyExpiredTimeouts( - List expiredTimeouts) { - // Notify the expired timeouts. - for (int i = expiredTimeouts.size() - 1; i >= 0; i --) { - expiredTimeouts.get(i).expire(); + private void transferTimeoutsToBuckets() { + // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just + // adds new timeouts in a loop. + for (int i = 0; i < 100000; i++) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + // all processed + break; + } + long calculated = timeout.deadline / tickDuration; + long remainingRounds = (calculated - tick) / wheel.length; + timeout.remainingRounds = remainingRounds; + + final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. + int stopIndex = (int) (ticks & mask); + + HashedWheelBucket bucket = wheel[stopIndex]; + bucket.addTimeout(timeout); } - - // Clean up the temporary list. - expiredTimeouts.clear(); } - - /** + /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, @@ -479,41 +433,54 @@ public class HashedWheelTimer implements Timer { if (DetectionUtil.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } + try { Thread.sleep(sleepTimeMs); } catch (InterruptedException e) { - if (workerState.get() == WORKER_STATE_SHUTDOWN) { + if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } + + public Set unprocessedTimeouts() { + return Collections.unmodifiableSet(unprocessedTimeouts); + } } - private final class HashedWheelTimeout implements Timeout { + private static final class HashedWheelTimeout implements Timeout { private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; private static final int ST_EXPIRED = 2; + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); + private final HashedWheelTimer timer; private final TimerTask task; - final long deadline; - final int stopIndex; - volatile long remainingRounds; - private final AtomicInteger state = new AtomicInteger(ST_INIT); + private final long deadline; - HashedWheelTimeout(TimerTask task, long deadline) { + @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) + private volatile int state = ST_INIT; + + // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the + // HashedWheelTimeout will be added to the correct HashedWheelBucket. + long remainingRounds; + + // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. + // As only the workerThread will act on it there is no need for synchronization / volatile. + HashedWheelTimeout next; + HashedWheelTimeout prev; + + HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { + this.timer = timer; this.task = task; this.deadline = deadline; - - long calculated = deadline / tickDuration; - final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. - stopIndex = (int) (ticks & mask); - remainingRounds = (calculated - tick) / wheel.length; } public Timer getTimer() { - return HashedWheelTimer.this; + return timer; } public TimerTask getTask() { @@ -521,24 +488,24 @@ public class HashedWheelTimer implements Timer { } public void cancel() { - if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) { - // TODO return false - return; - } - - wheel[stopIndex].remove(this); + // only update the state it will be removed from HashedWheelBucket on next tick. + STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED); } public boolean isCancelled() { - return state.get() == ST_CANCELLED; + return STATE_UPDATER.get(this) == ST_CANCELLED; } public boolean isExpired() { - return state.get() != ST_INIT; + return STATE_UPDATER.get(this) != ST_INIT; + } + + public HashedWheelTimeout value() { + return this; } public void expire() { - if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) { + if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) { return; } @@ -546,9 +513,7 @@ public class HashedWheelTimer implements Timer { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by " + - TimerTask.class.getSimpleName() + '.', t); + logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } } @@ -556,7 +521,7 @@ public class HashedWheelTimer implements Timer { @Override public String toString() { final long currentTime = System.nanoTime(); - long remaining = deadline - currentTime + startTime; + long remaining = deadline - currentTime + timer.startTime; StringBuilder buf = new StringBuilder(192); buf.append(getClass().getSimpleName()); @@ -583,4 +548,117 @@ public class HashedWheelTimer implements Timer { return buf.append(')').toString(); } } + + /** + * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy + * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no + * extra object creation is needed. + */ + private static final class HashedWheelBucket { + + // Used for the linked-list datastructure + private HashedWheelTimeout head; + private HashedWheelTimeout tail; + + /** + * Add {@link HashedWheelTimeout} to this bucket. + */ + public void addTimeout(HashedWheelTimeout timeout) { + if (head == null) { + head = tail = timeout; + } else { + tail.next = timeout; + timeout.prev = tail; + tail = timeout; + } + } + + /** + * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. + */ + public void expireTimeouts(long deadline) { + HashedWheelTimeout timeout = head; + + // process all timeouts + while (timeout != null) { + boolean remove = false; + if (timeout.remainingRounds <= 0) { + if (timeout.deadline <= deadline) { + timeout.expire(); + } else { + // The timeout was placed into a wrong slot. This should never happen. + throw new IllegalStateException(String.format( + "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); + } + remove = true; + } else if (timeout.isCancelled()) { + remove = true; + } else { + timeout.remainingRounds --; + } + // store reference to next as we may null out timeout.next in the remove block. + HashedWheelTimeout next = timeout.next; + if (remove) { + // remove timeout that was either processed or cancelled by updating the linked-list + if (timeout.prev != null) { + timeout.prev.next = timeout.next; + } + if (timeout.next != null) { + timeout.next.prev = timeout.prev; + } + + if (timeout == head) { + // if timeout is head we need to replace the head with the next entry + head = next; + if (timeout == tail) { + // if timeout is also the tail we need to adjust the entry too + tail = timeout.next; + } + } else if (timeout == tail) { + // if the timeout is the tail modify the tail to be the prev node. + tail = timeout.prev; + } + // null out prev and next to allow for GC. + timeout.prev = null; + timeout.next = null; + } + timeout = next; + } + } + + /** + * Clear this bucket and return all not expired / cancelled {@link Timeout}s. + */ + public void clearTimeouts(Set set) { + for (;;) { + HashedWheelTimeout timeout = pollTimeout(); + if (timeout == null) { + return; + } + if (timeout.isExpired() || timeout.isCancelled()) { + continue; + } + set.add(timeout); + } + } + + private HashedWheelTimeout pollTimeout() { + HashedWheelTimeout head = this.head; + if (head == null) { + return null; + } + HashedWheelTimeout next = head.next; + if (next == null) { + tail = this.head = null; + } else { + this.head = next; + next.prev = null; + } + + // null out prev and next to allow for GC. + head.next = null; + head.prev = null; + return head; + } + } }