Replaced FastQueue with LinkedTransferQueue
This commit is contained in:
parent
6d372c367b
commit
dae3b05ebb
@ -25,6 +25,7 @@ package org.jboss.netty.channel.socket.nio;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.jboss.netty.channel.AbstractChannel;
|
||||
@ -34,7 +35,7 @@ import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.util.FastQueue;
|
||||
import org.jboss.netty.util.LinkedTransferQueue;
|
||||
|
||||
/**
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
@ -52,7 +53,7 @@ abstract class NioSocketChannel extends AbstractChannel
|
||||
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
|
||||
final Runnable writeTask = new WriteTask();
|
||||
final Object writeLock = new Object();
|
||||
final FastQueue<MessageEvent> writeBuffer = new FastQueue<MessageEvent>();
|
||||
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
|
||||
MessageEvent currentWriteEvent;
|
||||
int currentWriteIndex;
|
||||
|
||||
|
@ -32,8 +32,8 @@ import java.nio.channels.ScatteringByteChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -48,7 +48,7 @@ import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.FastQueue;
|
||||
import org.jboss.netty.util.LinkedTransferQueue;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
|
||||
/**
|
||||
@ -76,10 +76,8 @@ class NioWorker implements Runnable {
|
||||
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
||||
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
|
||||
private final Object shutdownLock = new Object();
|
||||
//private final FastQueue<Runnable> taskQueue = new FastQueue<Runnable>();
|
||||
//private final ConcurrentFastQueue<Runnable> taskQueue = new ConcurrentFastQueue<Runnable>();
|
||||
private final FastQueue<Runnable> registerTaskQueue = new FastQueue<Runnable>();
|
||||
private final ConcurrentLinkedQueue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
|
||||
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||
|
||||
NioWorker(int bossId, int id, Executor executor) {
|
||||
this.bossId = bossId;
|
||||
@ -409,7 +407,7 @@ class NioWorker implements Runnable {
|
||||
int bufIdx;
|
||||
|
||||
synchronized (channel.writeLock) {
|
||||
FastQueue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||
evt = channel.currentWriteEvent;
|
||||
for (;;) {
|
||||
if (evt == null) {
|
||||
@ -485,7 +483,7 @@ class NioWorker implements Runnable {
|
||||
int writtenBytes = 0;
|
||||
|
||||
synchronized (channel.writeLock) {
|
||||
FastQueue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||
evt = channel.currentWriteEvent;
|
||||
for (;;) {
|
||||
if (evt == null) {
|
||||
@ -733,7 +731,7 @@ class NioWorker implements Runnable {
|
||||
fireExceptionCaught(channel, cause);
|
||||
}
|
||||
|
||||
FastQueue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||
for (;;) {
|
||||
evt = writeBuffer.poll();
|
||||
if (evt == null) {
|
||||
|
@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
@ -38,6 +37,7 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.util.LinkedTransferQueue;
|
||||
|
||||
/**
|
||||
* A {@link ThreadPoolExecutor} which blocks the task submission when there's
|
||||
@ -162,7 +162,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator,
|
||||
ThreadFactory threadFactory) {
|
||||
|
||||
super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
|
||||
super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedTransferQueue<Runnable>(), threadFactory);
|
||||
|
||||
if (objectSizeEstimator == null) {
|
||||
throw new NullPointerException("objectSizeEstimator");
|
||||
|
@ -1,95 +0,0 @@
|
||||
/*
|
||||
* JBoss, Home of Professional Open Source
|
||||
*
|
||||
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
|
||||
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
|
||||
* full listing of individual contributors.
|
||||
*
|
||||
* This is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as
|
||||
* published by the Free Software Foundation; either version 2.1 of
|
||||
* the License, or (at your option) any later version.
|
||||
*
|
||||
* This software is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this software; if not, write to the Free
|
||||
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
|
||||
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
|
||||
*/
|
||||
package org.jboss.netty.util;
|
||||
|
||||
|
||||
/**
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Trustin Lee (tlee@redhat.com)
|
||||
*
|
||||
* @version $Rev$, $Date$
|
||||
*
|
||||
*/
|
||||
public class ConcurrentFastQueue<E> {
|
||||
|
||||
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
|
||||
|
||||
final FastQueue<E>[] segments;
|
||||
private final int mask;
|
||||
private int pollIndex;
|
||||
|
||||
public ConcurrentFastQueue() {
|
||||
this(DEFAULT_CONCURRENCY_LEVEL);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public ConcurrentFastQueue(int concurrencyLevel) {
|
||||
if (concurrencyLevel <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"concurrencyLevel: " + concurrencyLevel);
|
||||
}
|
||||
|
||||
int actualConcurrencyLevel = 1;
|
||||
while (actualConcurrencyLevel < concurrencyLevel) {
|
||||
actualConcurrencyLevel <<= 1;
|
||||
}
|
||||
|
||||
mask = actualConcurrencyLevel - 1;
|
||||
segments = new FastQueue[actualConcurrencyLevel];
|
||||
for (int i = 0; i < actualConcurrencyLevel; i ++) {
|
||||
segments[i] = new FastQueue<E>();
|
||||
}
|
||||
}
|
||||
|
||||
public void offer(E e) {
|
||||
segments[hash(e)].offer(e);
|
||||
}
|
||||
|
||||
public E poll() {
|
||||
int oldPollIndex = pollIndex;
|
||||
while (pollIndex < segments.length) {
|
||||
E v = segments[pollIndex].poll();
|
||||
if (v != null) {
|
||||
return v;
|
||||
}
|
||||
|
||||
pollIndex ++;
|
||||
}
|
||||
|
||||
for (pollIndex = 0; pollIndex < oldPollIndex; pollIndex ++) {
|
||||
E v = segments[pollIndex].poll();
|
||||
if (v != null) {
|
||||
return v;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private int hash(Object o) {
|
||||
int hash = System.identityHashCode(o);
|
||||
hash = (hash << 1) - (hash << 8);
|
||||
hash &= mask;
|
||||
return hash;
|
||||
}
|
||||
}
|
@ -1,83 +0,0 @@
|
||||
/*
|
||||
* JBoss, Home of Professional Open Source
|
||||
*
|
||||
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
|
||||
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
|
||||
* full listing of individual contributors.
|
||||
*
|
||||
* This is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as
|
||||
* published by the Free Software Foundation; either version 2.1 of
|
||||
* the License, or (at your option) any later version.
|
||||
*
|
||||
* This software is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this software; if not, write to the Free
|
||||
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
|
||||
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
|
||||
*/
|
||||
package org.jboss.netty.util;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
|
||||
|
||||
/**
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Trustin Lee (tlee@redhat.com)
|
||||
*
|
||||
* @version $Rev$, $Date$
|
||||
*
|
||||
*/
|
||||
public class FastQueue<E> {
|
||||
|
||||
// Put
|
||||
private Queue<E> offeredElements;
|
||||
|
||||
// Take
|
||||
private Queue<E> drainedElements;
|
||||
|
||||
public synchronized void offer(E e) {
|
||||
if (offeredElements == null) {
|
||||
offeredElements = new LinkedList<E>();
|
||||
}
|
||||
|
||||
offeredElements.offer(e);
|
||||
}
|
||||
|
||||
public E poll() {
|
||||
for (;;) {
|
||||
if (drainedElements == null) {
|
||||
synchronized (this) {
|
||||
drainedElements = offeredElements;
|
||||
if (offeredElements == null) {
|
||||
break;
|
||||
}
|
||||
offeredElements = null;
|
||||
}
|
||||
}
|
||||
|
||||
E e = cast(drainedElements.poll());
|
||||
if (e != null) {
|
||||
return e;
|
||||
}
|
||||
|
||||
drainedElements = null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public synchronized boolean isEmpty() {
|
||||
return offeredElements == null &&
|
||||
(drainedElements == null || drainedElements.isEmpty());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private E cast(Object o) {
|
||||
return (E) o;
|
||||
}
|
||||
}
|
@ -150,26 +150,26 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
* cleanMe, to alleviate contention across threads CASing one vs
|
||||
* the other.
|
||||
*/
|
||||
static final class PaddedAtomicReference<T> extends AtomicReference<T> {
|
||||
private static final long serialVersionUID = 4684288940772921317L;
|
||||
|
||||
// enough padding for 64bytes with 4byte refs
|
||||
Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
|
||||
PaddedAtomicReference(T r) { super(r); }
|
||||
}
|
||||
// static final class PaddedAtomicReference<T> extends AtomicReference<T> {
|
||||
// private static final long serialVersionUID = 4684288940772921317L;
|
||||
//
|
||||
// // enough padding for 64bytes with 4byte refs
|
||||
// Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
|
||||
// PaddedAtomicReference(T r) { super(r); }
|
||||
// }
|
||||
|
||||
|
||||
/** head of the queue */
|
||||
private transient final PaddedAtomicReference<QNode> head;
|
||||
private transient final AtomicReference<QNode> head;
|
||||
/** tail of the queue */
|
||||
private transient final PaddedAtomicReference<QNode> tail;
|
||||
private transient final AtomicReference<QNode> tail;
|
||||
|
||||
/**
|
||||
* Reference to a cancelled node that might not yet have been
|
||||
* unlinked from queue because it was the last inserted node
|
||||
* when it cancelled.
|
||||
*/
|
||||
private transient final PaddedAtomicReference<QNode> cleanMe;
|
||||
private transient final AtomicReference<QNode> cleanMe;
|
||||
|
||||
/**
|
||||
* Tries to cas nh as new head; if successful, unlink
|
||||
@ -194,8 +194,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
private Object xfer(Object e, int mode, long nanos) {
|
||||
boolean isData = e != null;
|
||||
QNode s = null;
|
||||
final PaddedAtomicReference<QNode> head = this.head;
|
||||
final PaddedAtomicReference<QNode> tail = this.tail;
|
||||
final AtomicReference<QNode> head = this.head;
|
||||
final AtomicReference<QNode> tail = this.tail;
|
||||
|
||||
for (;;) {
|
||||
QNode t = tail.get();
|
||||
@ -238,8 +238,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
private Object fulfill(Object e) {
|
||||
boolean isData = e != null;
|
||||
final PaddedAtomicReference<QNode> head = this.head;
|
||||
final PaddedAtomicReference<QNode> tail = this.tail;
|
||||
final AtomicReference<QNode> head = this.head;
|
||||
final AtomicReference<QNode> tail = this.tail;
|
||||
|
||||
for (;;) {
|
||||
QNode t = tail.get();
|
||||
@ -410,9 +410,9 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
|
||||
*/
|
||||
public LinkedTransferQueue() {
|
||||
QNode dummy = new QNode(null, false);
|
||||
head = new PaddedAtomicReference<QNode>(dummy);
|
||||
tail = new PaddedAtomicReference<QNode>(dummy);
|
||||
cleanMe = new PaddedAtomicReference<QNode>(null);
|
||||
head = new AtomicReference<QNode>(dummy);
|
||||
tail = new AtomicReference<QNode>(dummy);
|
||||
cleanMe = new AtomicReference<QNode>(null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user