From b64124efd6aaadb7bea478d8dcb74af26cf9ebea Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 27 Sep 2008 14:22:52 +0000 Subject: [PATCH] * Renamed WriteMessageQueue to FastQueue, made it generic and optimized it * Added ConcurrentFastQueue (proof of concept yet - slow) --- .../channel/socket/nio/NioSocketChannel.java | 17 +-- .../netty/channel/socket/nio/NioWorker.java | 13 +- .../jboss/netty/util/ConcurrentFastQueue.java | 87 +++++++++++ .../java/org/jboss/netty/util/FastQueue.java | 77 ++++++++++ .../jboss/netty/util/WriteMessageQueue.java | 136 ------------------ 5 files changed, 172 insertions(+), 158 deletions(-) create mode 100644 src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java create mode 100644 src/main/java/org/jboss/netty/util/FastQueue.java delete mode 100644 src/main/java/org/jboss/netty/util/WriteMessageQueue.java diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 1793e28a31..3e1af63d59 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -25,7 +25,6 @@ package org.jboss.netty.channel.socket.nio; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SocketChannel; -import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.netty.channel.AbstractChannel; @@ -35,7 +34,7 @@ import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.util.WriteMessageQueue; +import org.jboss.netty.util.FastQueue; /** * @author The Netty Project (netty-dev@lists.jboss.org) @@ -53,8 +52,7 @@ abstract class NioSocketChannel extends AbstractChannel final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final Runnable writeTask = new WriteTask(); final Object writeLock = new Object(); - final WriteMessageQueue writeBuffer = new WriteMessageQueue(); - private Queue internalWriteBuffer; + final FastQueue writeBuffer = new FastQueue(); MessageEvent currentWriteEvent; int currentWriteIndex; @@ -71,17 +69,6 @@ abstract class NioSocketChannel extends AbstractChannel abstract NioWorker getWorker(); abstract void setWorker(NioWorker worker); - Queue getInternalWriteBuffer() { - if (internalWriteBuffer == null) { - internalWriteBuffer = writeBuffer.drainAll(); - } - return internalWriteBuffer; - } - - void clearInternalWriteBuffer() { - internalWriteBuffer = null; - } - public NioSocketChannelConfig getConfig() { return config; } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 024dd2cf5c..8bbee4bcda 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -32,7 +32,6 @@ import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -49,6 +48,7 @@ import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.FastQueue; import org.jboss.netty.util.ThreadRenamingRunnable; /** @@ -74,7 +74,8 @@ class NioWorker implements Runnable { volatile Selector selector; final AtomicBoolean wakenUp = new AtomicBoolean(); final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - final Queue taskQueue = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue(); + //final ConcurrentFastQueue taskQueue = new ConcurrentFastQueue(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -366,13 +367,12 @@ class NioWorker implements Runnable { int bufIdx; synchronized (channel.writeLock) { - Queue internalWriteBuffer = channel.getInternalWriteBuffer(); + FastQueue internalWriteBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { evt = internalWriteBuffer.poll(); if (evt == null) { - channel.clearInternalWriteBuffer(); channel.currentWriteEvent = null; removeOpWrite = true; break; @@ -443,13 +443,12 @@ class NioWorker implements Runnable { int writtenBytes = 0; synchronized (channel.writeLock) { - Queue internalWriteBuffer = channel.getInternalWriteBuffer(); + FastQueue internalWriteBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { evt = internalWriteBuffer.poll(); if (evt == null) { - channel.clearInternalWriteBuffer(); channel.currentWriteEvent = null; removeOpWrite = true; break; @@ -692,7 +691,7 @@ class NioWorker implements Runnable { fireExceptionCaught(channel, cause); } - Queue internalWriteBuffer = channel.getInternalWriteBuffer(); + FastQueue internalWriteBuffer = channel.writeBuffer; for (;;) { evt = internalWriteBuffer.poll(); if (evt == null) { diff --git a/src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java b/src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java new file mode 100644 index 0000000000..4a4c0e46f9 --- /dev/null +++ b/src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java @@ -0,0 +1,87 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.util; + + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +public class ConcurrentFastQueue { + + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + final FastQueue[] segments; + private final int mask; + private int pollIndex; + + public ConcurrentFastQueue() { + this(DEFAULT_CONCURRENCY_LEVEL); + } + + @SuppressWarnings("unchecked") + public ConcurrentFastQueue(int concurrencyLevel) { + if (concurrencyLevel <= 0) { + throw new IllegalArgumentException( + "concurrencyLevel: " + concurrencyLevel); + } + + int actualConcurrencyLevel = 1; + while (actualConcurrencyLevel < concurrencyLevel) { + actualConcurrencyLevel <<= 1; + } + + mask = actualConcurrencyLevel - 1; + segments = new FastQueue[actualConcurrencyLevel]; + for (int i = 0; i < actualConcurrencyLevel; i ++) { + segments[i] = new FastQueue(); + } + } + + public void offer(E e) { + segments[hash(e)].offer(e); + } + + public E poll() { + while (pollIndex < segments.length) { + E v = segments[pollIndex].poll(); + if (v != null) { + return v; + } + + pollIndex ++; + } + pollIndex = 0; + return null; + } + + private int hash(Object o) { + int hash = System.identityHashCode(o); + hash = (hash << 1) - (hash << 8); + hash &= mask; + return hash; + } +} diff --git a/src/main/java/org/jboss/netty/util/FastQueue.java b/src/main/java/org/jboss/netty/util/FastQueue.java new file mode 100644 index 0000000000..a688806606 --- /dev/null +++ b/src/main/java/org/jboss/netty/util/FastQueue.java @@ -0,0 +1,77 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.util; + + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +public class FastQueue { + + static final int INITIAL_CAPACITY = 4; + + // Put + private Object[] elements; + private int size; + + // Take + private Object[] drainedElements; + private int drainedElementCount; + private int index; + + public synchronized void offer(E e) { + if (elements == null) { + elements = new Object[INITIAL_CAPACITY]; + } else if (size == elements.length) { + Object[] newElements = new Object[size << 1]; + System.arraycopy(elements, 0, newElements, 0, size); + elements = newElements; + } + + elements[size ++] = e; + } + + @SuppressWarnings("unchecked") + public E poll() { + if (drainedElements == null) { + synchronized (this) { + drainedElements = elements; + drainedElementCount = size; + elements = null; + size = 0; + } + index = 0; + } + + if (index < drainedElementCount) { + return (E) drainedElements[index ++]; + } + + drainedElements = null; + return null; + } +} diff --git a/src/main/java/org/jboss/netty/util/WriteMessageQueue.java b/src/main/java/org/jboss/netty/util/WriteMessageQueue.java deleted file mode 100644 index 40961f8aae..0000000000 --- a/src/main/java/org/jboss/netty/util/WriteMessageQueue.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * - * Copyright 2008, Red Hat Middleware LLC, and individual contributors - * by the @author tags. See the COPYRIGHT.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ -package org.jboss.netty.util; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.Queue; - -import org.jboss.netty.channel.MessageEvent; - -/** - * @author The Netty Project (netty-dev@lists.jboss.org) - * @author Trustin Lee (tlee@redhat.com) - * - * @version $Rev$, $Date$ - * - */ -public class WriteMessageQueue { - - private static final Queue EMPTY_QUEUE = new EmptyQueue(); - - private MessageEvent[] elements; - private int size; - - public synchronized void offer(MessageEvent e) { - if (elements == null) { - elements = new MessageEvent[4]; - } else if (size == elements.length) { - MessageEvent[] newElements = new MessageEvent[size << 1]; - System.arraycopy(elements, 0, newElements, 0, size); - elements = newElements; - } - - elements[size ++] = e; - } - - public synchronized Queue drainAll() { - if (size == 0) { - return EMPTY_QUEUE; - } - - Queue drainedQueue = new DrainedQueue(elements, size); - elements = null; - size = 0; - return drainedQueue; - } - - private static class DrainedQueue extends AbstractQueue { - - private final MessageEvent[] elements; - private final int nElements; - private int index; - - DrainedQueue(MessageEvent[] elements, int nElements) { - this.elements = elements; - this.nElements = nElements; - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public int size() { - return nElements - index; - } - - public boolean offer(MessageEvent e) { - return false; - } - - public MessageEvent peek() { - if (index < nElements) { - return elements[index]; - } - return null; - } - - public MessageEvent poll() { - if (index < nElements) { - return elements[index ++]; - } - return null; - } - } - - private static class EmptyQueue extends AbstractQueue { - - EmptyQueue() { - super(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public int size() { - return 0; - } - - public boolean offer(MessageEvent e) { - return false; - } - - public MessageEvent peek() { - return null; - } - - public MessageEvent poll() { - return null; - } - } -}