Rename the old LinkedTransferQueue to LegacyLinkedTransferQueue and fork

the "current" implementation of LinkedTransferQueue (from Doug Lea).
Introduce a QueueFactory which will load the right implementation
depending on the JVM version. This will make sure that the one with the
best performance is choosen and the code also works with java 5. See
#102
This commit is contained in:
norman 2011-12-07 12:08:39 +01:00
parent 51f69877a6
commit 766525431d
19 changed files with 3046 additions and 1408 deletions

View File

@ -32,7 +32,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.DefaultChannelConfig;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
/**
@ -52,7 +52,7 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
private final ChannelConfig config;
private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
final Queue<MessageEvent> writeBuffer = QueueFactory.createQueue(MessageEvent.class);
volatile DefaultLocalChannel pairedChannel;
volatile LocalAddress localAddress;

View File

@ -0,0 +1,158 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.QueueFactory;
/**
*
*
*
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
*
*/
abstract class AbstractWriteRequestQueue implements BlockingQueue<MessageEvent>{
protected final BlockingQueue<MessageEvent> queue;
public AbstractWriteRequestQueue() {
this.queue = QueueFactory.createQueue(MessageEvent.class);
}
@Override
public MessageEvent remove() {
return queue.remove();
}
@Override
public MessageEvent element() {
return queue.element();
}
@Override
public MessageEvent peek() {
return queue.peek();
}
@Override
public int size() {
return queue.size();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public Iterator<MessageEvent> iterator() {
return queue.iterator();
}
@Override
public Object[] toArray() {
return queue.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return queue.toArray(a);
}
@Override
public boolean containsAll(Collection<?> c) {
return queue.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends MessageEvent> c) {
return queue.addAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
return queue.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
return queue.retainAll(c);
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean add(MessageEvent e) {
return queue.add(e);
}
@Override
public void put(MessageEvent e) throws InterruptedException {
queue.put(e);
}
@Override
public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException {
return queue.offer(e, timeout, unit);
}
@Override
public MessageEvent take() throws InterruptedException {
return queue.take();
}
@Override
public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
@Override
public boolean remove(Object o) {
return queue.remove(o);
}
@Override
public boolean contains(Object o) {
return queue.contains(o);
}
@Override
public int drainTo(Collection<? super MessageEvent> c) {
return queue.drainTo(c);
}
@Override
public int drainTo(Collection<? super MessageEvent> c, int maxElements) {
return queue.drainTo(c, maxElements);
}
}

View File

@ -43,7 +43,7 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
*
@ -183,7 +183,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
Boss() {
}

View File

@ -38,7 +38,7 @@ import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.DatagramChannelConfig;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.LegacyLinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
/**
@ -247,26 +247,22 @@ class NioDatagramChannel extends AbstractChannel
}
/**
* {@link WriteRequestQueue} is an extension of {@link LinkedTransferQueue}
* {@link WriteRequestQueue} is an extension of {@link AbstractWriteRequestQueue}
* that adds support for highWaterMark checking of the write buffer size.
*/
private final class WriteRequestQueue extends
LinkedTransferQueue<MessageEvent> {
private static final long serialVersionUID = 5057413071460766376L;
AbstractWriteRequestQueue {
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
WriteRequestQueue() {
}
/**
* This method first delegates to {@link LinkedTransferQueue#offer(Object)} and
* This method first delegates to {@link LegacyLinkedTransferQueue#offer(Object)} and
* adds support for keeping track of the size of the this write buffer.
*/
@Override
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
@ -287,12 +283,12 @@ class NioDatagramChannel extends AbstractChannel
}
/**
* This method first delegates to {@link LinkedTransferQueue#poll()} and
* This method first delegates to {@link LegacyLinkedTransferQueue#poll()} and
* adds support for keeping track of the size of the this writebuffers queue.
*/
@Override
public MessageEvent poll() {
MessageEvent e = super.poll();
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);

View File

@ -45,7 +45,7 @@ import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* A class responsible for registering channels with {@link Selector}.
@ -105,12 +105,12 @@ class NioDatagramWorker implements Runnable {
/**
* Queue of {@link ChannelRegistionTask}s
*/
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
/**
* Queue of WriteTasks
*/
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation

View File

@ -33,7 +33,6 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
/**
@ -196,9 +195,7 @@ class NioSocketChannel extends AbstractChannel
}
}
private final class WriteRequestQueue extends LinkedTransferQueue<MessageEvent> {
private static final long serialVersionUID = -246694024103520626L;
private final class WriteRequestQueue extends AbstractWriteRequestQueue {
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
@ -207,7 +204,7 @@ class NioSocketChannel extends AbstractChannel
@Override
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
@ -229,7 +226,7 @@ class NioSocketChannel extends AbstractChannel
@Override
public MessageEvent poll() {
MessageEvent e = super.poll();
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);

View File

@ -47,7 +47,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
*
@ -70,8 +70,8 @@ class NioWorker implements Runnable {
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();

View File

@ -23,7 +23,7 @@ import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder}
@ -46,7 +46,7 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
ChannelDownstreamHandler {
/** A queue that is used for correlating a request and a response. */
final Queue<HttpMethod> queue = new LinkedTransferQueue<HttpMethod>();
final Queue<HttpMethod> queue = QueueFactory.createQueue(HttpMethod.class);
/** If true, decoding stops (i.e. pass-through) */
volatile boolean done;

View File

@ -24,7 +24,7 @@ import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.codec.embedder.EncoderEmbedder;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* Encodes the content of the outbound {@link HttpResponse} and {@link HttpChunk}.
@ -53,7 +53,7 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
*/
public abstract class HttpContentEncoder extends SimpleChannelHandler {
private final Queue<String> acceptEncodingQueue = new LinkedTransferQueue<String>();
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue(String.class);
private volatile EncoderEmbedder<ChannelBuffer> encoder;
/**

View File

@ -36,7 +36,7 @@ import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.util.DefaultObjectSizeEstimator;
import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
/**
@ -212,7 +212,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
ThreadFactory threadFactory) {
super(corePoolSize, corePoolSize, keepAliveTime, unit,
new LinkedTransferQueue<Runnable>(), threadFactory, new NewThreadRunsPolicy());
QueueFactory.createQueue(Runnable.class), threadFactory, new NewThreadRunsPolicy());
if (objectSizeEstimator == null) {
throw new NullPointerException("objectSizeEstimator");

View File

@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
@ -284,7 +284,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
}
private final class ChildExecutor implements Executor, Runnable {
private final Queue<Runnable> tasks = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
ChildExecutor() {

View File

@ -29,7 +29,7 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* Emulates blocking read operation. This handler stores all received messages
@ -84,7 +84,7 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
* implementation.
*/
public BlockingReadHandler() {
this(new LinkedTransferQueue<ChannelEvent>());
this(QueueFactory.createQueue(ChannelEvent.class));
}
/**

View File

@ -33,7 +33,7 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* Emulates buffered write operation. This handler stores all write requests
@ -193,7 +193,7 @@ public class BufferedWriteHandler extends SimpleChannelHandler {
* into a single write request on {@link #flush()}
*/
public BufferedWriteHandler(boolean consolidateOnFlush) {
this(new LinkedTransferQueue<MessageEvent>(), consolidateOnFlush);
this(QueueFactory.createQueue(MessageEvent.class), consolidateOnFlush);
}
/**

View File

@ -50,8 +50,8 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.NonReentrantLock;
import org.jboss.netty.util.internal.QueueFactory;
/**
* Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
@ -196,7 +196,7 @@ public class SslHandler extends FrameDecoder
int ignoreClosedChannelException;
final Object ignoreClosedChannelExceptionLock = new Object();
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
private final Queue<MessageEvent> pendingEncryptedWrites = new LinkedTransferQueue<MessageEvent>();
private final Queue<MessageEvent> pendingEncryptedWrites = QueueFactory.createQueue(MessageEvent.class);
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
private volatile boolean issueHandshake = false;

View File

@ -35,7 +35,7 @@ import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.QueueFactory;
/**
* A {@link ChannelHandler} that adds support for writing a large data stream
@ -79,8 +79,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
private final Queue<MessageEvent> queue =
new LinkedTransferQueue<MessageEvent>();
private final Queue<MessageEvent> queue = QueueFactory.createQueue(MessageEvent.class);
private ChannelHandlerContext ctx;
private MessageEvent currentEvent;

View File

@ -0,0 +1,48 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.util;
/**
* Utility which checks if {@value #UNSAFE} class can be found in the classpath
*
*
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
*
*/
public class UnsafeDetectUtil {
private static final String UNSAFE = "sun.misc.Unsafe";
public static boolean isUnsafeFound(ClassLoader loader) {
try {
Class.forName(UNSAFE, true, loader);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
public static boolean isUnsafeFound() {
return isUnsafeFound(UnsafeDetectUtil.class.getClassLoader());
}
private UnsafeDetectUtil() {
// only static method supported
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,69 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.util.internal;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import org.jboss.netty.util.UnsafeDetectUtil;
/**
* This factory should be used to create the "optimal" {@link BlockingQueue} instance for the running JVM.
*
*
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
*
*/
public class QueueFactory {
private static final boolean useUnsafe = UnsafeDetectUtil.isUnsafeFound(QueueFactory.class.getClassLoader());
private QueueFactory() {
// only use static methods!
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static final <T> BlockingQueue<T> createQueue(Class<T> itemClass) {
if (useUnsafe) {
return new LinkedTransferQueue<T>();
} else {
return new LegacyLinkedTransferQueue<T>();
}
}
/**
* Create a new unbound {@link BlockingQueue}
*
* @param collection the collection which should get copied to the newly created {@link BlockingQueue}
* @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items
* @return queue the {@link BlockingQueue} implementation
*/
public static final <T> BlockingQueue<T> createQueue(Collection<? extends T> collection, Class<T> itemClass) {
if (useUnsafe) {
return new LinkedTransferQueue<T>(collection);
} else {
return new LegacyLinkedTransferQueue<T>(collection);
}
}
}