Replace usage of QueueFactory with ConcurrentLinkedQueue and LinkedBlockingQueue. See #477

This commit is contained in:
norman 2012-07-30 08:01:46 +02:00
parent bdde5a20f6
commit ba1c7c5c55
5 changed files with 14 additions and 15 deletions

View File

@ -15,14 +15,13 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.util.internal.QueueFactory;
import java.util.Comparator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
final class SpdySession {
@ -177,7 +176,7 @@ final class SpdySession {
private final AtomicInteger sendWindowSize;
private final AtomicInteger receiveWindowSize;
private volatile int receiveWindowSizeLowerBound;
private final BlockingQueue<Object> pendingWriteQueue = QueueFactory.createQueue();
private final Queue<Object> pendingWriteQueue = new ConcurrentLinkedQueue<Object>();
StreamState(
byte priority, boolean remoteSideClosed, boolean localSideClosed,

View File

@ -21,13 +21,13 @@ import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.QueueFactory;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
@ -746,7 +746,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
static final class MessageBridge {
final MessageBuf<Object> msgBuf = Unpooled.messageBuffer();
final BlockingQueue<Object[]> exchangeBuf = QueueFactory.createQueue();
final Queue<Object[]> exchangeBuf = new ConcurrentLinkedQueue<Object[]>();
void fill() {
if (msgBuf.isEmpty()) {
@ -771,7 +771,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
static final class ByteBridge {
final ByteBuf byteBuf = Unpooled.buffer();
final BlockingQueue<ByteBuf> exchangeBuf = QueueFactory.createQueue();
final Queue<ByteBuf> exchangeBuf = new ConcurrentLinkedQueue<ByteBuf>();
void fill() {
if (!byteBuf.readable()) {

View File

@ -17,7 +17,6 @@ package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.QueueFactory;
import java.util.ArrayList;
import java.util.Collections;
@ -32,6 +31,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
@ -71,7 +71,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
};
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue();
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventExecutor;
import io.netty.util.internal.QueueFactory;
import java.util.Collection;
import java.util.Collections;
@ -31,6 +30,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -45,7 +45,7 @@ public class OioEventLoop implements EventLoop {
final ThreadFactory threadFactory;
final Set<OioChildEventLoop> activeChildren = Collections.newSetFromMap(
new ConcurrentHashMap<OioChildEventLoop, Boolean>());
final Queue<OioChildEventLoop> idleChildren = QueueFactory.createQueue();
final Queue<OioChildEventLoop> idleChildren = new ConcurrentLinkedQueue<OioChildEventLoop>();
private final ChannelException tooManyChannels;
private final Unsafe unsafe = new Unsafe() {
@Override

View File

@ -32,11 +32,11 @@ import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.DefaultEventExecutor;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.util.internal.QueueFactory;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -337,8 +337,8 @@ public class LocalTransportThreadModelTest {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private final Queue<String> inboundThreadNames = QueueFactory.createQueue();
private final Queue<String> outboundThreadNames = QueueFactory.createQueue();
private final Queue<String> inboundThreadNames = new ConcurrentLinkedQueue<String>();
private final Queue<String> outboundThreadNames = new ConcurrentLinkedQueue<String>();
@Override
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {