diff --git a/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java b/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java index df0413ada8..6528d2cc95 100644 --- a/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java +++ b/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java @@ -385,7 +385,7 @@ public final class ChannelBuffers { * Creates a new composite buffer which wraps the readable bytes of the * specified buffers without copying them. A modification on the content * of the specified buffers will be visible to the returned buffer. - * + * * @throws IllegalArgumentException * if the specified buffers' endianness are different from each * other @@ -399,7 +399,7 @@ public final class ChannelBuffers { * of the specified buffers will be visible to the returned buffer. * If gathering is true then gathering writes will be used when ever * possible. - * + * * @throws IllegalArgumentException * if the specified buffers' endianness are different from each * other @@ -444,7 +444,7 @@ public final class ChannelBuffers { } return EMPTY_BUFFER; } - + /** * Creates a new composite buffer which wraps the slices of the specified diff --git a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java index 9a014ee481..3c84532832 100644 --- a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java @@ -48,13 +48,13 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { } /** - * Return true if gathering writes / reads should be used + * Return true if gathering writes / reads should be used * for this {@link CompositeChannelBuffer} */ public boolean useGathering() { return gathering && DetectionUtil.javaVersion() >= 7; } - + /** * Same with {@link #slice(int, int)} except that this method returns a list. */ @@ -140,7 +140,7 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { private CompositeChannelBuffer(CompositeChannelBuffer buffer) { order = buffer.order; - this.gathering = buffer.gathering; + gathering = buffer.gathering; components = buffer.components.clone(); indices = buffer.indices.clone(); setIndex(buffer.readerIndex(), buffer.writerIndex()); diff --git a/src/main/java/org/jboss/netty/channel/socket/ChannelRunnableWrapper.java b/src/main/java/org/jboss/netty/channel/socket/ChannelRunnableWrapper.java index 494ccd0e33..9ae7f37369 100644 --- a/src/main/java/org/jboss/netty/channel/socket/ChannelRunnableWrapper.java +++ b/src/main/java/org/jboss/netty/channel/socket/ChannelRunnableWrapper.java @@ -1,58 +1,58 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.DefaultChannelFuture; - -public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { - - private final Runnable task; - private boolean started; - - public ChannelRunnableWrapper(Channel channel, Runnable task) { - super(channel, true); - this.task = task; - } - - - public void run() { - synchronized (this) { - if (!isCancelled()) { - started = true; - } else { - return; - } - } - try { - task.run(); - setSuccess(); - } catch (Throwable t) { - setFailure(t); - } - } - - @Override - public synchronized boolean cancel() { - if (started) { - return false; - } - return super.cancel(); - } - - - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.DefaultChannelFuture; + +public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { + + private final Runnable task; + private boolean started; + + public ChannelRunnableWrapper(Channel channel, Runnable task) { + super(channel, true); + this.task = task; + } + + + public void run() { + synchronized (this) { + if (!isCancelled()) { + started = true; + } else { + return; + } + } + try { + task.run(); + setSuccess(); + } catch (Throwable t) { + setFailure(t); + } + } + + @Override + public synchronized boolean cancel() { + if (started) { + return false; + } + return super.cancel(); + } + + + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/Worker.java b/src/main/java/org/jboss/netty/channel/socket/Worker.java index c69c24912a..07780d344c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/Worker.java +++ b/src/main/java/org/jboss/netty/channel/socket/Worker.java @@ -1,33 +1,33 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.jboss.netty.channel.socket; - -/** - * A {@link Worker} is responsible to dispatch IO operations - * - */ -public interface Worker extends Runnable { - - /** - * Execute the given {@link Runnable} in the IO-Thread. This may be now or - * later once the IO-Thread do some other work. - * - * @param task - * the {@link Runnable} to execute - */ - void executeInIoThread(Runnable task); -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket; + +/** + * A {@link Worker} is responsible to dispatch IO operations + * + */ +public interface Worker extends Runnable { + + /** + * Execute the given {@link Runnable} in the IO-Thread. This may be now or + * later once the IO-Thread do some other work. + * + * @param task + * the {@link Runnable} to execute + */ + void executeInIoThread(Runnable task); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannel.java index 31421411c1..be7fbbd4e1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannel.java @@ -1,368 +1,368 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.nio; - -import static org.jboss.netty.channel.Channels.*; - -import java.net.InetSocketAddress; -import java.nio.channels.SelectableChannel; -import java.nio.channels.WritableByteChannel; -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.AbstractChannel; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelSink; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import org.jboss.netty.util.internal.QueueFactory; -import org.jboss.netty.util.internal.ThreadLocalBoolean; - -abstract class AbstractNioChannel extends AbstractChannel { - - /** - * The {@link AbstractNioWorker}. - */ - final AbstractNioWorker worker; - - /** - * Monitor object to synchronize access to InterestedOps. - */ - final Object interestOpsLock = new Object(); - - /** - * Monitor object for synchronizing access to the {@link WriteRequestQueue}. - */ - final Object writeLock = new Object(); - - /** - * WriteTask that performs write operations. - */ - final Runnable writeTask = new WriteTask(); - - /** - * Indicates if there is a {@link WriteTask} in the task queue. - */ - final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); - - /** - * Queue of write {@link MessageEvent}s. - */ - final Queue writeBufferQueue = new WriteRequestQueue(); - - /** - * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently - * contains. - */ - final AtomicInteger writeBufferSize = new AtomicInteger(); - - /** - * Keeps track of the highWaterMark. - */ - final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - - /** - * The current write {@link MessageEvent} - */ - MessageEvent currentWriteEvent; - SendBuffer currentWriteBuffer; - - /** - * Boolean that indicates that write operation is in progress. - */ - boolean inWriteNowLoop; - boolean writeSuspended; - - - private volatile InetSocketAddress localAddress; - volatile InetSocketAddress remoteAddress; - - final C channel; - - protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) { - super(id, parent, factory, pipeline, sink); - this.worker = worker; - this.channel = ch; - } - - protected AbstractNioChannel( - Channel parent, ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) { - super(parent, factory, pipeline, sink); - this.worker = worker; - this.channel = ch; - } - - /** - * Return the {@link AbstractNioWorker} that handle the IO of the - * {@link AbstractNioChannel} - * - * @return worker - */ - public AbstractNioWorker getWorker() { - return worker; - } - - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - this.localAddress = localAddress = getLocalSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return localAddress; - } - - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - this.remoteAddress = remoteAddress = - getRemoteSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return remoteAddress; - } - - public abstract NioChannelConfig getConfig(); - - int getRawInterestOps() { - return super.getInterestOps(); - } - - void setRawInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } - - - @Override - public int getInterestOps() { - if (!isOpen()) { - return Channel.OP_WRITE; - } - - int interestOps = getRawInterestOps(); - int writeBufferSize = this.writeBufferSize.get(); - if (writeBufferSize != 0) { - if (highWaterMarkCounter.get() > 0) { - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - if (writeBufferSize >= lowWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } else { - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - if (writeBufferSize >= highWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } - } else { - interestOps &= ~Channel.OP_WRITE; - } - - return interestOps; - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - abstract InetSocketAddress getLocalSocketAddress() throws Exception; - - abstract InetSocketAddress getRemoteSocketAddress() throws Exception; - - private final class WriteRequestQueue implements BlockingQueue { - private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); - - private final BlockingQueue queue; - - public WriteRequestQueue() { - this.queue = QueueFactory.createQueue(MessageEvent.class); - } - - public MessageEvent remove() { - return queue.remove(); - } - - public MessageEvent element() { - return queue.element(); - } - - public MessageEvent peek() { - return queue.peek(); - } - - public int size() { - return queue.size(); - } - - public boolean isEmpty() { - return queue.isEmpty(); - } - - public Iterator iterator() { - return queue.iterator(); - } - - public Object[] toArray() { - return queue.toArray(); - } - - public T[] toArray(T[] a) { - return queue.toArray(a); - } - - public boolean containsAll(Collection c) { - return queue.containsAll(c); - } - - public boolean addAll(Collection c) { - return queue.addAll(c); - } - - public boolean removeAll(Collection c) { - return queue.removeAll(c); - } - - public boolean retainAll(Collection c) { - return queue.retainAll(c); - } - - public void clear() { - queue.clear(); - } - - public boolean add(MessageEvent e) { - return queue.add(e); - } - - public void put(MessageEvent e) throws InterruptedException { - queue.put(e); - } - - public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException { - return queue.offer(e, timeout, unit); - } - - public MessageEvent take() throws InterruptedException { - return queue.take(); - } - - public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - - public int remainingCapacity() { - return queue.remainingCapacity(); - } - - public boolean remove(Object o) { - return queue.remove(o); - } - - public boolean contains(Object o) { - return queue.contains(o); - } - - public int drainTo(Collection c) { - return queue.drainTo(c); - } - - public int drainTo(Collection c, int maxElements) { - return queue.drainTo(c, maxElements); - } - - public boolean offer(MessageEvent e) { - boolean success = queue.offer(e); - assert success; - - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - - if (newWriteBufferSize >= highWaterMark) { - if (newWriteBufferSize - messageSize < highWaterMark) { - highWaterMarkCounter.incrementAndGet(); - if (!notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(AbstractNioChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - return true; - } - - public MessageEvent poll() { - MessageEvent e = queue.poll(); - if (e != null) { - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - - if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { - if (newWriteBufferSize + messageSize >= lowWaterMark) { - highWaterMarkCounter.decrementAndGet(); - if (isConnected() && !notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(AbstractNioChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - } - return e; - } - - private int getMessageSize(MessageEvent e) { - Object m = e.getMessage(); - if (m instanceof ChannelBuffer) { - return ((ChannelBuffer) m).readableBytes(); - } - return 0; - } - } - - private final class WriteTask implements Runnable { - - WriteTask() { - } - - public void run() { - writeTaskInTaskQueue.set(false); - worker.writeFromTaskLoop(AbstractNioChannel.this); - } - } - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import static org.jboss.netty.channel.Channels.*; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectableChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; +import org.jboss.netty.util.internal.QueueFactory; +import org.jboss.netty.util.internal.ThreadLocalBoolean; + +abstract class AbstractNioChannel extends AbstractChannel { + + /** + * The {@link AbstractNioWorker}. + */ + final AbstractNioWorker worker; + + /** + * Monitor object to synchronize access to InterestedOps. + */ + final Object interestOpsLock = new Object(); + + /** + * Monitor object for synchronizing access to the {@link WriteRequestQueue}. + */ + final Object writeLock = new Object(); + + /** + * WriteTask that performs write operations. + */ + final Runnable writeTask = new WriteTask(); + + /** + * Indicates if there is a {@link WriteTask} in the task queue. + */ + final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); + + /** + * Queue of write {@link MessageEvent}s. + */ + final Queue writeBufferQueue = new WriteRequestQueue(); + + /** + * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently + * contains. + */ + final AtomicInteger writeBufferSize = new AtomicInteger(); + + /** + * Keeps track of the highWaterMark. + */ + final AtomicInteger highWaterMarkCounter = new AtomicInteger(); + + /** + * The current write {@link MessageEvent} + */ + MessageEvent currentWriteEvent; + SendBuffer currentWriteBuffer; + + /** + * Boolean that indicates that write operation is in progress. + */ + boolean inWriteNowLoop; + boolean writeSuspended; + + + private volatile InetSocketAddress localAddress; + volatile InetSocketAddress remoteAddress; + + final C channel; + + protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) { + super(id, parent, factory, pipeline, sink); + this.worker = worker; + this.channel = ch; + } + + protected AbstractNioChannel( + Channel parent, ChannelFactory factory, + ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) { + super(parent, factory, pipeline, sink); + this.worker = worker; + this.channel = ch; + } + + /** + * Return the {@link AbstractNioWorker} that handle the IO of the + * {@link AbstractNioChannel} + * + * @return worker + */ + public AbstractNioWorker getWorker() { + return worker; + } + + public InetSocketAddress getLocalAddress() { + InetSocketAddress localAddress = this.localAddress; + if (localAddress == null) { + try { + this.localAddress = localAddress = getLocalSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return localAddress; + } + + public InetSocketAddress getRemoteAddress() { + InetSocketAddress remoteAddress = this.remoteAddress; + if (remoteAddress == null) { + try { + this.remoteAddress = remoteAddress = + getRemoteSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return remoteAddress; + } + + public abstract NioChannelConfig getConfig(); + + int getRawInterestOps() { + return super.getInterestOps(); + } + + void setRawInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + + @Override + public int getInterestOps() { + if (!isOpen()) { + return Channel.OP_WRITE; + } + + int interestOps = getRawInterestOps(); + int writeBufferSize = this.writeBufferSize.get(); + if (writeBufferSize != 0) { + if (highWaterMarkCounter.get() > 0) { + int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); + if (writeBufferSize >= lowWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } else { + int highWaterMark = getConfig().getWriteBufferHighWaterMark(); + if (writeBufferSize >= highWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } + } else { + interestOps &= ~Channel.OP_WRITE; + } + + return interestOps; + } + + @Override + protected boolean setClosed() { + return super.setClosed(); + } + + abstract InetSocketAddress getLocalSocketAddress() throws Exception; + + abstract InetSocketAddress getRemoteSocketAddress() throws Exception; + + private final class WriteRequestQueue implements BlockingQueue { + private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); + + private final BlockingQueue queue; + + public WriteRequestQueue() { + this.queue = QueueFactory.createQueue(MessageEvent.class); + } + + public MessageEvent remove() { + return queue.remove(); + } + + public MessageEvent element() { + return queue.element(); + } + + public MessageEvent peek() { + return queue.peek(); + } + + public int size() { + return queue.size(); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + public Iterator iterator() { + return queue.iterator(); + } + + public Object[] toArray() { + return queue.toArray(); + } + + public T[] toArray(T[] a) { + return queue.toArray(a); + } + + public boolean containsAll(Collection c) { + return queue.containsAll(c); + } + + public boolean addAll(Collection c) { + return queue.addAll(c); + } + + public boolean removeAll(Collection c) { + return queue.removeAll(c); + } + + public boolean retainAll(Collection c) { + return queue.retainAll(c); + } + + public void clear() { + queue.clear(); + } + + public boolean add(MessageEvent e) { + return queue.add(e); + } + + public void put(MessageEvent e) throws InterruptedException { + queue.put(e); + } + + public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException { + return queue.offer(e, timeout, unit); + } + + public MessageEvent take() throws InterruptedException { + return queue.take(); + } + + public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + public boolean remove(Object o) { + return queue.remove(o); + } + + public boolean contains(Object o) { + return queue.contains(o); + } + + public int drainTo(Collection c) { + return queue.drainTo(c); + } + + public int drainTo(Collection c, int maxElements) { + return queue.drainTo(c, maxElements); + } + + public boolean offer(MessageEvent e) { + boolean success = queue.offer(e); + assert success; + + int messageSize = getMessageSize(e); + int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); + int highWaterMark = getConfig().getWriteBufferHighWaterMark(); + + if (newWriteBufferSize >= highWaterMark) { + if (newWriteBufferSize - messageSize < highWaterMark) { + highWaterMarkCounter.incrementAndGet(); + if (!notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(AbstractNioChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + return true; + } + + public MessageEvent poll() { + MessageEvent e = queue.poll(); + if (e != null) { + int messageSize = getMessageSize(e); + int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); + int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); + + if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { + if (newWriteBufferSize + messageSize >= lowWaterMark) { + highWaterMarkCounter.decrementAndGet(); + if (isConnected() && !notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(AbstractNioChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + } + return e; + } + + private int getMessageSize(MessageEvent e) { + Object m = e.getMessage(); + if (m instanceof ChannelBuffer) { + return ((ChannelBuffer) m).readableBytes(); + } + return 0; + } + } + + private final class WriteTask implements Runnable { + + WriteTask() { + } + + public void run() { + writeTaskInTaskQueue.set(false); + worker.writeFromTaskLoop(AbstractNioChannel.this); + } + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java index b54d1f15f5..13f52a53bf 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -1,52 +1,52 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.jboss.netty.channel.socket.nio; - -import org.jboss.netty.channel.AbstractChannelSink; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.socket.ChannelRunnableWrapper; - -public abstract class AbstractNioChannelSink extends AbstractChannelSink { - - @Override - public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { - Channel ch = pipeline.getChannel(); - if (ch instanceof AbstractNioChannel) { - AbstractNioChannel channel = (AbstractNioChannel) ch; - ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); - channel.worker.executeInIoThread(wrapper); - return wrapper; - } - return super.execute(pipeline, task); - - - } - - @Override - protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { - Channel channel = event.getChannel(); - boolean fireLater = false; - if (channel instanceof AbstractNioChannel) { - fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel) channel); - } - return fireLater; - } - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.AbstractChannelSink; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.ChannelRunnableWrapper; + +public abstract class AbstractNioChannelSink extends AbstractChannelSink { + + @Override + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); + if (ch instanceof AbstractNioChannel) { + AbstractNioChannel channel = (AbstractNioChannel) ch; + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; + } + return super.execute(pipeline, task); + + + } + + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractNioChannel) { + fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel) channel); + } + return fireLater; + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java index c5e6df1004..f7f9ad10e2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java @@ -1,824 +1,824 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.nio; - -import static org.jboss.netty.channel.Channels.*; - -import java.io.IOException; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.NotYetConnectedException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.WritableByteChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.socket.Worker; -import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.DeadLockProofWorker; -import org.jboss.netty.util.internal.QueueFactory; - -abstract class AbstractNioWorker implements Worker { - - - private static final AtomicInteger nextId = new AtomicInteger(); - - final int id = nextId.incrementAndGet(); - - /** - * Internal Netty logger. - */ - private static final InternalLogger logger = InternalLoggerFactory - .getInstance(AbstractNioWorker.class); - - private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; - - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - - /** - * Executor used to execute {@link Runnable}s such as channel registration - * task. - */ - private final Executor executor; - - /** - * Boolean to indicate if this worker has been started. - */ - private boolean started; - - /** - * If this worker has been started thread will be a reference to the thread - * used when starting. i.e. the current thread when the run method is executed. - */ - protected volatile Thread thread; - - /** - * The NIO {@link Selector}. - */ - volatile Selector selector; - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - protected final AtomicBoolean wakenUp = new AtomicBoolean(); - - /** - * Lock for this workers Selector. - */ - private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - - /** - * Monitor object used to synchronize selector open/close. - */ - private final Object startStopLock = new Object(); - - /** - * Queue of channel registration tasks. - */ - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - - /** - * Queue of WriteTasks - */ - protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); - - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - - protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); - - private final boolean allowShutdownOnIdle; - - AbstractNioWorker(Executor executor) { - this(executor, true); - } - - public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { - this.executor = executor; - this.allowShutdownOnIdle = allowShutdownOnIdle; - } - - void register(AbstractNioChannel channel, ChannelFuture future) { - - Runnable registerTask = createRegisterTask(channel, future); - Selector selector = start(); - - - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - - /** - * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered - * - * @return selector - */ - private Selector start() { - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException("Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - selector = null; - // The method will return to the caller at this point. - } - } - } - - assert selector != null && selector.isOpen(); - - started = true; - } - return selector; - } - - - public void run() { - thread = Thread.currentThread(); - - boolean shutdown = false; - Selector selector = this.selector; - for (;;) { - wakenUp.set(false); - - if (CONSTRAINT_LEVEL != 0) { - selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector - // from acquiring its lock. - selectorGuard.writeLock().unlock(); - } - - try { - SelectorUtil.select(selector); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - cancelledKeys = 0; - processRegisterTaskQueue(); - processEventQueue(); - processWriteTaskQueue(); - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - if (allowShutdownOnIdle) { - // Give one more second. - shutdown = true; - } - } - } else { - shutdown = false; - } - } catch (Throwable t) { - logger.warn( - "Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - public void executeInIoThread(Runnable task) { - executeInIoThread(task, false); - } - - /** - * Execute the {@link Runnable} in a IO-Thread - * - * @param task - * the {@link Runnable} to execute - * @param alwaysAsync - * true if the {@link Runnable} should be executed - * in an async fashion even if the current Thread == IO Thread - */ - public void executeInIoThread(Runnable task, boolean alwaysAsync) { - if (!alwaysAsync && Thread.currentThread() == thread) { - task.run(); - } else { - start(); - boolean added = eventQueue.offer(task); - - assert added; - if (added) { - // wake up the selector to speed things - Selector selector = this.selector; - if (selector != null) { - selector.wakeup(); - } - } - } - - } - - - private void processRegisterTaskQueue() throws IOException { - for (;;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processWriteTaskQueue() throws IOException { - for (;;) { - final Runnable task = writeTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processEventQueue() throws IOException { - for (;;) { - final Runnable task = eventQueue.poll(); - if (task == null) { - break; - } - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processSelectedKeys(Set selectedKeys) throws IOException { - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - try { - int readyOps = k.readyOps(); - if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - if (!read(k)) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - writeFromSelectorLoop(k); - } - } catch (CancelledKeyException e) { - close(k); - } - - if (cleanUpCancelledKeys()) { - break; // break the loop to avoid ConcurrentModificationException - } - } - } - - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - - - private void close(SelectionKey k) { - AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - close(ch, succeededFuture(ch)); - } - - void writeFromUserCode(final AbstractNioChannel channel) { - if (!channel.isConnected()) { - cleanUpWriteBuffer(channel); - return; - } - - if (scheduleWriteIfNecessary(channel)) { - return; - } - - // From here, we are sure Thread.currentThread() == workerThread. - - if (channel.writeSuspended) { - return; - } - - if (channel.inWriteNowLoop) { - return; - } - - write0(channel); - } - - void writeFromTaskLoop(AbstractNioChannel ch) { - if (!ch.writeSuspended) { - write0(ch); - } - } - - void writeFromSelectorLoop(final SelectionKey k) { - AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - ch.writeSuspended = false; - write0(ch); - } - - protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel channel); - - protected void write0(AbstractNioChannel channel) { - boolean open = true; - boolean addOpWrite = false; - boolean removeOpWrite = false; - boolean iothread = isIoThread(channel); - - long writtenBytes = 0; - - final SocketSendBufferPool sendBufferPool = this.sendBufferPool; - final WritableByteChannel ch = channel.channel; - final Queue writeBuffer = channel.writeBufferQueue; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); - synchronized (channel.writeLock) { - channel.inWriteNowLoop = true; - for (;;) { - MessageEvent evt = channel.currentWriteEvent; - SendBuffer buf; - if (evt == null) { - if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { - removeOpWrite = true; - channel.writeSuspended = false; - break; - } - - channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); - } else { - buf = channel.currentWriteBuffer; - } - - ChannelFuture future = evt.getFuture(); - try { - long localWrittenBytes = 0; - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - - if (buf.finished()) { - // Successful write - proceed to the next message. - buf.release(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - evt = null; - buf = null; - future.setSuccess(); - } else { - // Not written fully - perhaps the kernel buffer is full. - addOpWrite = true; - channel.writeSuspended = true; - - if (localWrittenBytes > 0) { - // Notify progress listeners if necessary. - future.setProgress( - localWrittenBytes, - buf.writtenBytes(), buf.totalBytes()); - } - break; - } - } catch (AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (Throwable t) { - if (buf != null) { - buf.release(); - } - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - buf = null; - evt = null; - future.setFailure(t); - if (iothread) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - if (t instanceof IOException) { - open = false; - close(channel, succeededFuture(channel)); - } - } - } - channel.inWriteNowLoop = false; - - // Initially, the following block was executed after releasing - // the writeLock, but there was a race condition, and it has to be - // executed before releasing the writeLock: - // - // https://issues.jboss.org/browse/NETTY-410 - // - if (open) { - if (addOpWrite) { - setOpWrite(channel); - } else if (removeOpWrite) { - clearOpWrite(channel); - } - } - } - if (iothread) { - fireWriteComplete(channel, writtenBytes); - } else { - fireWriteCompleteLater(channel, writtenBytes); - } - } - - static boolean isIoThread(AbstractNioChannel channel) { - return Thread.currentThread() == channel.worker.thread; - } - - protected void setOpWrite(AbstractNioChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - protected void clearOpWrite(AbstractNioChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - - void close(AbstractNioChannel channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - boolean iothread = isIoThread(channel); - - try { - channel.channel.close(); - cancelledKeys ++; - - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - if (iothread) { - fireChannelDisconnected(channel); - } else { - fireChannelDisconnectedLater(channel); - } - } - if (bound) { - if (iothread) { - fireChannelUnbound(channel); - } else { - fireChannelUnboundLater(channel); - } - } - - cleanUpWriteBuffer(channel); - if (iothread) { - fireChannelClosed(channel); - } else { - fireChannelClosedLater(channel); - } - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - if (iothread) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } - - protected void cleanUpWriteBuffer(AbstractNioChannel channel) { - Exception cause = null; - boolean fireExceptionCaught = false; - - // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { - MessageEvent evt = channel.currentWriteEvent; - if (evt != null) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - - ChannelFuture future = evt.getFuture(); - channel.currentWriteBuffer.release(); - channel.currentWriteBuffer = null; - channel.currentWriteEvent = null; - evt = null; - future.setFailure(cause); - fireExceptionCaught = true; - } - - Queue writeBuffer = channel.writeBufferQueue; - for (;;) { - evt = writeBuffer.poll(); - if (evt == null) { - break; - } - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (cause == null) { - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - fireExceptionCaught = true; - } - evt.getFuture().setFailure(cause); - - - } - } - - if (fireExceptionCaught) { - if (isIoThread(channel)) { - fireExceptionCaught(channel, cause); - } else { - fireExceptionCaughtLater(channel, cause); - } - } - } - - void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { - boolean changed = false; - boolean iothread = isIoThread(channel); - try { - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - if (key == null || selector == null) { - if (channel.getRawInterestOps() != interestOps) { - changed = true; - } - - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - - future.setSuccess(); - if (changed) { - if (iothread) { - fireChannelInterestChanged(channel); - } else { - fireChannelInterestChangedLater(channel); - } - } - - return; - } - - switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - key.interestOps(interestOps); - changed = true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - future.setSuccess(); - if (changed) { - if (iothread) { - fireChannelInterestChanged(channel); - } else { - fireChannelInterestChangedLater(channel); - } - } - } catch (CancelledKeyException e) { - // setInterestOps() was called on a closed channel. - ClosedChannelException cce = new ClosedChannelException(); - future.setFailure(cce); - if (iothread) { - fireExceptionCaught(channel, cce); - } else { - fireExceptionCaughtLater(channel, cce); - } - } catch (Throwable t) { - future.setFailure(t); - if (iothread) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } - - /** - * Read is called when a Selector has been notified that the underlying channel - * was something to be read. The channel would previously have registered its interest - * in read operations. - * - * @param k The selection key which contains the Selector registration information. - */ - protected abstract boolean read(SelectionKey k); - - /** - * Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel} - * - * @param channel - * @param future - * @return task - */ - protected abstract Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future); - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import static org.jboss.netty.channel.Channels.*; + +import java.io.IOException; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NotYetConnectedException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.WritableByteChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.socket.Worker; +import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadRenamingRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; +import org.jboss.netty.util.internal.QueueFactory; + +abstract class AbstractNioWorker implements Worker { + + + private static final AtomicInteger nextId = new AtomicInteger(); + + final int id = nextId.incrementAndGet(); + + /** + * Internal Netty logger. + */ + private static final InternalLogger logger = InternalLoggerFactory + .getInstance(AbstractNioWorker.class); + + private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; + + static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + + + /** + * Executor used to execute {@link Runnable}s such as channel registration + * task. + */ + private final Executor executor; + + /** + * Boolean to indicate if this worker has been started. + */ + private boolean started; + + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + + /** + * The NIO {@link Selector}. + */ + volatile Selector selector; + + /** + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * waken up. + */ + protected final AtomicBoolean wakenUp = new AtomicBoolean(); + + /** + * Lock for this workers Selector. + */ + private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); + + /** + * Monitor object used to synchronize selector open/close. + */ + private final Object startStopLock = new Object(); + + /** + * Queue of channel registration tasks. + */ + private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); + + /** + * Queue of WriteTasks + */ + protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); + + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + + + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + + protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + + private final boolean allowShutdownOnIdle; + + AbstractNioWorker(Executor executor) { + this(executor, true); + } + + public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { + this.executor = executor; + this.allowShutdownOnIdle = allowShutdownOnIdle; + } + + void register(AbstractNioChannel channel, ChannelFuture future) { + + Runnable registerTask = createRegisterTask(channel, future); + Selector selector = start(); + + + boolean offered = registerTaskQueue.offer(registerTask); + assert offered; + + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + /** + * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered + * + * @return selector + */ + private Selector start() { + synchronized (startStopLock) { + if (!started) { + // Open a selector if this worker didn't start yet. + try { + selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException("Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + boolean success = false; + try { + DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + logger.warn("Failed to close a selector.", t); + } + selector = null; + // The method will return to the caller at this point. + } + } + } + + assert selector != null && selector.isOpen(); + + started = true; + } + return selector; + } + + + public void run() { + thread = Thread.currentThread(); + + boolean shutdown = false; + Selector selector = this.selector; + for (;;) { + wakenUp.set(false); + + if (CONSTRAINT_LEVEL != 0) { + selectorGuard.writeLock().lock(); + // This empty synchronization block prevents the selector + // from acquiring its lock. + selectorGuard.writeLock().unlock(); + } + + try { + SelectorUtil.select(selector); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp.get()) { + selector.wakeup(); + } + + cancelledKeys = 0; + processRegisterTaskQueue(); + processEventQueue(); + processWriteTaskQueue(); + processSelectedKeys(selector.selectedKeys()); + + // Exit the loop when there's nothing to handle. + // The shutdown flag is used to delay the shutdown of this + // loop to avoid excessive Selector creation when + // connections are registered in a one-by-one manner instead of + // concurrent manner. + if (selector.keys().isEmpty()) { + if (shutdown || + executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { + + synchronized (startStopLock) { + if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { + started = false; + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); + } finally { + this.selector = null; + } + break; + } else { + shutdown = false; + } + } + } else { + if (allowShutdownOnIdle) { + // Give one more second. + shutdown = true; + } + } + } else { + shutdown = false; + } + } catch (Throwable t) { + logger.warn( + "Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + public void executeInIoThread(Runnable task) { + executeInIoThread(task, false); + } + + /** + * Execute the {@link Runnable} in a IO-Thread + * + * @param task + * the {@link Runnable} to execute + * @param alwaysAsync + * true if the {@link Runnable} should be executed + * in an async fashion even if the current Thread == IO Thread + */ + public void executeInIoThread(Runnable task, boolean alwaysAsync) { + if (!alwaysAsync && Thread.currentThread() == thread) { + task.run(); + } else { + start(); + boolean added = eventQueue.offer(task); + + assert added; + if (added) { + // wake up the selector to speed things + Selector selector = this.selector; + if (selector != null) { + selector.wakeup(); + } + } + } + + } + + + private void processRegisterTaskQueue() throws IOException { + for (;;) { + final Runnable task = registerTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + + private void processWriteTaskQueue() throws IOException { + for (;;) { + final Runnable task = writeTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + task.run(); + cleanUpCancelledKeys(); + } + } + + private void processSelectedKeys(Set selectedKeys) throws IOException { + for (Iterator i = selectedKeys.iterator(); i.hasNext();) { + SelectionKey k = i.next(); + i.remove(); + try { + int readyOps = k.readyOps(); + if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { + if (!read(k)) { + // Connection already closed - no need to handle write. + continue; + } + } + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + writeFromSelectorLoop(k); + } + } catch (CancelledKeyException e) { + close(k); + } + + if (cleanUpCancelledKeys()) { + break; // break the loop to avoid ConcurrentModificationException + } + } + } + + private boolean cleanUpCancelledKeys() throws IOException { + if (cancelledKeys >= CLEANUP_INTERVAL) { + cancelledKeys = 0; + selector.selectNow(); + return true; + } + return false; + } + + + + private void close(SelectionKey k) { + AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + close(ch, succeededFuture(ch)); + } + + void writeFromUserCode(final AbstractNioChannel channel) { + if (!channel.isConnected()) { + cleanUpWriteBuffer(channel); + return; + } + + if (scheduleWriteIfNecessary(channel)) { + return; + } + + // From here, we are sure Thread.currentThread() == workerThread. + + if (channel.writeSuspended) { + return; + } + + if (channel.inWriteNowLoop) { + return; + } + + write0(channel); + } + + void writeFromTaskLoop(AbstractNioChannel ch) { + if (!ch.writeSuspended) { + write0(ch); + } + } + + void writeFromSelectorLoop(final SelectionKey k) { + AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + ch.writeSuspended = false; + write0(ch); + } + + protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel channel); + + protected void write0(AbstractNioChannel channel) { + boolean open = true; + boolean addOpWrite = false; + boolean removeOpWrite = false; + boolean iothread = isIoThread(channel); + + long writtenBytes = 0; + + final SocketSendBufferPool sendBufferPool = this.sendBufferPool; + final WritableByteChannel ch = channel.channel; + final Queue writeBuffer = channel.writeBufferQueue; + final int writeSpinCount = channel.getConfig().getWriteSpinCount(); + synchronized (channel.writeLock) { + channel.inWriteNowLoop = true; + for (;;) { + MessageEvent evt = channel.currentWriteEvent; + SendBuffer buf; + if (evt == null) { + if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { + removeOpWrite = true; + channel.writeSuspended = false; + break; + } + + channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); + } else { + buf = channel.currentWriteBuffer; + } + + ChannelFuture future = evt.getFuture(); + try { + long localWrittenBytes = 0; + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = buf.transferTo(ch); + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + break; + } + if (buf.finished()) { + break; + } + } + + if (buf.finished()) { + // Successful write - proceed to the next message. + buf.release(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + evt = null; + buf = null; + future.setSuccess(); + } else { + // Not written fully - perhaps the kernel buffer is full. + addOpWrite = true; + channel.writeSuspended = true; + + if (localWrittenBytes > 0) { + // Notify progress listeners if necessary. + future.setProgress( + localWrittenBytes, + buf.writtenBytes(), buf.totalBytes()); + } + break; + } + } catch (AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (Throwable t) { + if (buf != null) { + buf.release(); + } + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + buf = null; + evt = null; + future.setFailure(t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } + if (t instanceof IOException) { + open = false; + close(channel, succeededFuture(channel)); + } + } + } + channel.inWriteNowLoop = false; + + // Initially, the following block was executed after releasing + // the writeLock, but there was a race condition, and it has to be + // executed before releasing the writeLock: + // + // https://issues.jboss.org/browse/NETTY-410 + // + if (open) { + if (addOpWrite) { + setOpWrite(channel); + } else if (removeOpWrite) { + clearOpWrite(channel); + } + } + } + if (iothread) { + fireWriteComplete(channel, writtenBytes); + } else { + fireWriteCompleteLater(channel, writtenBytes); + } + } + + static boolean isIoThread(AbstractNioChannel channel) { + return Thread.currentThread() == channel.worker.thread; + } + + protected void setOpWrite(AbstractNioChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.channel.keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + channel.setRawInterestOpsNow(interestOps); + } + } + } + + protected void clearOpWrite(AbstractNioChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.channel.keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + channel.setRawInterestOpsNow(interestOps); + } + } + } + + + void close(AbstractNioChannel channel, ChannelFuture future) { + boolean connected = channel.isConnected(); + boolean bound = channel.isBound(); + boolean iothread = isIoThread(channel); + + try { + channel.channel.close(); + cancelledKeys ++; + + if (channel.setClosed()) { + future.setSuccess(); + if (connected) { + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } + } + if (bound) { + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } + } + + cleanUpWriteBuffer(channel); + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); + } + } else { + future.setSuccess(); + } + } catch (Throwable t) { + future.setFailure(t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } + } + } + + protected void cleanUpWriteBuffer(AbstractNioChannel channel) { + Exception cause = null; + boolean fireExceptionCaught = false; + + // Clean up the stale messages in the write buffer. + synchronized (channel.writeLock) { + MessageEvent evt = channel.currentWriteEvent; + if (evt != null) { + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } + + ChannelFuture future = evt.getFuture(); + channel.currentWriteBuffer.release(); + channel.currentWriteBuffer = null; + channel.currentWriteEvent = null; + evt = null; + future.setFailure(cause); + fireExceptionCaught = true; + } + + Queue writeBuffer = channel.writeBufferQueue; + for (;;) { + evt = writeBuffer.poll(); + if (evt == null) { + break; + } + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + if (cause == null) { + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } + fireExceptionCaught = true; + } + evt.getFuture().setFailure(cause); + + + } + } + + if (fireExceptionCaught) { + if (isIoThread(channel)) { + fireExceptionCaught(channel, cause); + } else { + fireExceptionCaughtLater(channel, cause); + } + } + } + + void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { + boolean changed = false; + boolean iothread = isIoThread(channel); + try { + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + Selector selector = this.selector; + SelectionKey key = channel.channel.keyFor(selector); + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + + if (key == null || selector == null) { + if (channel.getRawInterestOps() != interestOps) { + changed = true; + } + + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + + future.setSuccess(); + if (changed) { + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } + } + + return; + } + + switch (CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + key.interestOps(interestOps); + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + changed = true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { + key.interestOps(interestOps); + changed = true; + } else { + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + changed = true; + } finally { + selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } + } + + future.setSuccess(); + if (changed) { + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } + } + } catch (CancelledKeyException e) { + // setInterestOps() was called on a closed channel. + ClosedChannelException cce = new ClosedChannelException(); + future.setFailure(cce); + if (iothread) { + fireExceptionCaught(channel, cce); + } else { + fireExceptionCaughtLater(channel, cce); + } + } catch (Throwable t) { + future.setFailure(t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } + } + } + + /** + * Read is called when a Selector has been notified that the underlying channel + * was something to be read. The channel would previously have registered its interest + * in read operations. + * + * @param k The selection key which contains the Selector registration information. + */ + protected abstract boolean read(SelectionKey k); + + /** + * Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel} + * + * @param channel + * @param future + * @return task + */ + protected abstract Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future); + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java index b3400c9832..35e7055c7a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java @@ -1,83 +1,83 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.jboss.netty.channel.socket.nio; - -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.socket.Worker; -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.internal.ExecutorUtil; - -/** - * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling - * {@link #nextWorker()} - * - */ -public abstract class AbstractNioWorkerPool implements WorkerPool , ExternalResourceReleasable { - - private final AbstractNioWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); - private final Executor workerExecutor; - - /** - * Create a new instance - * - * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s - * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it - * @param workerCount the count of {@link Worker}'s to create - */ - AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } - workers = new AbstractNioWorker[workerCount]; - - for (int i = 0; i < workers.length; i++) { - workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); - } - this.workerExecutor = workerExecutor; - - } - - /** - * Create a new {@link Worker} which uses the given {@link Executor} to service IO - * - * - * @param executor the {@link Executor} to use - * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it - * @return worker the new {@link Worker} - */ - protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); - - @SuppressWarnings("unchecked") - public E nextWorker() { - return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; - } - - - public void releaseExternalResources() { - ExecutorUtil.terminate(workerExecutor); - } - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket.nio; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.socket.Worker; +import org.jboss.netty.util.ExternalResourceReleasable; +import org.jboss.netty.util.internal.ExecutorUtil; + +/** + * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling + * {@link #nextWorker()} + * + */ +public abstract class AbstractNioWorkerPool implements WorkerPool , ExternalResourceReleasable { + + private final AbstractNioWorker[] workers; + private final AtomicInteger workerIndex = new AtomicInteger(); + private final Executor workerExecutor; + + /** + * Create a new instance + * + * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s + * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it + * @param workerCount the count of {@link Worker}'s to create + */ + AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { + if (workerExecutor == null) { + throw new NullPointerException("workerExecutor"); + } + if (workerCount <= 0) { + throw new IllegalArgumentException( + "workerCount (" + workerCount + ") " + + "must be a positive integer."); + } + workers = new AbstractNioWorker[workerCount]; + + for (int i = 0; i < workers.length; i++) { + workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); + } + this.workerExecutor = workerExecutor; + + } + + /** + * Create a new {@link Worker} which uses the given {@link Executor} to service IO + * + * + * @param executor the {@link Executor} to use + * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it + * @return worker the new {@link Worker} + */ + protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); + + @SuppressWarnings("unchecked") + public E nextWorker() { + return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + } + + + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioChannelConfig.java index 3f963878fd..b9156a3c10 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioChannelConfig.java @@ -1,82 +1,82 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.nio; - -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelConfig; - -/** - * Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO. - * - */ -public interface NioChannelConfig extends ChannelConfig { - - /** - * Returns the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link Channel#isWritable()} - * will start to return {@code true}. - */ - int getWriteBufferHighWaterMark(); - - /** - * Sets the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link Channel#isWritable()} - * will start to return {@code true}. - */ - void setWriteBufferHighWaterMark(int writeBufferHighWaterMark); - - /** - * Returns the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link Channel#isWritable()} will return - * {@code false} again. - */ - int getWriteBufferLowWaterMark(); - - /** - * Sets the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link Channel#isWritable()} will return - * {@code false} again. - */ - void setWriteBufferLowWaterMark(int writeBufferLowWaterMark); - - /** - * Returns the maximum loop count for a write operation until - * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - */ - int getWriteSpinCount(); - - /** - * Sets the maximum loop count for a write operation until - * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - * - * @throws IllegalArgumentException - * if the specified value is {@code 0} or less than {@code 0} - */ - void setWriteSpinCount(int writeSpinCount); -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelConfig; + +/** + * Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO. + * + */ +public interface NioChannelConfig extends ChannelConfig { + + /** + * Returns the high water mark of the write buffer. If the number of bytes + * queued in the write buffer exceeds this value, {@link Channel#isWritable()} + * will start to return {@code true}. + */ + int getWriteBufferHighWaterMark(); + + /** + * Sets the high water mark of the write buffer. If the number of bytes + * queued in the write buffer exceeds this value, {@link Channel#isWritable()} + * will start to return {@code true}. + */ + void setWriteBufferHighWaterMark(int writeBufferHighWaterMark); + + /** + * Returns the low water mark of the write buffer. Once the number of bytes + * queued in the write buffer exceeded the + * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then + * dropped down below this value, {@link Channel#isWritable()} will return + * {@code false} again. + */ + int getWriteBufferLowWaterMark(); + + /** + * Sets the low water mark of the write buffer. Once the number of bytes + * queued in the write buffer exceeded the + * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then + * dropped down below this value, {@link Channel#isWritable()} will return + * {@code false} again. + */ + void setWriteBufferLowWaterMark(int writeBufferLowWaterMark); + + /** + * Returns the maximum loop count for a write operation until + * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. + * It is similar to what a spin lock is used for in concurrency programming. + * It improves memory utilization and write throughput depending on + * the platform that JVM runs on. The default value is {@code 16}. + */ + int getWriteSpinCount(); + + /** + * Sets the maximum loop count for a write operation until + * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. + * It is similar to what a spin lock is used for in concurrency programming. + * It improves memory utilization and write throughput depending on + * the platform that JVM runs on. The default value is {@code 16}. + * + * @throws IllegalArgumentException + * if the specified value is {@code 0} or less than {@code 0} + */ + void setWriteSpinCount(int writeSpinCount); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java index 7b53e80cf4..a2e8fa8c9c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java @@ -1,37 +1,37 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.nio; - -import java.util.concurrent.Executor; - - -/** - * Default implementation which hands of {@link NioDatagramWorker}'s - * - * - */ -public class NioDatagramWorkerPool extends AbstractNioWorkerPool { - - public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { - super(executor, workerCount, allowShutdownOnIdle); - } - - @Override - protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { - return new NioDatagramWorker(executor, allowShutdownOnIdle); - } - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioDatagramWorker}'s + * + * + */ +public class NioDatagramWorkerPool extends AbstractNioWorkerPool { + + public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioDatagramWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java index 950cad76bd..2b08cf1157 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java @@ -1,37 +1,37 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.nio; - -import java.util.concurrent.Executor; - - -/** - * Default implementation which hands of {@link NioWorker}'s - * - * - */ -public class NioWorkerPool extends AbstractNioWorkerPool { - - public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { - super(executor, workerCount, allowShutdownOnIdle); - } - - @Override - protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { - return new NioWorker(executor, allowShutdownOnIdle); - } - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioWorker}'s + * + * + */ +public class NioWorkerPool extends AbstractNioWorkerPool { + + public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java index 8664731809..192a23feaa 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java @@ -1,47 +1,47 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.nio; - -import org.jboss.netty.channel.socket.Worker; -import org.jboss.netty.util.ExternalResourceReleasable; - -/** - * This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once - * you want to release any resources of it. - * - * - */ -public final class ShareableWorkerPool implements WorkerPool { - - private final WorkerPool wrapped; - - public ShareableWorkerPool(WorkerPool wrapped) { - this.wrapped = wrapped; - } - - public E nextWorker() { - return wrapped.nextWorker(); - } - - /** - * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore - */ - public void destroy() { - if (wrapped instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) wrapped).releaseExternalResources(); - } - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.socket.Worker; +import org.jboss.netty.util.ExternalResourceReleasable; + +/** + * This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once + * you want to release any resources of it. + * + * + */ +public final class ShareableWorkerPool implements WorkerPool { + + private final WorkerPool wrapped; + + public ShareableWorkerPool(WorkerPool wrapped) { + this.wrapped = wrapped; + } + + public E nextWorker() { + return wrapped.nextWorker(); + } + + /** + * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore + */ + public void destroy() { + if (wrapped instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) wrapped).releaseExternalResources(); + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java index 29053ac5d1..4b049ffdba 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java @@ -1,35 +1,35 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.jboss.netty.channel.socket.nio; - -import org.jboss.netty.channel.socket.Worker; - -/** - * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand - * - */ -public interface WorkerPool { - - /** - * Return the next {@link Worker} to use - * - * @return worker - */ - E nextWorker(); - - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.socket.Worker; + +/** + * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand + * + */ +public interface WorkerPool { + + /** + * Return the next {@link Worker} to use + * + * @return worker + */ + E nextWorker(); + + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannel.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannel.java index d64ee29a22..56da38bf35 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannel.java @@ -1,117 +1,117 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.oio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.jboss.netty.channel.AbstractChannel; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelSink; -import org.jboss.netty.channel.socket.Worker; - -abstract class AbstractOioChannel extends AbstractChannel { - private volatile InetSocketAddress localAddress; - volatile InetSocketAddress remoteAddress; - volatile Thread workerThread; - volatile Worker worker; - - final Object interestOpsLock = new Object(); - - AbstractOioChannel( - Channel parent, - ChannelFactory factory, - ChannelPipeline pipeline, - ChannelSink sink) { - super(parent, factory, pipeline, sink); - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - @Override - protected void setInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } - - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return super.write(message, remoteAddress); - } - } - - - public boolean isBound() { - return isOpen() && isSocketBound(); - } - - - public boolean isConnected() { - return isOpen() && isSocketConnected(); - } - - - - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - this.localAddress = localAddress = getLocalSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return localAddress; - } - - - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - this.remoteAddress = remoteAddress = - getRemoteSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return remoteAddress; - } - - abstract boolean isSocketBound(); - - abstract boolean isSocketConnected(); - - abstract boolean isSocketClosed(); - - abstract InetSocketAddress getLocalSocketAddress() throws Exception; - - abstract InetSocketAddress getRemoteSocketAddress() throws Exception; - - abstract void closeSocket() throws IOException; - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.oio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.socket.Worker; + +abstract class AbstractOioChannel extends AbstractChannel { + private volatile InetSocketAddress localAddress; + volatile InetSocketAddress remoteAddress; + volatile Thread workerThread; + volatile Worker worker; + + final Object interestOpsLock = new Object(); + + AbstractOioChannel( + Channel parent, + ChannelFactory factory, + ChannelPipeline pipeline, + ChannelSink sink) { + super(parent, factory, pipeline, sink); + } + + @Override + protected boolean setClosed() { + return super.setClosed(); + } + + @Override + protected void setInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + @Override + public ChannelFuture write(Object message, SocketAddress remoteAddress) { + if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { + return super.write(message, null); + } else { + return super.write(message, remoteAddress); + } + } + + + public boolean isBound() { + return isOpen() && isSocketBound(); + } + + + public boolean isConnected() { + return isOpen() && isSocketConnected(); + } + + + + public InetSocketAddress getLocalAddress() { + InetSocketAddress localAddress = this.localAddress; + if (localAddress == null) { + try { + this.localAddress = localAddress = getLocalSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return localAddress; + } + + + public InetSocketAddress getRemoteAddress() { + InetSocketAddress remoteAddress = this.remoteAddress; + if (remoteAddress == null) { + try { + this.remoteAddress = remoteAddress = + getRemoteSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return remoteAddress; + } + + abstract boolean isSocketBound(); + + abstract boolean isSocketConnected(); + + abstract boolean isSocketClosed(); + + abstract InetSocketAddress getLocalSocketAddress() throws Exception; + + abstract InetSocketAddress getRemoteSocketAddress() throws Exception; + + abstract void closeSocket() throws IOException; + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java index e74b2c5dc3..00f4107e2f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -1,57 +1,57 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.jboss.netty.channel.socket.oio; - -import org.jboss.netty.channel.AbstractChannelSink; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.socket.ChannelRunnableWrapper; -import org.jboss.netty.channel.socket.Worker; - -public abstract class AbstractOioChannelSink extends AbstractChannelSink { - - @Override - public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) { - Channel ch = pipeline.getChannel(); - if (ch instanceof AbstractOioChannel) { - AbstractOioChannel channel = (AbstractOioChannel) ch; - Worker worker = channel.worker; - if (worker != null) { - ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); - channel.worker.executeInIoThread(wrapper); - return wrapper; - } - } - - return super.execute(pipeline, task); - - - } - - @Override - protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { - Channel channel = event.getChannel(); - boolean fireLater = false; - if (channel instanceof AbstractOioChannel) { - fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel); - } - return fireLater; - } - -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket.oio; + +import org.jboss.netty.channel.AbstractChannelSink; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.ChannelRunnableWrapper; +import org.jboss.netty.channel.socket.Worker; + +public abstract class AbstractOioChannelSink extends AbstractChannelSink { + + @Override + public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); + if (ch instanceof AbstractOioChannel) { + AbstractOioChannel channel = (AbstractOioChannel) ch; + Worker worker = channel.worker; + if (worker != null) { + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; + } + } + + return super.execute(pipeline, task); + + + } + + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractOioChannel) { + fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel); + } + return fireLater; + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java index b2485fd6df..5cae6e4cbd 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java @@ -1,241 +1,241 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.oio; - -import static org.jboss.netty.channel.Channels.*; - -import java.io.IOException; -import java.util.Queue; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.Worker; -import org.jboss.netty.util.internal.QueueFactory; - -/** - * Abstract base class for Oio-Worker implementations - * - * @param {@link AbstractOioChannel} - */ -abstract class AbstractOioWorker implements Worker { - - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); - - protected final C channel; - - /** - * If this worker has been started thread will be a reference to the thread - * used when starting. i.e. the current thread when the run method is executed. - */ - protected volatile Thread thread; - - private volatile boolean done; - - public AbstractOioWorker(C channel) { - this.channel = channel; - channel.worker = this; - } - - - public void run() { - thread = channel.workerThread = Thread.currentThread(); - - while (channel.isOpen()) { - synchronized (channel.interestOpsLock) { - while (!channel.isReadable()) { - try { - // notify() is not called at all. - // close() and setInterestOps() calls Thread.interrupt() - channel.interestOpsLock.wait(); - } catch (InterruptedException e) { - if (!channel.isOpen()) { - break; - } - } - } - } - - boolean cont = false; - try { - cont = process(); - } catch (Throwable t) { - if (!channel.isSocketClosed()) { - fireExceptionCaught(channel, t); - } - } finally { - processEventQueue(); - - if (!cont) { - break; - } - } - } - - // Setting the workerThread to null will prevent any channel - // operations from interrupting this thread from now on. - channel.workerThread = null; - - // Clean up. - close(channel, succeededFuture(channel), true); - - // Mark the worker event loop as done so we know that we need to run tasks directly and not queue them - // See #287 - done = true; - - // just to make we don't have something left - processEventQueue(); - - } - - static boolean isIoThread(AbstractOioChannel channel) { - return Thread.currentThread() == channel.workerThread; - } - - - public void executeInIoThread(Runnable task) { - // check if the current thread is the worker thread - // - // Also check if the event loop of the worker is complete. If so we need to run the task now. - // See #287 - if (Thread.currentThread() == thread || done) { - task.run(); - } else { - boolean added = eventQueue.offer(task); - - if (added) { - // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest - } - } - } - - private void processEventQueue() { - for (;;) { - final Runnable task = eventQueue.poll(); - if (task == null) { - break; - } - task.run(); - } - } - - - /** - * Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message - * was processed without errors. - * - * @return continue returns true as long as this worker should continue to try processing incoming messages - * @throws IOException - */ - abstract boolean process() throws IOException; - - static void setInterestOps( - AbstractOioChannel channel, ChannelFuture future, int interestOps) { - boolean iothread = isIoThread(channel); - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getInterestOps() & Channel.OP_WRITE; - - boolean changed = false; - try { - if (channel.getInterestOps() != interestOps) { - if ((interestOps & Channel.OP_READ) != 0) { - channel.setInterestOpsNow(Channel.OP_READ); - } else { - channel.setInterestOpsNow(Channel.OP_NONE); - } - changed = true; - } - - future.setSuccess(); - if (changed) { - synchronized (channel.interestOpsLock) { - channel.setInterestOpsNow(interestOps); - - // Notify the worker so it stops or continues reading. - Thread currentThread = Thread.currentThread(); - Thread workerThread = channel.workerThread; - if (workerThread != null && currentThread != workerThread) { - workerThread.interrupt(); - } - } - if (iothread) { - fireChannelInterestChanged(channel); - } else { - fireChannelInterestChangedLater(channel); - } - } - } catch (Throwable t) { - future.setFailure(t); - if (iothread) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } - - static void close(AbstractOioChannel channel, ChannelFuture future) { - close(channel, future, isIoThread(channel)); - } - - private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - - try { - channel.closeSocket(); - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - // Notify the worker so it stops reading. - Thread currentThread = Thread.currentThread(); - Thread workerThread = channel.workerThread; - if (workerThread != null && currentThread != workerThread) { - workerThread.interrupt(); - } - if (iothread) { - fireChannelDisconnected(channel); - } else { - fireChannelDisconnectedLater(channel); - } - } - if (bound) { - if (iothread) { - fireChannelUnbound(channel); - } else { - fireChannelUnboundLater(channel); - } - } - if (iothread) { - fireChannelClosed(channel); - } else { - fireChannelClosedLater(channel); - } - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - if (iothread) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.oio; + +import static org.jboss.netty.channel.Channels.*; + +import java.io.IOException; +import java.util.Queue; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.Worker; +import org.jboss.netty.util.internal.QueueFactory; + +/** + * Abstract base class for Oio-Worker implementations + * + * @param {@link AbstractOioChannel} + */ +abstract class AbstractOioWorker implements Worker { + + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + + protected final C channel; + + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + + private volatile boolean done; + + public AbstractOioWorker(C channel) { + this.channel = channel; + channel.worker = this; + } + + + public void run() { + thread = channel.workerThread = Thread.currentThread(); + + while (channel.isOpen()) { + synchronized (channel.interestOpsLock) { + while (!channel.isReadable()) { + try { + // notify() is not called at all. + // close() and setInterestOps() calls Thread.interrupt() + channel.interestOpsLock.wait(); + } catch (InterruptedException e) { + if (!channel.isOpen()) { + break; + } + } + } + } + + boolean cont = false; + try { + cont = process(); + } catch (Throwable t) { + if (!channel.isSocketClosed()) { + fireExceptionCaught(channel, t); + } + } finally { + processEventQueue(); + + if (!cont) { + break; + } + } + } + + // Setting the workerThread to null will prevent any channel + // operations from interrupting this thread from now on. + channel.workerThread = null; + + // Clean up. + close(channel, succeededFuture(channel), true); + + // Mark the worker event loop as done so we know that we need to run tasks directly and not queue them + // See #287 + done = true; + + // just to make we don't have something left + processEventQueue(); + + } + + static boolean isIoThread(AbstractOioChannel channel) { + return Thread.currentThread() == channel.workerThread; + } + + + public void executeInIoThread(Runnable task) { + // check if the current thread is the worker thread + // + // Also check if the event loop of the worker is complete. If so we need to run the task now. + // See #287 + if (Thread.currentThread() == thread || done) { + task.run(); + } else { + boolean added = eventQueue.offer(task); + + if (added) { + // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest + } + } + } + + private void processEventQueue() { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + task.run(); + } + } + + + /** + * Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message + * was processed without errors. + * + * @return continue returns true as long as this worker should continue to try processing incoming messages + * @throws IOException + */ + abstract boolean process() throws IOException; + + static void setInterestOps( + AbstractOioChannel channel, ChannelFuture future, int interestOps) { + boolean iothread = isIoThread(channel); + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getInterestOps() & Channel.OP_WRITE; + + boolean changed = false; + try { + if (channel.getInterestOps() != interestOps) { + if ((interestOps & Channel.OP_READ) != 0) { + channel.setInterestOpsNow(Channel.OP_READ); + } else { + channel.setInterestOpsNow(Channel.OP_NONE); + } + changed = true; + } + + future.setSuccess(); + if (changed) { + synchronized (channel.interestOpsLock) { + channel.setInterestOpsNow(interestOps); + + // Notify the worker so it stops or continues reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + } + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } + } + } catch (Throwable t) { + future.setFailure(t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } + } + } + + static void close(AbstractOioChannel channel, ChannelFuture future) { + close(channel, future, isIoThread(channel)); + } + + private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) { + boolean connected = channel.isConnected(); + boolean bound = channel.isBound(); + + try { + channel.closeSocket(); + if (channel.setClosed()) { + future.setSuccess(); + if (connected) { + // Notify the worker so it stops reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } + } + if (bound) { + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } + } + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); + } + } else { + future.setSuccess(); + } + } catch (Throwable t) { + future.setFailure(t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } + } + } +} diff --git a/src/main/java/org/jboss/netty/handler/execution/ChainedExecutor.java b/src/main/java/org/jboss/netty/handler/execution/ChainedExecutor.java index a50a752a89..4463e00fbb 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChainedExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChainedExecutor.java @@ -1,83 +1,83 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.execution; - - -import java.util.concurrent.Executor; - -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.internal.ExecutorUtil; - -/** - * A special {@link Executor} which allows to chain a series of - * {@link Executor}s and {@link ChannelEventRunnableFilter}. - */ -public class ChainedExecutor implements Executor, ExternalResourceReleasable { - - private final Executor cur; - private final Executor next; - private final ChannelEventRunnableFilter filter; - - /** - * Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used. - * Otherwise it will pass the work to the {@link #next} {@link Executor} - * - * @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor} - * @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match - * @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match - */ - public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) { - if (filter == null) { - throw new NullPointerException("filter"); - } - if (cur == null) { - throw new NullPointerException("cur"); - } - if (next == null) { - throw new NullPointerException("next"); - } - - this.filter = filter; - this.cur = cur; - this.next = next; - } - - /** - * Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match. - * Otherwise pass it to the next {@link Executor} in the chain. - */ - public void execute(Runnable command) { - assert command instanceof ChannelEventRunnable; - if (filter.filter((ChannelEventRunnable) command)) { - cur.execute(command); - } else { - next.execute(command); - } - } - - public void releaseExternalResources() { - ExecutorUtil.terminate(cur, next); - releaseExternal(cur); - releaseExternal(next); - } - - - private static void releaseExternal(Executor executor) { - if (executor instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) executor).releaseExternalResources(); - } - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.execution; + + +import java.util.concurrent.Executor; + +import org.jboss.netty.util.ExternalResourceReleasable; +import org.jboss.netty.util.internal.ExecutorUtil; + +/** + * A special {@link Executor} which allows to chain a series of + * {@link Executor}s and {@link ChannelEventRunnableFilter}. + */ +public class ChainedExecutor implements Executor, ExternalResourceReleasable { + + private final Executor cur; + private final Executor next; + private final ChannelEventRunnableFilter filter; + + /** + * Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used. + * Otherwise it will pass the work to the {@link #next} {@link Executor} + * + * @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor} + * @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match + * @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match + */ + public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) { + if (filter == null) { + throw new NullPointerException("filter"); + } + if (cur == null) { + throw new NullPointerException("cur"); + } + if (next == null) { + throw new NullPointerException("next"); + } + + this.filter = filter; + this.cur = cur; + this.next = next; + } + + /** + * Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match. + * Otherwise pass it to the next {@link Executor} in the chain. + */ + public void execute(Runnable command) { + assert command instanceof ChannelEventRunnable; + if (filter.filter((ChannelEventRunnable) command)) { + cur.execute(command); + } else { + next.execute(command); + } + } + + public void releaseExternalResources() { + ExecutorUtil.terminate(cur, next); + releaseExternal(cur); + releaseExternal(next); + } + + + private static void releaseExternal(Executor executor) { + if (executor instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) executor).releaseExternalResources(); + } + } +} diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java b/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java index 59515c070f..4817482c4e 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java @@ -35,6 +35,6 @@ public class ChannelDownstreamEventRunnable extends ChannelEventRunnable { */ @Override protected void doRun() { - ctx.sendDownstream(e); + ctx.sendDownstream(e); } } diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnableFilter.java b/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnableFilter.java index d0bf193334..0b747682af 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnableFilter.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnableFilter.java @@ -1,27 +1,27 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.execution; - -/** - * {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable} - * - */ -public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter { - - public boolean filter(ChannelEventRunnable event) { - return event instanceof ChannelDownstreamEventRunnable; - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.execution; + +/** + * {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable} + * + */ +public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter { + + public boolean filter(ChannelEventRunnable event) { + return event instanceof ChannelDownstreamEventRunnable; + } +} diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java index 84235f631c..d122bae385 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java @@ -1,72 +1,72 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.execution; - -import java.util.concurrent.Executor; - -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.util.EstimatableObjectWrapper; -import org.jboss.netty.util.internal.DeadLockProofWorker; - -public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper { - - protected final ChannelHandlerContext ctx; - protected final ChannelEvent e; - int estimatedSize; - private Executor executor; - - /** - * Creates a {@link Runnable} which sends the specified {@link ChannelEvent} - * upstream via the specified {@link ChannelHandlerContext}. - */ - public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) { - this.ctx = ctx; - this.e = e; - this.executor = executor; - } - - /** - * Returns the {@link ChannelHandlerContext} which will be used to - * send the {@link ChannelEvent} upstream. - */ - public ChannelHandlerContext getContext() { - return ctx; - } - - /** - * Returns the {@link ChannelEvent} which will be sent upstream. - */ - public ChannelEvent getEvent() { - return e; - } - - public Object unwrap() { - return e; - } - - public final void run() { - try { - DeadLockProofWorker.PARENT.set(executor); - doRun(); - } finally { - DeadLockProofWorker.PARENT.remove(); - } - - } - - protected abstract void doRun(); -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.execution; + +import java.util.concurrent.Executor; + +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.util.EstimatableObjectWrapper; +import org.jboss.netty.util.internal.DeadLockProofWorker; + +public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper { + + protected final ChannelHandlerContext ctx; + protected final ChannelEvent e; + int estimatedSize; + private final Executor executor; + + /** + * Creates a {@link Runnable} which sends the specified {@link ChannelEvent} + * upstream via the specified {@link ChannelHandlerContext}. + */ + public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) { + this.ctx = ctx; + this.e = e; + this.executor = executor; + } + + /** + * Returns the {@link ChannelHandlerContext} which will be used to + * send the {@link ChannelEvent} upstream. + */ + public ChannelHandlerContext getContext() { + return ctx; + } + + /** + * Returns the {@link ChannelEvent} which will be sent upstream. + */ + public ChannelEvent getEvent() { + return e; + } + + public Object unwrap() { + return e; + } + + public final void run() { + try { + DeadLockProofWorker.PARENT.set(executor); + doRun(); + } finally { + DeadLockProofWorker.PARENT.remove(); + } + + } + + protected abstract void doRun(); +} diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnableFilter.java b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnableFilter.java index 8eff1851fe..6b13693edf 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnableFilter.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnableFilter.java @@ -1,27 +1,27 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.execution; - -import java.util.concurrent.Executor; - -public interface ChannelEventRunnableFilter { - - /** - * Return true if the {@link ChannelEventRunnable} should get handled by the {@link Executor} - * - */ - boolean filter(ChannelEventRunnable event); -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.execution; + +import java.util.concurrent.Executor; + +public interface ChannelEventRunnableFilter { + + /** + * Return true if the {@link ChannelEventRunnable} should get handled by the {@link Executor} + * + */ + boolean filter(ChannelEventRunnable event); +} diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnableFilter.java b/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnableFilter.java index 4d1f9bd0c3..49655a44c6 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnableFilter.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnableFilter.java @@ -1,25 +1,25 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.handler.execution; - -/** - * {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable} - */ -public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter { - public boolean filter(ChannelEventRunnable event) { - return event instanceof ChannelDownstreamEventRunnable; - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.execution; + +/** + * {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable} + */ +public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter { + public boolean filter(ChannelEventRunnable event) { + return event instanceof ChannelDownstreamEventRunnable; + } +} diff --git a/src/main/java/org/jboss/netty/util/internal/DetectionUtil.java b/src/main/java/org/jboss/netty/util/internal/DetectionUtil.java index 8529fcc5e1..0d6e812d6f 100644 --- a/src/main/java/org/jboss/netty/util/internal/DetectionUtil.java +++ b/src/main/java/org/jboss/netty/util/internal/DetectionUtil.java @@ -1,117 +1,117 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.util.internal; - -import java.security.AccessController; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.Deflater; - - -/** - * Utility that detects various properties specific to the current runtime - * environment, such as Java version and the availability of the - * {@code sun.misc.Unsafe} object. - * - *
- * You can disable the use of {@code sun.misc.Unsafe} if you specify - * the System property org.jboss.netty.tryUnsafe with - * value of false. Default is true. - */ -public final class DetectionUtil { - - private static final int JAVA_VERSION = javaVersion0(); - private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader()); - private static final boolean IS_WINDOWS; - static { - String os = System.getProperty("os.name").toLowerCase(); - // windows - IS_WINDOWS = os.indexOf("win") >= 0; - } - - /** - * Return true if the JVM is running on Windows - * - */ - public static boolean isWindows() { - return IS_WINDOWS; - } - - public static boolean hasUnsafe() { - return HAS_UNSAFE; - } - - public static int javaVersion() { - return JAVA_VERSION; - } - - private static boolean hasUnsafe(ClassLoader loader) { - boolean useUnsafe = Boolean.valueOf(SystemPropertyUtil.get("org.jboss.netty.tryUnsafe", "true")); - if (!useUnsafe) { - return false; - } - - try { - Class unsafeClazz = Class.forName("sun.misc.Unsafe", true, loader); - return hasUnsafeField(unsafeClazz); - } catch (Exception e) { - // Ignore - } - return false; - } - - private static boolean hasUnsafeField(final Class unsafeClass) throws PrivilegedActionException { - return AccessController.doPrivileged(new PrivilegedExceptionAction() { - public Boolean run() throws Exception { - unsafeClass.getDeclaredField("theUnsafe"); - return true; - } - }); - } - - private static int javaVersion0() { - try { - // Check if its android, if so handle it the same way as java6. - // - // See https://github.com/netty/netty/issues/282 - Class.forName("android.app.Application"); - return 6; - } catch (ClassNotFoundException e) { - //Ignore - } - - try { - Deflater.class.getDeclaredField("SYNC_FLUSH"); - return 7; - } catch (Exception e) { - // Ignore - } - - try { - Double.class.getDeclaredField("MIN_NORMAL"); - return 6; - } catch (Exception e) { - // Ignore - } - - return 5; - } - - private DetectionUtil() { - // only static method supported - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.util.internal; + +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.Deflater; + + +/** + * Utility that detects various properties specific to the current runtime + * environment, such as Java version and the availability of the + * {@code sun.misc.Unsafe} object. + * + *
+ * You can disable the use of {@code sun.misc.Unsafe} if you specify + * the System property org.jboss.netty.tryUnsafe with + * value of false. Default is true. + */ +public final class DetectionUtil { + + private static final int JAVA_VERSION = javaVersion0(); + private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader()); + private static final boolean IS_WINDOWS; + static { + String os = System.getProperty("os.name").toLowerCase(); + // windows + IS_WINDOWS = os.indexOf("win") >= 0; + } + + /** + * Return true if the JVM is running on Windows + * + */ + public static boolean isWindows() { + return IS_WINDOWS; + } + + public static boolean hasUnsafe() { + return HAS_UNSAFE; + } + + public static int javaVersion() { + return JAVA_VERSION; + } + + private static boolean hasUnsafe(ClassLoader loader) { + boolean useUnsafe = Boolean.valueOf(SystemPropertyUtil.get("org.jboss.netty.tryUnsafe", "true")); + if (!useUnsafe) { + return false; + } + + try { + Class unsafeClazz = Class.forName("sun.misc.Unsafe", true, loader); + return hasUnsafeField(unsafeClazz); + } catch (Exception e) { + // Ignore + } + return false; + } + + private static boolean hasUnsafeField(final Class unsafeClass) throws PrivilegedActionException { + return AccessController.doPrivileged(new PrivilegedExceptionAction() { + public Boolean run() throws Exception { + unsafeClass.getDeclaredField("theUnsafe"); + return true; + } + }); + } + + private static int javaVersion0() { + try { + // Check if its android, if so handle it the same way as java6. + // + // See https://github.com/netty/netty/issues/282 + Class.forName("android.app.Application"); + return 6; + } catch (ClassNotFoundException e) { + //Ignore + } + + try { + Deflater.class.getDeclaredField("SYNC_FLUSH"); + return 7; + } catch (Exception e) { + // Ignore + } + + try { + Double.class.getDeclaredField("MIN_NORMAL"); + return 6; + } catch (Exception e) { + // Ignore + } + + return 5; + } + + private DetectionUtil() { + // only static method supported + } +} diff --git a/src/main/java/org/jboss/netty/util/internal/QueueFactory.java b/src/main/java/org/jboss/netty/util/internal/QueueFactory.java index 8e039620c3..b08029a71b 100644 --- a/src/main/java/org/jboss/netty/util/internal/QueueFactory.java +++ b/src/main/java/org/jboss/netty/util/internal/QueueFactory.java @@ -1,100 +1,100 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.util.internal; - -import java.util.Collection; -import java.util.concurrent.BlockingQueue; - -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; - -/** - * This factory should be used to create the "optimal" {@link BlockingQueue} - * instance for the running JVM. - */ -public final class QueueFactory { - - private static final boolean useUnsafe = DetectionUtil.hasUnsafe(); - private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QueueFactory.class); - - private QueueFactory() { - // only use static methods! - } - - - /** - * Create a new unbound {@link BlockingQueue} - * - * @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items - * @return queue the {@link BlockingQueue} implementation - */ - public static BlockingQueue createQueue(Class itemClass) { - // if we run in java >=7 its the best to just use the LinkedTransferQueue which - // comes with java bundled. See #273 - if (DetectionUtil.javaVersion() >= 7) { - return new java.util.concurrent.LinkedTransferQueue(); - } - - try { - if (useUnsafe) { - return new LinkedTransferQueue(); - } - } catch (Throwable t) { - // For whatever reason an exception was thrown while loading the LinkedTransferQueue - // - // This mostly happens because of a custom classloader or security policy that did not allow us to access the - // com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t); - } - } - - return new LegacyLinkedTransferQueue(); - - } - - /** - * Create a new unbound {@link BlockingQueue} - * - * @param collection the collection which should get copied to the newly created {@link BlockingQueue} - * @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items - * @return queue the {@link BlockingQueue} implementation - */ - public static BlockingQueue createQueue(Collection collection, Class itemClass) { - // if we run in java >=7 its the best to just use the LinkedTransferQueue which - // comes with java bundled. See #273 - if (DetectionUtil.javaVersion() >= 7) { - return new java.util.concurrent.LinkedTransferQueue(); - } - - try { - if (useUnsafe) { - return new LinkedTransferQueue(collection); - } - } catch (Throwable t) { - // For whatever reason an exception was thrown while loading the LinkedTransferQueue - // - // This mostly happens because of a custom classloader or security policy that did not allow us to access the - // com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t); - } - } - - return new LegacyLinkedTransferQueue(collection); - - } -} +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.util.internal; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; + +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; + +/** + * This factory should be used to create the "optimal" {@link BlockingQueue} + * instance for the running JVM. + */ +public final class QueueFactory { + + private static final boolean useUnsafe = DetectionUtil.hasUnsafe(); + private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QueueFactory.class); + + private QueueFactory() { + // only use static methods! + } + + + /** + * Create a new unbound {@link BlockingQueue} + * + * @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items + * @return queue the {@link BlockingQueue} implementation + */ + public static BlockingQueue createQueue(Class itemClass) { + // if we run in java >=7 its the best to just use the LinkedTransferQueue which + // comes with java bundled. See #273 + if (DetectionUtil.javaVersion() >= 7) { + return new java.util.concurrent.LinkedTransferQueue(); + } + + try { + if (useUnsafe) { + return new LinkedTransferQueue(); + } + } catch (Throwable t) { + // For whatever reason an exception was thrown while loading the LinkedTransferQueue + // + // This mostly happens because of a custom classloader or security policy that did not allow us to access the + // com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t); + } + } + + return new LegacyLinkedTransferQueue(); + + } + + /** + * Create a new unbound {@link BlockingQueue} + * + * @param collection the collection which should get copied to the newly created {@link BlockingQueue} + * @param itemClass the {@link Class} type which will be used as {@link BlockingQueue} items + * @return queue the {@link BlockingQueue} implementation + */ + public static BlockingQueue createQueue(Collection collection, Class itemClass) { + // if we run in java >=7 its the best to just use the LinkedTransferQueue which + // comes with java bundled. See #273 + if (DetectionUtil.javaVersion() >= 7) { + return new java.util.concurrent.LinkedTransferQueue(); + } + + try { + if (useUnsafe) { + return new LinkedTransferQueue(collection); + } + } catch (Throwable t) { + // For whatever reason an exception was thrown while loading the LinkedTransferQueue + // + // This mostly happens because of a custom classloader or security policy that did not allow us to access the + // com.sun.Unmisc class. So just log it and fallback to the old LegacyLinkedTransferQueue that works in all cases + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Unable to instance LinkedTransferQueue, fallback to LegacyLinkedTransferQueue", t); + } + } + + return new LegacyLinkedTransferQueue(collection); + + } +}