From 812a9026b88244ffb6f8c24ea827cc4454973f55 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 18 Feb 2012 23:02:56 +0100 Subject: [PATCH] Start to refactor nio transport to share more code. See #186 --- .../socket/nio/AbstractNioChannel.java | 394 ++++++++++ .../channel/socket/nio/AbstractNioWorker.java | 694 +++++++++++++++++ .../socket/nio/AbstractWriteRequestQueue.java | 149 ---- .../channel/socket/nio/NioChannelConfig.java | 2 +- .../nio/NioClientSocketPipelineSink.java | 10 +- .../socket/nio/NioDatagramChannel.java | 275 +------ .../channel/socket/nio/NioDatagramWorker.java | 705 +----------------- .../nio/NioServerSocketPipelineSink.java | 2 +- .../channel/socket/nio/NioSocketChannel.java | 192 +---- .../netty/channel/socket/nio/NioWorker.java | 624 +--------------- .../socket/oio/AbstractOioChannel.java | 2 +- .../socket/oio/OioDatagramChannel.java | 4 +- 12 files changed, 1181 insertions(+), 1872 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractWriteRequestQueue.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java new file mode 100644 index 0000000000..501af6473d --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -0,0 +1,394 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import static io.netty.channel.Channels.fireChannelInterestChanged; +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelSink; +import io.netty.channel.MessageEvent; +import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; +import io.netty.util.internal.QueueFactory; +import io.netty.util.internal.ThreadLocalBoolean; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectableChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +abstract class AbstractNioChannel extends AbstractChannel { + + /** + * The {@link AbstractNioWorker}. + */ + final AbstractNioWorker worker; + + /** + * Monitor object to synchronize access to InterestedOps. + */ + final Object interestOpsLock = new Object(); + + /** + * Monitor object for synchronizing access to the {@link WriteRequestQueue}. + */ + final Object writeLock = new Object(); + + /** + * WriteTask that performs write operations. + */ + final Runnable writeTask = new WriteTask(); + + /** + * Indicates if there is a {@link WriteTask} in the task queue. + */ + final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); + + /** + * Queue of write {@link MessageEvent}s. + */ + final Queue writeBufferQueue = new WriteRequestQueue(); + + /** + * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently + * contains. + */ + final AtomicInteger writeBufferSize = new AtomicInteger(); + + /** + * Keeps track of the highWaterMark. + */ + final AtomicInteger highWaterMarkCounter = new AtomicInteger(); + + /** + * The current write {@link MessageEvent} + */ + MessageEvent currentWriteEvent; + SendBuffer currentWriteBuffer; + + /** + * Boolean that indicates that write operation is in progress. + */ + boolean inWriteNowLoop; + boolean writeSuspended; + + + private volatile InetSocketAddress localAddress; + volatile InetSocketAddress remoteAddress; + + final C channel; + + protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) { + super(id, parent, factory, pipeline, sink); + this.worker = worker; + this.channel = ch; + } + + protected AbstractNioChannel( + Channel parent, ChannelFactory factory, + ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) { + super(parent, factory, pipeline, sink); + this.worker = worker; + this.channel = ch; + } + + @Override + public InetSocketAddress getLocalAddress() { + InetSocketAddress localAddress = this.localAddress; + if (localAddress == null) { + try { + this.localAddress = localAddress = + (InetSocketAddress) getLocalSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return localAddress; + } + + @Override + public InetSocketAddress getRemoteAddress() { + InetSocketAddress remoteAddress = this.remoteAddress; + if (remoteAddress == null) { + try { + this.remoteAddress = remoteAddress = + (InetSocketAddress) getRemoteSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return remoteAddress; + } + + int getRawInterestOps() { + return super.getInterestOps(); + } + + void setRawInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + @Override + public ChannelFuture write(Object message, SocketAddress remoteAddress) { + if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { + return super.write(message, null); + } else { + return getUnsupportedOperationFuture(); + } + } + + @Override + public int getInterestOps() { + if (!isOpen()) { + return Channel.OP_WRITE; + } + + int interestOps = getRawInterestOps(); + int writeBufferSize = this.writeBufferSize.get(); + if (writeBufferSize != 0) { + if (highWaterMarkCounter.get() > 0) { + int lowWaterMark = ((NioChannelConfig) getConfig()).getWriteBufferLowWaterMark(); + if (writeBufferSize >= lowWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } else { + int highWaterMark = ((NioChannelConfig) getConfig()).getWriteBufferHighWaterMark(); + if (writeBufferSize >= highWaterMark) { + interestOps |= Channel.OP_WRITE; + } else { + interestOps &= ~Channel.OP_WRITE; + } + } + } else { + interestOps &= ~Channel.OP_WRITE; + } + + return interestOps; + } + + @Override + protected boolean setClosed() { + return super.setClosed(); + } + + abstract InetSocketAddress getLocalSocketAddress() throws Exception; + + abstract InetSocketAddress getRemoteSocketAddress() throws Exception; + + private final class WriteRequestQueue implements BlockingQueue { + private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); + + private final BlockingQueue queue; + + public WriteRequestQueue() { + this.queue = QueueFactory.createQueue(MessageEvent.class); + } + + @Override + public MessageEvent remove() { + return queue.remove(); + } + + @Override + public MessageEvent element() { + return queue.element(); + } + + @Override + public MessageEvent peek() { + return queue.peek(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public Iterator iterator() { + return queue.iterator(); + } + + @Override + public Object[] toArray() { + return queue.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return queue.toArray(a); + } + + @Override + public boolean containsAll(Collection c) { + return queue.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + return queue.addAll(c); + } + + @Override + public boolean removeAll(Collection c) { + return queue.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return queue.retainAll(c); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public boolean add(MessageEvent e) { + return queue.add(e); + } + + @Override + public void put(MessageEvent e) throws InterruptedException { + queue.put(e); + } + + @Override + public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException { + return queue.offer(e, timeout, unit); + } + + @Override + public MessageEvent take() throws InterruptedException { + return queue.take(); + } + + @Override + public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + @Override + public boolean remove(Object o) { + return queue.remove(o); + } + + @Override + public boolean contains(Object o) { + return queue.contains(o); + } + + @Override + public int drainTo(Collection c) { + return queue.drainTo(c); + } + + @Override + public int drainTo(Collection c, int maxElements) { + return queue.drainTo(c, maxElements); + } + + @Override + public boolean offer(MessageEvent e) { + boolean success = queue.offer(e); + assert success; + + int messageSize = getMessageSize(e); + int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); + int highWaterMark = ((NioChannelConfig) getConfig()).getWriteBufferHighWaterMark(); + + if (newWriteBufferSize >= highWaterMark) { + if (newWriteBufferSize - messageSize < highWaterMark) { + highWaterMarkCounter.incrementAndGet(); + if (!notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(AbstractNioChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + return true; + } + + @Override + public MessageEvent poll() { + MessageEvent e = queue.poll(); + if (e != null) { + int messageSize = getMessageSize(e); + int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); + int lowWaterMark = ((NioChannelConfig) getConfig()).getWriteBufferLowWaterMark(); + + if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { + if (newWriteBufferSize + messageSize >= lowWaterMark) { + highWaterMarkCounter.decrementAndGet(); + if (isConnected() && !notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(AbstractNioChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + } + return e; + } + + private int getMessageSize(MessageEvent e) { + Object m = e.getMessage(); + if (m instanceof ChannelBuffer) { + return ((ChannelBuffer) m).readableBytes(); + } + return 0; + } + } + + private final class WriteTask implements Runnable { + + WriteTask() { + } + + @Override + public void run() { + writeTaskInTaskQueue.set(false); + worker.writeFromTaskLoop(AbstractNioChannel.this); + } + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java new file mode 100644 index 0000000000..8faccf893a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -0,0 +1,694 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import static io.netty.channel.Channels.*; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.MessageEvent; +import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.DeadLockProofWorker; +import io.netty.util.internal.QueueFactory; + +import java.io.IOException; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NotYetConnectedException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.WritableByteChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +abstract class AbstractNioWorker implements Runnable { + /** + * Internal Netty logger. + */ + private static final InternalLogger logger = InternalLoggerFactory + .getInstance(AbstractNioWorker.class); + + private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; + + static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + + + /** + * Executor used to execute {@link Runnable}s such as + * {@link ChannelRegistionTask}. + */ + private final Executor executor; + + /** + * Boolean to indicate if this worker has been started. + */ + private boolean started; + + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + + /** + * The NIO {@link Selector}. + */ + volatile Selector selector; + + /** + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * waken up. + */ + protected final AtomicBoolean wakenUp = new AtomicBoolean(); + + /** + * Lock for this workers Selector. + */ + private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); + + /** + * Monitor object used to synchronize selector open/close. + */ + private final Object startStopLock = new Object(); + + /** + * Queue of {@link ChannelRegistionTask}s + */ + private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); + + /** + * Queue of WriteTasks + */ + protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); + + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + + private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + + AbstractNioWorker(Executor executor) { + this.executor = executor; + } + + void register(AbstractNioChannel channel, ChannelFuture future) { + + Runnable registerTask = createRegisterTask(channel, future); + Selector selector; + + synchronized (startStopLock) { + if (!started) { + // Open a selector if this worker didn't start yet. + try { + this.selector = selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException("Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + boolean success = false; + try { + DeadLockProofWorker.start(executor, this); + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + logger.warn("Failed to close a selector.", t); + } + this.selector = selector = null; + // The method will return to the caller at this point. + } + } + } else { + // Use the existing selector if this worker has been started. + selector = this.selector; + } + + assert selector != null && selector.isOpen(); + + started = true; + boolean offered = registerTaskQueue.offer(registerTask); + assert offered; + } + + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + + @Override + public void run() { + thread = Thread.currentThread(); + + boolean shutdown = false; + Selector selector = this.selector; + for (;;) { + wakenUp.set(false); + + if (CONSTRAINT_LEVEL != 0) { + selectorGuard.writeLock().lock(); + // This empty synchronization block prevents the selector + // from acquiring its lock. + selectorGuard.writeLock().unlock(); + } + + try { + SelectorUtil.select(selector); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp.get()) { + selector.wakeup(); + } + + cancelledKeys = 0; + processRegisterTaskQueue(); + processWriteTaskQueue(); + processSelectedKeys(selector.selectedKeys()); + + // Exit the loop when there's nothing to handle. + // The shutdown flag is used to delay the shutdown of this + // loop to avoid excessive Selector creation when + // connections are registered in a one-by-one manner instead of + // concurrent manner. + if (selector.keys().isEmpty()) { + if (shutdown || + executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { + + synchronized (startStopLock) { + if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { + started = false; + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); + } finally { + this.selector = null; + } + break; + } else { + shutdown = false; + } + } + } else { + // Give one more second. + shutdown = true; + } + } else { + shutdown = false; + } + } catch (Throwable t) { + logger.warn( + "Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + + private void processRegisterTaskQueue() throws IOException { + for (;;) { + final Runnable task = registerTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + + private void processWriteTaskQueue() throws IOException { + for (;;) { + final Runnable task = writeTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + + private void processSelectedKeys(Set selectedKeys) throws IOException { + for (Iterator i = selectedKeys.iterator(); i.hasNext();) { + SelectionKey k = i.next(); + i.remove(); + try { + int readyOps = k.readyOps(); + if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { + if (!read(k)) { + // Connection already closed - no need to handle write. + continue; + } + } + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + writeFromSelectorLoop(k); + } + } catch (CancelledKeyException e) { + close(k); + } + + if (cleanUpCancelledKeys()) { + break; // break the loop to avoid ConcurrentModificationException + } + } + } + + private boolean cleanUpCancelledKeys() throws IOException { + if (cancelledKeys >= CLEANUP_INTERVAL) { + cancelledKeys = 0; + selector.selectNow(); + return true; + } + return false; + } + + + + private void close(SelectionKey k) { + AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + close(ch, succeededFuture(ch)); + } + + void writeFromUserCode(final AbstractNioChannel channel) { + if (!channel.isConnected()) { + cleanUpWriteBuffer(channel); + return; + } + + if (scheduleWriteIfNecessary(channel)) { + return; + } + + // From here, we are sure Thread.currentThread() == workerThread. + + if (channel.writeSuspended) { + return; + } + + if (channel.inWriteNowLoop) { + return; + } + + write0(channel); + } + + void writeFromTaskLoop(AbstractNioChannel ch) { + if (!ch.writeSuspended) { + write0(ch); + } + } + + void writeFromSelectorLoop(final SelectionKey k) { + AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + ch.writeSuspended = false; + write0(ch); + } + + protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel channel); + + + private void write0(AbstractNioChannel channel) { + boolean open = true; + boolean addOpWrite = false; + boolean removeOpWrite = false; + + long writtenBytes = 0; + + final SocketSendBufferPool sendBufferPool = this.sendBufferPool; + final WritableByteChannel ch = channel.channel; + final Queue writeBuffer = channel.writeBufferQueue; + final int writeSpinCount = ((NioChannelConfig) channel.getConfig()).getWriteSpinCount(); + synchronized (channel.writeLock) { + channel.inWriteNowLoop = true; + for (;;) { + MessageEvent evt = channel.currentWriteEvent; + SendBuffer buf; + if (evt == null) { + if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { + removeOpWrite = true; + channel.writeSuspended = false; + break; + } + + channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); + } else { + buf = channel.currentWriteBuffer; + } + + ChannelFuture future = evt.getFuture(); + try { + long localWrittenBytes = 0; + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = buf.transferTo(ch); + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + break; + } + if (buf.finished()) { + break; + } + } + + if (buf.finished()) { + // Successful write - proceed to the next message. + buf.release(); + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + evt = null; + buf = null; + future.setSuccess(); + } else { + // Not written fully - perhaps the kernel buffer is full. + addOpWrite = true; + channel.writeSuspended = true; + + if (localWrittenBytes > 0) { + // Notify progress listeners if necessary. + future.setProgress( + localWrittenBytes, + buf.writtenBytes(), buf.totalBytes()); + } + break; + } + } catch (AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (Throwable t) { + if (buf != null) { + buf.release(); + } + channel.currentWriteEvent = null; + channel.currentWriteBuffer = null; + buf = null; + evt = null; + future.setFailure(t); + fireExceptionCaught(channel, t); + if (t instanceof IOException) { + open = false; + close(channel, succeededFuture(channel)); + } + } + } + channel.inWriteNowLoop = false; + + // Initially, the following block was executed after releasing + // the writeLock, but there was a race condition, and it has to be + // executed before releasing the writeLock: + // + // https://issues.jboss.org/browse/NETTY-410 + // + if (open) { + if (addOpWrite) { + setOpWrite(channel); + } else if (removeOpWrite) { + clearOpWrite(channel); + } + } + } + + fireWriteComplete(channel, writtenBytes); + } + + private void setOpWrite(AbstractNioChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.channel.keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + channel.setRawInterestOpsNow(interestOps); + } + } + } + + private void clearOpWrite(AbstractNioChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.channel.keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + channel.setRawInterestOpsNow(interestOps); + } + } + } + + + void close(AbstractNioChannel channel, ChannelFuture future) { + boolean connected = channel.isConnected(); + boolean bound = channel.isBound(); + try { + channel.channel.close(); + cancelledKeys ++; + + if (channel.setClosed()) { + future.setSuccess(); + if (connected) { + fireChannelDisconnected(channel); + } + if (bound) { + fireChannelUnbound(channel); + } + + cleanUpWriteBuffer(channel); + fireChannelClosed(channel); + } else { + future.setSuccess(); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + private void cleanUpWriteBuffer(AbstractNioChannel channel) { + Exception cause = null; + boolean fireExceptionCaught = false; + + // Clean up the stale messages in the write buffer. + synchronized (channel.writeLock) { + MessageEvent evt = channel.currentWriteEvent; + if (evt != null) { + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } + + ChannelFuture future = evt.getFuture(); + channel.currentWriteBuffer.release(); + channel.currentWriteBuffer = null; + channel.currentWriteEvent = null; + evt = null; + future.setFailure(cause); + fireExceptionCaught = true; + } + + Queue writeBuffer = channel.writeBufferQueue; + if (!writeBuffer.isEmpty()) { + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + if (cause == null) { + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } + } + + for (;;) { + evt = writeBuffer.poll(); + if (evt == null) { + break; + } + evt.getFuture().setFailure(cause); + fireExceptionCaught = true; + } + } + } + + if (fireExceptionCaught) { + fireExceptionCaught(channel, cause); + } + } + + void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { + boolean changed = false; + try { + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + Selector selector = this.selector; + SelectionKey key = channel.channel.keyFor(selector); + + if (key == null || selector == null) { + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + return; + } + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + + switch (CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + key.interestOps(interestOps); + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + changed = true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { + key.interestOps(interestOps); + changed = true; + } else { + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + changed = true; + } finally { + selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } + } + + future.setSuccess(); + if (changed) { + fireChannelInterestChanged(channel); + } + } catch (CancelledKeyException e) { + // setInterestOps() was called on a closed channel. + ClosedChannelException cce = new ClosedChannelException(); + future.setFailure(cce); + fireExceptionCaught(channel, cce); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + /** + * Read is called when a Selector has been notified that the underlying channel + * was something to be read. The channel would previously have registered its interest + * in read operations. + * + * @param key The selection key which contains the Selector registration information. + */ + protected abstract boolean read(SelectionKey k); + + /** + * Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel} + * + * @param channel + * @param future + * @return task + */ + protected abstract Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future); + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractWriteRequestQueue.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractWriteRequestQueue.java deleted file mode 100644 index b9a848ead1..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractWriteRequestQueue.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import io.netty.channel.MessageEvent; -import io.netty.util.internal.QueueFactory; - -abstract class AbstractWriteRequestQueue implements BlockingQueue { - - protected final BlockingQueue queue; - - public AbstractWriteRequestQueue() { - this.queue = QueueFactory.createQueue(MessageEvent.class); - } - - @Override - public MessageEvent remove() { - return queue.remove(); - } - - @Override - public MessageEvent element() { - return queue.element(); - } - - @Override - public MessageEvent peek() { - return queue.peek(); - } - - @Override - public int size() { - return queue.size(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - @Override - public Iterator iterator() { - return queue.iterator(); - } - - @Override - public Object[] toArray() { - return queue.toArray(); - } - - @Override - public T[] toArray(T[] a) { - return queue.toArray(a); - } - - @Override - public boolean containsAll(Collection c) { - return queue.containsAll(c); - } - - @Override - public boolean addAll(Collection c) { - return queue.addAll(c); - } - - @Override - public boolean removeAll(Collection c) { - return queue.removeAll(c); - } - - @Override - public boolean retainAll(Collection c) { - return queue.retainAll(c); - } - - @Override - public void clear() { - queue.clear(); - } - - @Override - public boolean add(MessageEvent e) { - return queue.add(e); - } - - @Override - public void put(MessageEvent e) throws InterruptedException { - queue.put(e); - } - - @Override - public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException { - return queue.offer(e, timeout, unit); - } - - @Override - public MessageEvent take() throws InterruptedException { - return queue.take(); - } - - @Override - public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - - @Override - public int remainingCapacity() { - return queue.remainingCapacity(); - } - - @Override - public boolean remove(Object o) { - return queue.remove(o); - } - - @Override - public boolean contains(Object o) { - return queue.contains(o); - } - - @Override - public int drainTo(Collection c) { - return queue.drainTo(c); - } - - @Override - public int drainTo(Collection c, int maxElements) { - return queue.drainTo(c, maxElements); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java index 65dd184993..251d38ddab 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java @@ -25,7 +25,7 @@ import io.netty.channel.ChannelConfig; * Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO. * */ -public interface NioChannelConfig extends ChannelConfig{ +public interface NioChannelConfig extends ChannelConfig { /** * Returns the high water mark of the write buffer. If the number of bytes diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 0b269857e7..14226a5ee6 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -113,7 +113,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); - boolean offered = channel.writeBuffer.offer(event); + boolean offered = channel.writeBufferQueue.offer(event); assert offered; channel.worker.writeFromUserCode(channel); } @@ -123,7 +123,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { NioClientSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { try { - channel.socket.socket().bind(localAddress); + channel.channel.socket().bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); @@ -138,7 +138,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { final NioClientSocketChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - if (channel.socket.connect(remoteAddress)) { + if (channel.channel.connect(remoteAddress)) { channel.worker.register(channel, cf); } else { channel.getCloseFuture().addListener(new ChannelFutureListener() { @@ -392,7 +392,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { private void connect(SelectionKey k) { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { - if (ch.socket.finishConnect()) { + if (ch.channel.finishConnect()) { k.cancel(); ch.worker.register(ch, ch.connectFuture); } @@ -422,7 +422,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { @Override public void run() { try { - channel.socket.register( + channel.channel.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { channel.worker.close(channel, succeededFuture(channel)); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 89d54527cc..c8dcc77813 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -15,36 +15,23 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; +import static io.netty.channel.Channels.fireChannelOpen; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelSink; +import io.netty.channel.socket.DatagramChannelConfig; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; -import java.net.SocketAddress; import java.nio.channels.DatagramChannel; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.AbstractChannel; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.MessageEvent; -import io.netty.channel.socket.DatagramChannelConfig; -import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import io.netty.util.internal.LegacyLinkedTransferQueue; -import io.netty.util.internal.ThreadLocalBoolean; /** * Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}. */ -final class NioDatagramChannel extends AbstractChannel +final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel { /** @@ -52,67 +39,7 @@ final class NioDatagramChannel extends AbstractChannel */ private final NioDatagramChannelConfig config; - /** - * The {@link NioDatagramWorker} for this NioDatagramChannnel. - */ - final NioDatagramWorker worker; - - /** - * The {@link DatagramChannel} that this channel uses. - */ - private final java.nio.channels.DatagramChannel datagramChannel; - - /** - * Monitor object to synchronize access to InterestedOps. - */ - final Object interestOpsLock = new Object(); - - /** - * Monitor object for synchronizing access to the {@link WriteRequestQueue}. - */ - final Object writeLock = new Object(); - - /** - * WriteTask that performs write operations. - */ - final Runnable writeTask = new WriteTask(); - - /** - * Indicates if there is a {@link WriteTask} in the task queue. - */ - final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); - - /** - * Queue of write {@link MessageEvent}s. - */ - final Queue writeBufferQueue = new WriteRequestQueue(); - - /** - * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently - * contains. - */ - final AtomicInteger writeBufferSize = new AtomicInteger(); - - /** - * Keeps track of the highWaterMark. - */ - final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - - /** - * The current write {@link MessageEvent} - */ - MessageEvent currentWriteEvent; - SendBuffer currentWriteBuffer; - - /** - * Boolean that indicates that write operation is in progress. - */ - boolean inWriteNowLoop; - boolean writeSuspended; - - private volatile InetSocketAddress localAddress; - volatile InetSocketAddress remoteAddress; - + static NioDatagramChannel create(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, NioDatagramWorker worker) { NioDatagramChannel instance = @@ -124,13 +51,11 @@ final class NioDatagramChannel extends AbstractChannel private NioDatagramChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink, final NioDatagramWorker worker) { - super(null, factory, pipeline, sink); - this.worker = worker; - datagramChannel = openNonBlockingChannel(); - config = new DefaultNioDatagramChannelConfig(datagramChannel.socket()); + super(null, factory, pipeline, sink, worker, openNonBlockingChannel()); + config = new DefaultNioDatagramChannelConfig(channel.socket()); } - private DatagramChannel openNonBlockingChannel() { + private static DatagramChannel openNonBlockingChannel() { try { final DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); @@ -140,44 +65,15 @@ final class NioDatagramChannel extends AbstractChannel } } - @Override - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - this.localAddress = localAddress = - (InetSocketAddress) datagramChannel.socket().getLocalSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return localAddress; - } - - @Override - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - this.remoteAddress = remoteAddress = - (InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return remoteAddress; - } @Override public boolean isBound() { - return isOpen() && datagramChannel.socket().isBound(); + return isOpen() && channel.socket().isBound(); } @Override public boolean isConnected() { - return datagramChannel.isConnected(); + return channel.isConnected(); } @Override @@ -191,140 +87,7 @@ final class NioDatagramChannel extends AbstractChannel } DatagramChannel getDatagramChannel() { - return datagramChannel; - } - - @Override - public int getInterestOps() { - if (!isOpen()) { - return Channel.OP_WRITE; - } - - int interestOps = getRawInterestOps(); - int writeBufferSize = this.writeBufferSize.get(); - if (writeBufferSize != 0) { - if (highWaterMarkCounter.get() > 0) { - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - if (writeBufferSize >= lowWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } else { - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - if (writeBufferSize >= highWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } - } else { - interestOps &= ~Channel.OP_WRITE; - } - - return interestOps; - } - - int getRawInterestOps() { - return super.getInterestOps(); - } - - void setRawInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } - - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return super.write(message, remoteAddress); - } - } - - /** - * {@link WriteRequestQueue} is an extension of {@link AbstractWriteRequestQueue} - * that adds support for highWaterMark checking of the write buffer size. - */ - private final class WriteRequestQueue extends - AbstractWriteRequestQueue { - - private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); - - - /** - * This method first delegates to {@link LegacyLinkedTransferQueue#offer(Object)} and - * adds support for keeping track of the size of the this write buffer. - */ - @Override - public boolean offer(MessageEvent e) { - boolean success = queue.offer(e); - assert success; - - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - - if (newWriteBufferSize >= highWaterMark) { - if (newWriteBufferSize - messageSize < highWaterMark) { - highWaterMarkCounter.incrementAndGet(); - if (!notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(NioDatagramChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - return true; - } - - /** - * This method first delegates to {@link LegacyLinkedTransferQueue#poll()} and - * adds support for keeping track of the size of the this writebuffers queue. - */ - @Override - public MessageEvent poll() { - MessageEvent e = queue.poll(); - if (e != null) { - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - - if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { - if (newWriteBufferSize + messageSize >= lowWaterMark) { - highWaterMarkCounter.decrementAndGet(); - if (isBound() && !notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(NioDatagramChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - } - return e; - } - - private int getMessageSize(MessageEvent e) { - Object m = e.getMessage(); - if (m instanceof ChannelBuffer) { - return ((ChannelBuffer) m).readableBytes(); - } - return 0; - } - } - - /** - * WriteTask is a simple runnable performs writes by delegating the {@link NioDatagramWorker}. - */ - private final class WriteTask implements Runnable { - WriteTask() { - } - - @Override - public void run() { - writeTaskInTaskQueue.set(false); - worker.writeFromTaskLoop(NioDatagramChannel.this); - } + return channel; } @Override @@ -348,4 +111,14 @@ final class NioDatagramChannel extends AbstractChannel NetworkInterface networkInterface) { throw new UnsupportedOperationException(); } + + @Override + InetSocketAddress getLocalSocketAddress() throws Exception { + return (InetSocketAddress) channel.socket().getLocalSocketAddress(); + } + + @Override + InetSocketAddress getRemoteSocketAddress() throws Exception { + return (InetSocketAddress) channel.socket().getRemoteSocketAddress(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index eff53e3433..54ed300b96 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -15,102 +15,28 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.NotYetBoundException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - +import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireMessageReceived; +import static io.netty.channel.Channels.succeededFuture; import io.netty.buffer.ChannelBufferFactory; -import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; -import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.QueueFactory; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.concurrent.Executor; /** * A class responsible for registering channels with {@link Selector}. * It also implements the {@link Selector} loop. */ -class NioDatagramWorker implements Runnable { - /** - * Internal Netty logger. - */ - private static final InternalLogger logger = InternalLoggerFactory - .getInstance(NioDatagramWorker.class); - - /** - * Executor used to execute {@link Runnable}s such as - * {@link ChannelRegistionTask}. - */ - private final Executor executor; - - /** - * Boolean to indicate if this worker has been started. - */ - private boolean started; - - /** - * If this worker has been started thread will be a reference to the thread - * used when starting. i.e. the current thread when the run method is executed. - */ - private volatile Thread thread; - - /** - * The NIO {@link Selector}. - */ - volatile Selector selector; - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - private final AtomicBoolean wakenUp = new AtomicBoolean(); - - /** - * Lock for this workers Selector. - */ - private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - - /** - * Monitor object used to synchronize selector open/close. - */ - private final Object startStopLock = new Object(); - - /** - * Queue of {@link ChannelRegistionTask}s - */ - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - - /** - * Queue of WriteTasks - */ - private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - - private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); +class NioDatagramWorker extends AbstractNioWorker { /** * Sole constructor. @@ -119,246 +45,13 @@ class NioDatagramWorker implements Runnable { * such as {@link ChannelRegistionTask} */ NioDatagramWorker(final Executor executor) { - this.executor = executor; + super(executor); } - /** - * Registers the passed-in channel with a selector. - * - * @param channel the channel to register - * @param future the {@link ChannelFuture} that has to be notified on - * completion - */ - void register(final NioDatagramChannel channel, final ChannelFuture future) { - final Runnable channelRegTask = new ChannelRegistionTask(channel, - future); - Selector selector; + - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (final Throwable t) { - throw new ChannelException("Failed to create a selector.", - t); - } - - boolean success = false; - try { - // Start the main selector loop. See run() for details. - executor.execute(this); - success = true; - } finally { - if (!success) { - try { - // Release the Selector if the execution fails. - selector.close(); - } catch (final Throwable t) { - logger.warn("Failed to close a selector.", t); - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - assert selector != null && selector.isOpen(); - - started = true; - - // "Add" the registration task to the register task queue. - boolean offered = registerTaskQueue.offer(channelRegTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - - /** - * Selector loop. - */ @Override - public void run() { - // Store a ref to the current thread. - thread = Thread.currentThread(); - - final Selector selector = this.selector; - boolean shutdown = false; - - for (;;) { - wakenUp.set(false); - - if (NioProviderMetadata.CONSTRAINT_LEVEL != 0) { - selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector from acquiring its lock. - selectorGuard.writeLock().unlock(); - } - - try { - SelectorUtil.select(selector); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - cancelledKeys = 0; - processRegisterTaskQueue(); - processWriteTaskQueue(); - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle (the registered - // key set is empty. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || executor instanceof ExecutorService && - ((ExecutorService) executor).isShutdown()) { - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && - selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - logger.warn("Failed to close a selector.", - e); - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - // Give one more second. - shutdown = true; - } - } else { - shutdown = false; - } - } catch (Throwable t) { - logger.warn("Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - /** - * Will go through all the {@link ChannelRegistionTask}s in the - * task queue and run them (registering them). - */ - private void processRegisterTaskQueue() throws IOException { - for (;;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - /** - * Will go through all the WriteTasks and run them. - */ - private void processWriteTaskQueue() throws IOException { - for (;;) { - final Runnable task = writeTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processSelectedKeys(final Set selectedKeys) throws IOException { - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - try { - int readyOps = k.readyOps(); - if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - if (!read(k)) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - writeFromSelectorLoop(k); - } - } catch (CancelledKeyException e) { - close(k); - } - - if (cleanUpCancelledKeys()) { - break; // Break the loop to avoid ConcurrentModificationException - } - } - } - - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= NioWorker.CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - /** - * Read is called when a Selector has been notified that the underlying channel - * was something to be read. The channel would previously have registered its interest - * in read operations. - * - * @param key The selection key which contains the Selector registration information. - */ - private boolean read(final SelectionKey key) { + protected boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); @@ -408,53 +101,10 @@ class NioDatagramWorker implements Runnable { return true; } + - private void close(SelectionKey k) { - final NioDatagramChannel ch = (NioDatagramChannel) k.attachment(); - close(ch, succeededFuture(ch)); - } - - void writeFromUserCode(final NioDatagramChannel channel) { - /* - * Note that we are not checking if the channel is connected. Connected - * has a different meaning in UDP and means that the channels socket is - * configured to only send and receive from a given remote peer. - */ - if (!channel.isBound()) { - cleanUpWriteBuffer(channel); - return; - } - - if (scheduleWriteIfNecessary(channel)) { - return; - } - - // From here, we are sure Thread.currentThread() == workerThread. - - if (channel.writeSuspended) { - return; - } - - if (channel.inWriteNowLoop) { - return; - } - - write0(channel); - } - - void writeFromTaskLoop(final NioDatagramChannel ch) { - if (!ch.writeSuspended) { - write0(ch); - } - } - - void writeFromSelectorLoop(final SelectionKey k) { - NioDatagramChannel ch = (NioDatagramChannel) k.attachment(); - ch.writeSuspended = false; - write0(ch); - } - - private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) { + @Override + protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { final Thread workerThread = thread; if (workerThread == null || Thread.currentThread() != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { @@ -475,155 +125,6 @@ class NioDatagramWorker implements Runnable { return false; } - private void write0(final NioDatagramChannel channel) { - - boolean addOpWrite = false; - boolean removeOpWrite = false; - - long writtenBytes = 0; - - final SocketSendBufferPool sendBufferPool = this.sendBufferPool; - final DatagramChannel ch = channel.getDatagramChannel(); - final Queue writeBuffer = channel.writeBufferQueue; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); - synchronized (channel.writeLock) { - // inform the channel that write is in-progress - channel.inWriteNowLoop = true; - - // loop forever... - for (;;) { - MessageEvent evt = channel.currentWriteEvent; - SendBuffer buf; - if (evt == null) { - if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { - removeOpWrite = true; - channel.writeSuspended = false; - break; - } - - channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); - } else { - buf = channel.currentWriteBuffer; - } - - try { - long localWrittenBytes = 0; - SocketAddress raddr = evt.getRemoteAddress(); - if (raddr == null) { - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - } else { - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch, raddr); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - } - - if (localWrittenBytes > 0 || buf.finished()) { - // Successful write - proceed to the next message. - buf.release(); - ChannelFuture future = evt.getFuture(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - evt = null; - buf = null; - future.setSuccess(); - } else { - // Not written at all - perhaps the kernel buffer is full. - addOpWrite = true; - channel.writeSuspended = true; - break; - } - } catch (final AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (final Throwable t) { - buf.release(); - ChannelFuture future = evt.getFuture(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - buf = null; - evt = null; - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - channel.inWriteNowLoop = false; - - // Initially, the following block was executed after releasing - // the writeLock, but there was a race condition, and it has to be - // executed before releasing the writeLock: - // - // https://issues.jboss.org/browse/NETTY-410 - // - if (addOpWrite) { - setOpWrite(channel); - } else if (removeOpWrite) { - clearOpWrite(channel); - } - } - - fireWriteComplete(channel, writtenBytes); - } - - private void setOpWrite(final NioDatagramChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.getDatagramChannel().keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - private void clearOpWrite(NioDatagramChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.getDatagramChannel().keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } static void disconnect(NioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); @@ -639,171 +140,12 @@ class NioDatagramWorker implements Runnable { } } - void close(final NioDatagramChannel channel, - final ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - try { - channel.getDatagramChannel().close(); - cancelledKeys ++; - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - fireChannelDisconnected(channel); - } - if (bound) { - fireChannelUnbound(channel); - } - - cleanUpWriteBuffer(channel); - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } + @Override + protected Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future) { + return new ChannelRegistionTask((NioDatagramChannel) channel, future); } - - private void cleanUpWriteBuffer(final NioDatagramChannel channel) { - Exception cause = null; - boolean fireExceptionCaught = false; - - // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { - MessageEvent evt = channel.currentWriteEvent; - if (evt != null) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (channel.isOpen()) { - cause = new NotYetBoundException(); - } else { - cause = new ClosedChannelException(); - } - - ChannelFuture future = evt.getFuture(); - channel.currentWriteBuffer.release(); - channel.currentWriteBuffer = null; - channel.currentWriteEvent = null; - evt = null; - future.setFailure(cause); - fireExceptionCaught = true; - } - - Queue writeBuffer = channel.writeBufferQueue; - if (!writeBuffer.isEmpty()) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (cause == null) { - if (channel.isOpen()) { - cause = new NotYetBoundException(); - } else { - cause = new ClosedChannelException(); - } - } - - for (;;) { - evt = writeBuffer.poll(); - if (evt == null) { - break; - } - evt.getFuture().setFailure(cause); - fireExceptionCaught = true; - } - } - } - - if (fireExceptionCaught) { - fireExceptionCaught(channel, cause); - } - } - - void setInterestOps(final NioDatagramChannel channel, - ChannelFuture future, int interestOps) { - - boolean changed = false; - try { - // interestOps can change at any time and by any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - final Selector selector = this.selector; - final SelectionKey key = channel.getDatagramChannel().keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return; - } - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - switch (NioProviderMetadata.CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - // Set the interesteOps on the SelectionKey - key.interestOps(interestOps); - // If the worker thread (the one that that might possibly be blocked - // in a select() call) is not the thread executing this method wakeup - // the select() operation. - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - // Going to set the interestOps from the same thread. - // Set the interesteOps on the SelectionKey - key.interestOps(interestOps); - changed = true; - } else { - // Going to set the interestOps from a different thread - // and some old provides will need synchronization. - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - future.setSuccess(); - if (changed) { - fireChannelInterestChanged(channel); - } - } catch (final CancelledKeyException e) { - // setInterestOps() was called on a closed channel. - ClosedChannelException cce = new ClosedChannelException(); - future.setFailure(cce); - fireExceptionCaught(channel, cce); - } catch (final Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - + /** * RegisterTask is a task responsible for registering a channel with a * selector. @@ -852,4 +194,5 @@ class NioDatagramWorker implements Runnable { } } } + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 7663886d52..9410102d2c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -122,7 +122,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); - boolean offered = channel.writeBuffer.offer(event); + boolean offered = channel.writeBufferQueue.offer(event); assert offered; channel.worker.writeFromUserCode(channel); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index e73d133d2b..37f4e669ff 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -15,27 +15,15 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SocketChannel; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; -import io.netty.channel.MessageEvent; -import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import io.netty.util.internal.ThreadLocalBoolean; -class NioSocketChannel extends AbstractChannel +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { private static final int ST_OPEN = 0; @@ -44,35 +32,14 @@ class NioSocketChannel extends AbstractChannel private static final int ST_CLOSED = -1; volatile int state = ST_OPEN; - final SocketChannel socket; - final NioWorker worker; private final NioSocketChannelConfig config; - private volatile InetSocketAddress localAddress; - private volatile InetSocketAddress remoteAddress; - - final Object interestOpsLock = new Object(); - final Object writeLock = new Object(); - - final Runnable writeTask = new WriteTask(); - final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); - - final Queue writeBuffer = new WriteRequestQueue(); - final AtomicInteger writeBufferSize = new AtomicInteger(); - final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - boolean inWriteNowLoop; - boolean writeSuspended; - - MessageEvent currentWriteEvent; - SendBuffer currentWriteBuffer; public NioSocketChannel( Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, SocketChannel socket, NioWorker worker) { - super(parent, factory, pipeline, sink); + super(parent, factory, pipeline, sink, worker, socket); - this.socket = socket; - this.worker = worker; config = new DefaultNioSocketChannelConfig(socket.socket()); } @@ -81,36 +48,6 @@ class NioSocketChannel extends AbstractChannel return config; } - @Override - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - this.localAddress = localAddress = - (InetSocketAddress) socket.socket().getLocalSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return localAddress; - } - - @Override - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - this.remoteAddress = remoteAddress = - (InetSocketAddress) socket.socket().getRemoteSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return remoteAddress; - } - @Override public boolean isOpen() { return state >= ST_OPEN; @@ -143,123 +80,14 @@ class NioSocketChannel extends AbstractChannel return super.setClosed(); } + @Override - public int getInterestOps() { - if (!isOpen()) { - return Channel.OP_WRITE; - } - - int interestOps = getRawInterestOps(); - int writeBufferSize = this.writeBufferSize.get(); - if (writeBufferSize != 0) { - if (highWaterMarkCounter.get() > 0) { - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - if (writeBufferSize >= lowWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } else { - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - if (writeBufferSize >= highWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } - } else { - interestOps &= ~Channel.OP_WRITE; - } - - return interestOps; - } - - int getRawInterestOps() { - return super.getInterestOps(); - } - - void setRawInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); + InetSocketAddress getLocalSocketAddress() throws Exception { + return (InetSocketAddress) channel.socket().getLocalSocketAddress(); } @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return getUnsupportedOperationFuture(); - } - } - - private final class WriteRequestQueue extends AbstractWriteRequestQueue { - - private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); - - WriteRequestQueue() { - } - - @Override - public boolean offer(MessageEvent e) { - boolean success = queue.offer(e); - assert success; - - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - - if (newWriteBufferSize >= highWaterMark) { - if (newWriteBufferSize - messageSize < highWaterMark) { - highWaterMarkCounter.incrementAndGet(); - if (!notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(NioSocketChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - return true; - } - - @Override - public MessageEvent poll() { - MessageEvent e = queue.poll(); - if (e != null) { - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - - if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { - if (newWriteBufferSize + messageSize >= lowWaterMark) { - highWaterMarkCounter.decrementAndGet(); - if (isConnected() && !notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(NioSocketChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - } - return e; - } - - private int getMessageSize(MessageEvent e) { - Object m = e.getMessage(); - if (m instanceof ChannelBuffer) { - return ((ChannelBuffer) m).readableBytes(); - } - return 0; - } - } - - private final class WriteTask implements Runnable { - - WriteTask() { - } - - @Override - public void run() { - writeTaskInTaskQueue.set(false); - worker.writeFromTaskLoop(NioSocketChannel.this); - } + InetSocketAddress getRemoteSocketAddress() throws Exception { + return (InetSocketAddress) channel.socket().getRemoteSocketAddress(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index 942d0d133d..e22e614da4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -15,279 +15,38 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireChannelConnected; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireMessageReceived; +import static io.netty.channel.Channels.succeededFuture; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferFactory; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ReceiveBufferSizePredictor; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.MessageEvent; -import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; - -class NioWorker implements Runnable { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioWorker.class); - - private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; - - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - private final Executor executor; - private boolean started; - private volatile Thread thread; - volatile Selector selector; - private final AtomicBoolean wakenUp = new AtomicBoolean(); - private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation +class NioWorker extends AbstractNioWorker { private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); - private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); NioWorker(Executor executor) { - this.executor = executor; + super(executor); } - void register(NioSocketChannel channel, ChannelFuture future) { - boolean server = !(channel instanceof NioClientSocketChannel); - Runnable registerTask = new RegisterTask(channel, future, server); - Selector selector; - - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException( - "Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(executor, this); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - - assert selector != null && selector.isOpen(); - - started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } @Override - public void run() { - thread = Thread.currentThread(); - - boolean shutdown = false; - Selector selector = this.selector; - for (;;) { - wakenUp.set(false); - - if (CONSTRAINT_LEVEL != 0) { - selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector - // from acquiring its lock. - selectorGuard.writeLock().unlock(); - } - - try { - SelectorUtil.select(selector); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - cancelledKeys = 0; - processRegisterTaskQueue(); - processWriteTaskQueue(); - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - // Give one more second. - shutdown = true; - } - } else { - shutdown = false; - } - } catch (Throwable t) { - logger.warn( - "Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - private void processRegisterTaskQueue() throws IOException { - for (;;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processWriteTaskQueue() throws IOException { - for (;;) { - final Runnable task = writeTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processSelectedKeys(Set selectedKeys) throws IOException { - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - try { - int readyOps = k.readyOps(); - if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - if (!read(k)) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - writeFromSelectorLoop(k); - } - } catch (CancelledKeyException e) { - close(k); - } - - if (cleanUpCancelledKeys()) { - break; // break the loop to avoid ConcurrentModificationException - } - } - } - - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - private boolean read(SelectionKey k) { + protected boolean read(SelectionKey k) { final SocketChannel ch = (SocketChannel) k.channel(); final NioSocketChannel channel = (NioSocketChannel) k.attachment(); @@ -343,47 +102,9 @@ class NioWorker implements Runnable { return true; } - private void close(SelectionKey k) { - NioSocketChannel ch = (NioSocketChannel) k.attachment(); - close(ch, succeededFuture(ch)); - } - void writeFromUserCode(final NioSocketChannel channel) { - if (!channel.isConnected()) { - cleanUpWriteBuffer(channel); - return; - } - - if (scheduleWriteIfNecessary(channel)) { - return; - } - - // From here, we are sure Thread.currentThread() == workerThread. - - if (channel.writeSuspended) { - return; - } - - if (channel.inWriteNowLoop) { - return; - } - - write0(channel); - } - - void writeFromTaskLoop(final NioSocketChannel ch) { - if (!ch.writeSuspended) { - write0(ch); - } - } - - void writeFromSelectorLoop(final SelectionKey k) { - NioSocketChannel ch = (NioSocketChannel) k.attachment(); - ch.writeSuspended = false; - write0(ch); - } - - private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { + @Override + protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { final Thread currentThread = Thread.currentThread(); final Thread workerThread = thread; if (currentThread != workerThread) { @@ -417,310 +138,13 @@ class NioWorker implements Runnable { return false; } - - private void write0(NioSocketChannel channel) { - boolean open = true; - boolean addOpWrite = false; - boolean removeOpWrite = false; - - long writtenBytes = 0; - - final SocketSendBufferPool sendBufferPool = this.sendBufferPool; - final SocketChannel ch = channel.socket; - final Queue writeBuffer = channel.writeBuffer; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); - synchronized (channel.writeLock) { - channel.inWriteNowLoop = true; - for (;;) { - MessageEvent evt = channel.currentWriteEvent; - SendBuffer buf; - if (evt == null) { - if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { - removeOpWrite = true; - channel.writeSuspended = false; - break; - } - - channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); - } else { - buf = channel.currentWriteBuffer; - } - - ChannelFuture future = evt.getFuture(); - try { - long localWrittenBytes = 0; - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - - if (buf.finished()) { - // Successful write - proceed to the next message. - buf.release(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - evt = null; - buf = null; - future.setSuccess(); - } else { - // Not written fully - perhaps the kernel buffer is full. - addOpWrite = true; - channel.writeSuspended = true; - - if (localWrittenBytes > 0) { - // Notify progress listeners if necessary. - future.setProgress( - localWrittenBytes, - buf.writtenBytes(), buf.totalBytes()); - } - break; - } - } catch (AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (Throwable t) { - if (buf != null) { - buf.release(); - } - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - buf = null; - evt = null; - future.setFailure(t); - fireExceptionCaught(channel, t); - if (t instanceof IOException) { - open = false; - close(channel, succeededFuture(channel)); - } - } - } - channel.inWriteNowLoop = false; - - // Initially, the following block was executed after releasing - // the writeLock, but there was a race condition, and it has to be - // executed before releasing the writeLock: - // - // https://issues.jboss.org/browse/NETTY-410 - // - if (open) { - if (addOpWrite) { - setOpWrite(channel); - } else if (removeOpWrite) { - clearOpWrite(channel); - } - } - } - - fireWriteComplete(channel, writtenBytes); + + @Override + protected Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future) { + boolean server = !(channel instanceof NioClientSocketChannel); + return new RegisterTask((NioSocketChannel) channel, future, server); } - - private void setOpWrite(NioSocketChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - private void clearOpWrite(NioSocketChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - void close(NioSocketChannel channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - try { - channel.socket.close(); - cancelledKeys ++; - - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - fireChannelDisconnected(channel); - } - if (bound) { - fireChannelUnbound(channel); - } - - cleanUpWriteBuffer(channel); - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private void cleanUpWriteBuffer(NioSocketChannel channel) { - Exception cause = null; - boolean fireExceptionCaught = false; - - // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { - MessageEvent evt = channel.currentWriteEvent; - if (evt != null) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - - ChannelFuture future = evt.getFuture(); - channel.currentWriteBuffer.release(); - channel.currentWriteBuffer = null; - channel.currentWriteEvent = null; - evt = null; - future.setFailure(cause); - fireExceptionCaught = true; - } - - Queue writeBuffer = channel.writeBuffer; - if (!writeBuffer.isEmpty()) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (cause == null) { - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - } - - for (;;) { - evt = writeBuffer.poll(); - if (evt == null) { - break; - } - evt.getFuture().setFailure(cause); - fireExceptionCaught = true; - } - } - } - - if (fireExceptionCaught) { - fireExceptionCaught(channel, cause); - } - } - - void setInterestOps( - NioSocketChannel channel, ChannelFuture future, int interestOps) { - boolean changed = false; - try { - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return; - } - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - key.interestOps(interestOps); - changed = true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - future.setSuccess(); - if (changed) { - fireChannelInterestChanged(channel); - } - } catch (CancelledKeyException e) { - // setInterestOps() was called on a closed channel. - ClosedChannelException cce = new ClosedChannelException(); - future.setFailure(cce); - fireExceptionCaught(channel, cce); - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - + private final class RegisterTask implements Runnable { private final NioSocketChannel channel; private final ChannelFuture future; @@ -738,6 +162,7 @@ class NioWorker implements Runnable { public void run() { SocketAddress localAddress = channel.getLocalAddress(); SocketAddress remoteAddress = channel.getRemoteAddress(); + if (localAddress == null || remoteAddress == null) { if (future != null) { future.setFailure(new ClosedChannelException()); @@ -748,11 +173,11 @@ class NioWorker implements Runnable { try { if (server) { - channel.socket.configureBlocking(false); + channel.channel.configureBlocking(false); } synchronized (channel.interestOpsLock) { - channel.socket.register( + channel.channel.register( selector, channel.getRawInterestOps(), channel); } if (future != null) { @@ -776,4 +201,5 @@ class NioWorker implements Runnable { fireChannelConnected(channel, remoteAddress); } } + } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index 1d7106b4c7..7447254cde 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; -abstract class AbstractOioChannel extends AbstractChannel{ +abstract class AbstractOioChannel extends AbstractChannel { private volatile InetSocketAddress localAddress; volatile InetSocketAddress remoteAddress; volatile Thread workerThread; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 9f55095fa0..a73a24c503 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -134,12 +134,12 @@ final class OioDatagramChannel extends AbstractOioChannel } @Override - InetSocketAddress getLocalSocketAddress() throws Exception{ + InetSocketAddress getLocalSocketAddress() throws Exception { return (InetSocketAddress) socket.getLocalSocketAddress(); } @Override - InetSocketAddress getRemoteSocketAddress() throws Exception{ + InetSocketAddress getRemoteSocketAddress() throws Exception { return (InetSocketAddress) socket.getRemoteSocketAddress(); }