diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java index a8d20d5a55..727b2a9ec0 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java @@ -15,14 +15,13 @@ */ package io.netty.handler.codec.spdy; -import io.netty.util.internal.QueueFactory; - import java.util.Comparator; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; final class SpdySession { @@ -177,7 +176,7 @@ final class SpdySession { private final AtomicInteger sendWindowSize; private final AtomicInteger receiveWindowSize; private volatile int receiveWindowSizeLowerBound; - private final BlockingQueue pendingWriteQueue = QueueFactory.createQueue(); + private final Queue pendingWriteQueue = new ConcurrentLinkedQueue(); StreamState( byte priority, boolean remoteSideClosed, boolean localSideClosed, diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index e025f35ea5..c845595854 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -21,13 +21,13 @@ import io.netty.buffer.ChannelBuf; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.util.DefaultAttributeMap; -import io.netty.util.internal.QueueFactory; import java.net.SocketAddress; import java.util.Collections; import java.util.EnumSet; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { @@ -746,7 +746,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements static final class MessageBridge { final MessageBuf msgBuf = Unpooled.messageBuffer(); - final BlockingQueue exchangeBuf = QueueFactory.createQueue(); + final Queue exchangeBuf = new ConcurrentLinkedQueue(); void fill() { if (msgBuf.isEmpty()) { @@ -771,7 +771,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements static final class ByteBridge { final ByteBuf byteBuf = Unpooled.buffer(); - final BlockingQueue exchangeBuf = QueueFactory.createQueue(); + final Queue exchangeBuf = new ConcurrentLinkedQueue(); void fill() { if (!byteBuf.readable()) { diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index 20e23cad42..ca066cb2a2 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -17,7 +17,6 @@ package io.netty.channel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.QueueFactory; import java.util.ArrayList; import java.util.Collections; @@ -32,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; @@ -71,7 +71,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } }; - private final BlockingQueue taskQueue = QueueFactory.createQueue(); + private final BlockingQueue taskQueue = new LinkedBlockingQueue(); private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 72130dea46..bca22c8119 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventExecutor; -import io.netty.util.internal.QueueFactory; import java.util.Collection; import java.util.Collections; @@ -31,6 +30,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -45,7 +45,7 @@ public class OioEventLoop implements EventLoop { final ThreadFactory threadFactory; final Set activeChildren = Collections.newSetFromMap( new ConcurrentHashMap()); - final Queue idleChildren = QueueFactory.createQueue(); + final Queue idleChildren = new ConcurrentLinkedQueue(); private final ChannelException tooManyChannels; private final Unsafe unsafe = new Unsafe() { @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index cfca2d5851..012e9e8a2c 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -32,11 +32,11 @@ import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.DefaultEventExecutor; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; -import io.netty.util.internal.QueueFactory; import java.util.HashSet; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -337,8 +337,8 @@ public class LocalTransportThreadModelTest { private final AtomicReference exception = new AtomicReference(); - private final Queue inboundThreadNames = QueueFactory.createQueue(); - private final Queue outboundThreadNames = QueueFactory.createQueue(); + private final Queue inboundThreadNames = new ConcurrentLinkedQueue(); + private final Queue outboundThreadNames = new ConcurrentLinkedQueue(); @Override public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {