From 6248db59051774a25ee1b4f940384bbc32ef102a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 2 May 2014 09:52:59 +0200 Subject: [PATCH] Minimize memory footprint of HashedWheelTimer and context-switching Motivation: At the moment there are two issues with HashedWheelTimer: * the memory footprint of it is pretty heavy (250kb fon an empty instance) * the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen. Modification: Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a MPSC queue to handover the timeouts. So volatile writes are reduced to a minimum and also the memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance. Result: Lower memory-footprint and better performance --- .../java/io/netty/util/HashedWheelTimer.java | 348 ++++++++++++------ .../netty/util/internal/MpscLinkedQueue.java | 121 +++--- .../io/netty/util/internal/OneTimeTask.java | 32 +- .../util/internal/PlatformDependent.java | 6 +- 4 files changed, 308 insertions(+), 199 deletions(-) diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index 6c5d575f4f..ad835c515a 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -15,24 +15,21 @@ */ package io.netty.util; +import io.netty.util.internal.MpscLinkedQueue; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * A {@link Timer} optimized for approximated I/O timeout scheduling. @@ -84,22 +81,32 @@ public class HashedWheelTimer implements Timer { new ResourceLeakDetector( HashedWheelTimer.class, 1, Runtime.getRuntime().availableProcessors() * 4); + private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER; + static { + AtomicIntegerFieldUpdater workerStateUpdater = + PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimer.class, "workerState"); + if (workerStateUpdater == null) { + workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); + } + WORKER_STATE_UPDATER = workerStateUpdater; + } + private final ResourceLeak leak; private final Worker worker = new Worker(); - final Thread workerThread; + private final Thread workerThread; public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; - final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down + @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) + private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down - final long tickDuration; - final Set[] wheel; - final int mask; - final ReadWriteLock lock = new ReentrantReadWriteLock(); - final CountDownLatch startTimeInitialized = new CountDownLatch(1); - volatile long startTime; - volatile long tick; + private final long tickDuration; + private final HashedWheelBucket[] wheel; + private final int mask; + private final CountDownLatch startTimeInitialized = new CountDownLatch(1); + private final Queue timeouts = PlatformDependent.newMpscQueue(); + private volatile long startTime; /** * Creates a new timer with the default thread factory @@ -209,13 +216,13 @@ public class HashedWheelTimer implements Timer { "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } - workerThread = threadFactory.newThread(worker); + leak = leakDetector.open(this); } @SuppressWarnings("unchecked") - private static Set[] createWheel(int ticksPerWheel) { + private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); @@ -226,10 +233,9 @@ public class HashedWheelTimer implements Timer { } ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); - Set[] wheel = new Set[ticksPerWheel]; + HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { - wheel[i] = Collections.newSetFromMap( - PlatformDependent.newConcurrentHashMap()); + wheel[i] = new HashedWheelBucket(); } return wheel; } @@ -250,9 +256,9 @@ public class HashedWheelTimer implements Timer { * {@linkplain #stop() stopped} already */ public void start() { - switch (workerState.get()) { + switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: - if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { + if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; @@ -283,9 +289,9 @@ public class HashedWheelTimer implements Timer { TimerTask.class.getSimpleName()); } - if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { + if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. - workerState.set(WORKER_STATE_SHUTDOWN); + WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN); if (leak != null) { leak.close(); @@ -311,49 +317,31 @@ public class HashedWheelTimer implements Timer { if (leak != null) { leak.close(); } - - Set unprocessedTimeouts = new HashSet(); - for (Set bucket: wheel) { - unprocessedTimeouts.addAll(bucket); - bucket.clear(); - } - - return Collections.unmodifiableSet(unprocessedTimeouts); + return worker.unprocessedTimeouts(); } @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { - start(); - if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } + start(); + // Add the timeout to the timeout queue which will be processed on the next tick. + // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; - - // Add the timeout to the wheel. - HashedWheelTimeout timeout; - lock.readLock().lock(); - try { - timeout = new HashedWheelTimeout(task, deadline); - if (workerState.get() == WORKER_STATE_SHUTDOWN) { - throw new IllegalStateException("Cannot enqueue after shutdown"); - } - wheel[timeout.stopIndex].add(timeout); - } finally { - lock.readLock().unlock(); - } - + HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); + timeouts.add(timeout); return timeout; } private final class Worker implements Runnable { + private final Set unprocessedTimeouts = new HashSet(); - Worker() { - } + private long tick; @Override public void run() { @@ -367,68 +355,50 @@ public class HashedWheelTimer implements Timer { // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); - List expiredTimeouts = new ArrayList(); - do { final long deadline = waitForNextTick(); if (deadline > 0) { - fetchExpiredTimeouts(expiredTimeouts, deadline); - notifyExpiredTimeouts(expiredTimeouts); + transferTimeoutsToBuckets(); + HashedWheelBucket bucket = + wheel[(int) (tick & mask)]; + bucket.expireTimeouts(deadline); + tick++; } - } while (workerState.get() == WORKER_STATE_STARTED); - } + } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); - private void fetchExpiredTimeouts( - List expiredTimeouts, long deadline) { - - // Find the expired timeouts and decrease the round counter - // if necessary. Note that we don't send the notification - // immediately to make sure the listeners are called without - // an exclusive lock. - lock.writeLock().lock(); - try { - fetchExpiredTimeouts(expiredTimeouts, wheel[(int) (tick & mask)].iterator(), deadline); - } finally { - // Note that the tick is updated only while the writer lock is held, - // so that newTimeout() and consequently new HashedWheelTimeout() never see an old value - // while the reader lock is held. - tick ++; - lock.writeLock().unlock(); + // Fill the unprocessedTimeouts so we can return them from stop() method. + for (HashedWheelBucket bucket: wheel) { + bucket.clearTimeouts(unprocessedTimeouts); } - } - - private void fetchExpiredTimeouts( - List expiredTimeouts, - Iterator i, long deadline) { - - while (i.hasNext()) { - HashedWheelTimeout timeout = i.next(); - if (timeout.remainingRounds <= 0) { - i.remove(); - if (timeout.deadline <= deadline) { - expiredTimeouts.add(timeout); - } else { - // The timeout was placed into a wrong slot. This should never happen. - throw new Error(String.format( - "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); - } - } else { - timeout.remainingRounds --; + for (;;) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + break; } + unprocessedTimeouts.add(timeout); } } - private void notifyExpiredTimeouts( - List expiredTimeouts) { - // Notify the expired timeouts. - for (int i = expiredTimeouts.size() - 1; i >= 0; i --) { - expiredTimeouts.get(i).expire(); + private void transferTimeoutsToBuckets() { + // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just + // adds new timeouts in a loop. + for (int i = 0; i < 100000; i++) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + // all processed + break; + } + long calculated = timeout.deadline / tickDuration; + long remainingRounds = (calculated - tick) / wheel.length; + timeout.remainingRounds = remainingRounds; + + final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. + int stopIndex = (int) (ticks & mask); + + HashedWheelBucket bucket = wheel[stopIndex]; + bucket.addTimeout(timeout); } - - // Clean up the temporary list. - expiredTimeouts.clear(); } - /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. @@ -462,39 +432,60 @@ public class HashedWheelTimer implements Timer { try { Thread.sleep(sleepTimeMs); } catch (InterruptedException e) { - if (workerState.get() == WORKER_STATE_SHUTDOWN) { + if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } + + public Set unprocessedTimeouts() { + return Collections.unmodifiableSet(unprocessedTimeouts); + } } - private final class HashedWheelTimeout implements Timeout { + private static final class HashedWheelTimeout extends MpscLinkedQueue.Node + implements Timeout { private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; private static final int ST_EXPIRED = 2; + private static final AtomicIntegerFieldUpdater STATE_UPDATER; + static { + AtomicIntegerFieldUpdater updater = + PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state"); + if (updater == null) { + updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); + } + STATE_UPDATER = updater; + } + + private final HashedWheelTimer timer; private final TimerTask task; - final long deadline; - final int stopIndex; - volatile long remainingRounds; - private final AtomicInteger state = new AtomicInteger(ST_INIT); + private final long deadline; - HashedWheelTimeout(TimerTask task, long deadline) { + @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) + private volatile int state = ST_INIT; + + // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the + // HashedWheelTimeout will be added to the correct HashedWheelBucket. + long remainingRounds; + + // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. + // As only the workerThread will act on it there is no need for synchronization / volatile. + HashedWheelTimeout next; + HashedWheelTimeout prev; + + HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { + this.timer = timer; this.task = task; this.deadline = deadline; - - long calculated = deadline / tickDuration; - final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. - stopIndex = (int) (ticks & mask); - remainingRounds = (calculated - tick) / wheel.length; } @Override public Timer timer() { - return HashedWheelTimer.this; + return timer; } @Override @@ -504,26 +495,30 @@ public class HashedWheelTimer implements Timer { @Override public boolean cancel() { - if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) { + // only update the state it will be removed from HashedWheelBucket on next tick. + if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED)) { return false; } - - wheel[stopIndex].remove(this); return true; } @Override public boolean isCancelled() { - return state.get() == ST_CANCELLED; + return STATE_UPDATER.get(this) == ST_CANCELLED; } @Override public boolean isExpired() { - return state.get() != ST_INIT; + return STATE_UPDATER.get(this) != ST_INIT; + } + + @Override + public HashedWheelTimeout value() { + return this; } public void expire() { - if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) { + if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) { return; } @@ -539,7 +534,7 @@ public class HashedWheelTimer implements Timer { @Override public String toString() { final long currentTime = System.nanoTime(); - long remaining = deadline - currentTime + startTime; + long remaining = deadline - currentTime + timer.startTime; StringBuilder buf = new StringBuilder(192); buf.append(StringUtil.simpleClassName(this)); @@ -566,4 +561,117 @@ public class HashedWheelTimer implements Timer { return buf.append(')').toString(); } } + + /** + * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy + * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no + * extra object creation is needed. + */ + private static final class HashedWheelBucket { + + // Used for the linked-list datastructure + private HashedWheelTimeout head; + private HashedWheelTimeout tail; + + /** + * Add {@link HashedWheelTimeout} to this bucket. + */ + public void addTimeout(HashedWheelTimeout timeout) { + if (head == null) { + head = tail = timeout; + } else { + tail.next = timeout; + timeout.prev = tail; + tail = timeout; + } + } + + /** + * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. + */ + public void expireTimeouts(long deadline) { + HashedWheelTimeout timeout = head; + + // process all timeouts + while (timeout != null) { + boolean remove = false; + if (timeout.remainingRounds <= 0) { + if (timeout.deadline <= deadline) { + timeout.expire(); + } else { + // The timeout was placed into a wrong slot. This should never happen. + throw new IllegalStateException(String.format( + "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); + } + remove = true; + } else if (timeout.isCancelled()) { + remove = true; + } else { + timeout.remainingRounds --; + } + // store reference to next as we may null out timeout.next in the remove block. + HashedWheelTimeout next = timeout.next; + if (remove) { + // remove timeout that was either processed or cancelled by updating the linked-list + if (timeout.prev != null) { + timeout.prev.next = timeout.next; + } + if (timeout.next != null) { + timeout.next.prev = timeout.prev; + } + + if (timeout == head) { + // if timeout is head we need to replace the head with the next entry + head = next; + if (timeout == tail) { + // if timeout is also the tail we need to adjust the entry too + tail = timeout.next; + } + } else if (timeout == tail) { + // if the timeout is the tail modify the tail to be the prev node. + tail = timeout.prev; + } + // null out prev and next to allow for GC. + timeout.prev = null; + timeout.next = null; + } + timeout = next; + } + } + + /** + * Clear this bucket and return all not expired / cancelled {@link Timeout}s. + */ + public void clearTimeouts(Set set) { + for (;;) { + HashedWheelTimeout timeout = pollTimeout(); + if (timeout == null) { + return; + } + if (timeout.isExpired() || timeout.isCancelled()) { + continue; + } + set.add(timeout); + } + } + + private HashedWheelTimeout pollTimeout() { + HashedWheelTimeout head = this.head; + if (head == null) { + return null; + } + HashedWheelTimeout next = head.next; + if (next == null) { + tail = this.head = null; + } else { + this.head = next; + next.prev = null; + } + + // null out prev and next to allow for GC. + head.next = null; + head.prev = null; + return head; + } + } } diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java index 349b03f26e..b2bc425bd4 100644 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java +++ b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java @@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; * */ @SuppressWarnings("serial") -final class MpscLinkedQueue extends AtomicReference implements Queue { +public final class MpscLinkedQueue extends AtomicReference> implements Queue { private static final long tailOffset; static { @@ -54,74 +54,75 @@ final class MpscLinkedQueue extends AtomicReference implements Queu // Extends AtomicReference for the "head" slot (which is the one that is appended to) // since Unsafe does not expose XCHG operation intrinsically @SuppressWarnings({ "unused", "FieldMayBeFinal" }) - private volatile OneTimeTask tail; + private volatile Node tail; MpscLinkedQueue() { - final OneTimeTask task = new OneTimeTaskAdapter(null); + final Node task = new DefaultNode(null); tail = task; set(task); } + @SuppressWarnings("unchecked") @Override - public boolean add(Runnable runnable) { - if (runnable instanceof OneTimeTask) { - OneTimeTask node = (OneTimeTask) runnable; + public boolean add(T value) { + if (value instanceof Node) { + Node node = (Node) value; node.setNext(null); getAndSet(node).setNext(node); } else { - final OneTimeTask n = new OneTimeTaskAdapter(runnable); + final Node n = new DefaultNode(value); getAndSet(n).setNext(n); } return true; } @Override - public boolean offer(Runnable runnable) { - return add(runnable); + public boolean offer(T value) { + return add(value); } @Override - public Runnable remove() { - Runnable task = poll(); - if (task == null) { + public T remove() { + T v = poll(); + if (v == null) { throw new NoSuchElementException(); } - return task; + return v; } @Override - public Runnable poll() { - final OneTimeTask next = peekTask(); + public T poll() { + final Node next = peekNode(); if (next == null) { return null; } - final OneTimeTask ret = next; + final Node ret = next; PlatformDependent.putOrderedObject(this, tailOffset, next); - return unwrapIfNeeded(ret); + return ret.value(); } @Override - public Runnable element() { - final OneTimeTask next = peekTask(); + public T element() { + final Node next = peekNode(); if (next == null) { throw new NoSuchElementException(); } - return unwrapIfNeeded(next); + return next.value(); } @Override - public Runnable peek() { - final OneTimeTask next = peekTask(); + public T peek() { + final Node next = peekNode(); if (next == null) { return null; } - return unwrapIfNeeded(next); + return next.value(); } @Override public int size() { int count = 0; - OneTimeTask n = peekTask(); + Node n = peekNode(); for (;;) { if (n == null) { break; @@ -133,10 +134,10 @@ final class MpscLinkedQueue extends AtomicReference implements Queu } @SuppressWarnings("unchecked") - private OneTimeTask peekTask() { + private Node peekNode() { for (;;) { - final OneTimeTask tail = (OneTimeTask) PlatformDependent.getObjectVolatile(this, tailOffset); - final OneTimeTask next = tail.next(); + final Node tail = (Node) PlatformDependent.getObjectVolatile(this, tailOffset); + final Node next = tail.next(); if (next != null || get() == tail) { return next; } @@ -150,12 +151,12 @@ final class MpscLinkedQueue extends AtomicReference implements Queu @Override public boolean contains(Object o) { - OneTimeTask n = peekTask(); + Node n = peekNode(); for (;;) { if (n == null) { break; } - if (unwrapIfNeeded(n) == o) { + if (n.value() == o) { return true; } n = n.next(); @@ -164,7 +165,7 @@ final class MpscLinkedQueue extends AtomicReference implements Queu } @Override - public Iterator iterator() { + public Iterator iterator() { throw new UnsupportedOperationException(); } @@ -194,8 +195,8 @@ final class MpscLinkedQueue extends AtomicReference implements Queu } @Override - public boolean addAll(Collection c) { - for (Runnable r: c) { + public boolean addAll(Collection c) { + for (T r: c) { add(r); } return false; @@ -220,26 +221,50 @@ final class MpscLinkedQueue extends AtomicReference implements Queu } } - /** - * Unwrap {@link OneTimeTask} if needed and so return the proper queued task. - */ - private static Runnable unwrapIfNeeded(OneTimeTask task) { - if (task instanceof OneTimeTaskAdapter) { - return ((OneTimeTaskAdapter) task).task; - } - return task; - } + private static final class DefaultNode extends Node { + private final T value; - private static final class OneTimeTaskAdapter extends OneTimeTask { - private final Runnable task; - - OneTimeTaskAdapter(Runnable task) { - this.task = task; + DefaultNode(T value) { + this.value = value; } @Override - public void run() { - task.run(); + public T value() { + return value; } } + + public abstract static class Node { + + private static final long nextOffset; + + static { + if (PlatformDependent0.hasUnsafe()) { + try { + nextOffset = PlatformDependent.objectFieldOffset( + Node.class.getDeclaredField("tail")); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } else { + nextOffset = -1; + } + } + + @SuppressWarnings("unused") + private volatile Node tail; + + // Only use from MpscLinkedQueue and so we are sure Unsafe is present + @SuppressWarnings("unchecked") + final Node next() { + return (Node) PlatformDependent.getObjectVolatile(this, nextOffset); + } + + // Only use from MpscLinkedQueue and so we are sure Unsafe is present + final void setNext(final Node newNext) { + PlatformDependent.putOrderedObject(this, nextOffset, newNext); + } + + public abstract T value(); + } } diff --git a/common/src/main/java/io/netty/util/internal/OneTimeTask.java b/common/src/main/java/io/netty/util/internal/OneTimeTask.java index b3f46cbfc1..69819a3fdf 100644 --- a/common/src/main/java/io/netty/util/internal/OneTimeTask.java +++ b/common/src/main/java/io/netty/util/internal/OneTimeTask.java @@ -23,34 +23,10 @@ import io.netty.util.concurrent.EventExecutor; * * It is important this will not be reused. After submitted it is not allowed to get submitted again! */ -public abstract class OneTimeTask implements Runnable { +public abstract class OneTimeTask extends MpscLinkedQueue.Node implements Runnable { - private static final long nextOffset; - - static { - if (PlatformDependent0.hasUnsafe()) { - try { - nextOffset = PlatformDependent.objectFieldOffset( - OneTimeTask.class.getDeclaredField("tail")); - } catch (Throwable t) { - throw new ExceptionInInitializerError(t); - } - } else { - nextOffset = -1; - } - } - - @SuppressWarnings("unused") - private volatile OneTimeTask tail; - - // Only use from MpscLinkedQueue and so we are sure Unsafe is present - @SuppressWarnings("unchecked") - final OneTimeTask next() { - return (OneTimeTask) PlatformDependent.getObjectVolatile(this, nextOffset); - } - - // Only use from MpscLinkedQueue and so we are sure Unsafe is present - final void setNext(final OneTimeTask newNext) { - PlatformDependent.putOrderedObject(this, nextOffset, newNext); + @Override + public Runnable value() { + return this; } } diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index f34d5877aa..733c356fba 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -379,11 +379,11 @@ public final class PlatformDependent { * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single * consumer (one thread!). */ - public static Queue newMpscQueue() { + public static Queue newMpscQueue() { if (hasUnsafe()) { - return new MpscLinkedQueue(); + return new MpscLinkedQueue(); } else { - return new ConcurrentLinkedQueue(); + return new ConcurrentLinkedQueue(); } }