From 9e6f8b46df8cd067306e2f563e2d35e3180f8821 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 2 May 2012 15:01:58 +0900 Subject: [PATCH] Retrofit the NIO transport with the new API / improve the new API - Remove the classes and properties that are not necessary anymore - Remove SingleThreadEventLoop.newRegistrationTask() and let Channel.Unsafe handle registration by itself - Channel.Unsafe.localAddress() and remoteAddress() - JdkChannel is replaced by Channel.Unsafe. --- .../io/netty/channel/AbstractChannel.java | 14 + .../main/java/io/netty/channel/Channel.java | 3 + .../netty/channel/SingleThreadEventLoop.java | 15 +- .../socket/nio/AbstractJdkChannel.java | 73 ---- .../socket/nio/AbstractNioChannel.java | 341 ++--------------- .../socket/nio/AbstractNioChannelSink.java | 52 --- .../socket/nio/AbstractNioWorkerPool.java | 83 ----- .../nio/DefaultNioDatagramChannelConfig.java | 134 +++---- .../nio/DefaultNioSocketChannelConfig.java | 134 +------ .../netty/channel/socket/nio/JdkChannel.java | 53 --- .../channel/socket/nio/NioChannelConfig.java | 38 +- .../socket/nio/NioSocketChannelConfig.java | 50 +-- .../channel/socket/nio/SelectorEventLoop.java | 78 ++-- .../channel/socket/nio/SendBufferPool.java | 346 ------------------ .../socket/nio/ShareableWorkerPool.java | 48 --- .../netty/channel/socket/nio/WorkerPool.java | 35 -- 16 files changed, 142 insertions(+), 1355 deletions(-) delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index f3dd8ff045..88c8d1ae28 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -359,6 +359,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return firstOut(); } + @Override + public SocketAddress localAddress() { + return localAddress0(); + } + + @Override + public SocketAddress remoteAddress() { + // TODO Auto-generated method stub + return remoteAddress0(); + } + @Override public void register(EventLoop eventLoop, ChannelFuture future) { if (eventLoop == null) { @@ -476,6 +487,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract java.nio.channels.Channel javaChannel(); protected abstract ChannelBufferHolder firstOut(); + protected abstract SocketAddress localAddress0(); + protected abstract SocketAddress remoteAddress0(); + protected abstract void doRegister(ChannelFuture future); protected abstract void doBind(SocketAddress localAddress, ChannelFuture future); protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 8935014d39..bab0aaf291 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -184,6 +184,9 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable< java.nio.channels.Channel ch(); ChannelBufferHolder out(); + SocketAddress localAddress(); + SocketAddress remoteAddress(); + void register(EventLoop eventLoop, ChannelFuture future); void bind(SocketAddress localAddress, ChannelFuture future); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 47e765bf92..2de1538ed1 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -57,8 +57,17 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl } @Override - public EventLoop register(Channel channel, ChannelFuture future) { - execute(newRegistrationTask(channel, future)); + public EventLoop register(final Channel channel, final ChannelFuture future) { + if (inEventLoop()) { + channel.unsafe().register(this, future); + } else { + execute(new Runnable() { + @Override + public void run() { + channel.unsafe().register(SingleThreadEventLoop.this, future); + } + }); + } return this; } @@ -111,8 +120,6 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl protected abstract void wakeup(boolean inEventLoop); - protected abstract Runnable newRegistrationTask(Channel channel, ChannelFuture future); - @Override public boolean inEventLoop() { return Thread.currentThread() == thread; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java deleted file mode 100644 index ed639a8add..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.AbstractSelectableChannel; - -public abstract class AbstractJdkChannel implements JdkChannel { - - final AbstractSelectableChannel channel; - - protected AbstractJdkChannel(AbstractSelectableChannel channel) { - this.channel = channel; - } - - protected AbstractSelectableChannel getChannel() { - return channel; - } - - @Override - public boolean isOpen() { - return channel.isOpen(); - } - - @Override - public void close() throws IOException { - channel.close(); - } - - @Override - public SelectionKey keyFor(Selector selector) { - return channel.keyFor(selector); - } - - @Override - public SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException { - return channel.register(selector, interestedOps, attachment); - } - - @Override - public boolean isRegistered() { - return channel.isRegistered(); - } - - - @Override - public void configureBlocking(boolean block) throws IOException { - channel.configureBlocking(block); - } - - - @Override - public boolean finishConnect() throws IOException { - return true; - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 71259fdc72..d34bcee481 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -15,59 +15,21 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.fireChannelInterestChanged; -import io.netty.buffer.ChannelBuffer; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.MessageEvent; -import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; -import io.netty.util.internal.QueueFactory; -import io.netty.util.internal.ThreadLocalBoolean; import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; +import java.nio.channels.SelectableChannel; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -public abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { - - /** - * The {@link SelectorEventLoop}. - */ - private final SelectorEventLoop worker; - - /** - * Monitor object to synchronize access to InterestedOps. - */ - protected final Object interestOpsLock = new Object(); - - /** - * Monitor object for synchronizing access to the {@link WriteRequestQueue}. - */ - protected final Object writeLock = new Object(); - - /** - * WriteTask that performs write operations. - */ - final Runnable writeTask = new WriteTask(); +public abstract class AbstractNioChannel extends AbstractChannel { /** * Indicates if there is a {@link WriteTask} in the task queue. */ final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); - /** - * Queue of write {@link MessageEvent}s. - */ - protected final Queue writeBufferQueue = createRequestQueue(); - /** * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently * contains. @@ -79,59 +41,40 @@ public abstract class AbstractNioChannel extends AbstractChannel implements NioC */ final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - /** - * The current write {@link MessageEvent} - */ - protected MessageEvent currentWriteEvent; - protected SendBuffer currentWriteBuffer; - /** * Boolean that indicates that write operation is in progress. */ protected boolean inWriteNowLoop; protected boolean writeSuspended; - + private volatile InetSocketAddress localAddress; volatile InetSocketAddress remoteAddress; - - private final JdkChannel channel; - - protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, SelectorEventLoop worker, JdkChannel ch) { - super(id, parent, factory, pipeline, sink); - this.worker = worker; - this.channel = ch; - } - - protected AbstractNioChannel( - Channel parent, ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, SelectorEventLoop worker, JdkChannel ch) { - super(parent, factory, pipeline, sink); - this.worker = worker; - this.channel = ch; + + private final SelectableChannel ch; + + protected AbstractNioChannel(Integer id, Channel parent, SelectableChannel ch) { + super(id, parent); + this.ch = ch; } - protected JdkChannel getJdkChannel() { - return channel; + protected AbstractNioChannel(Channel parent, SelectableChannel ch) { + super(parent); + this.ch = ch; } - - /** - * Return the {@link SelectorEventLoop} that handle the IO of the {@link AbstractNioChannel} - * - * @return worker - */ - public SelectorEventLoop getWorker() { - return worker; - } - - + @Override - public InetSocketAddress getLocalAddress() { + protected SelectableChannel javaChannel() { + return ch; + } + + @Override + public InetSocketAddress localAddress() { InetSocketAddress localAddress = this.localAddress; if (localAddress == null) { try { this.localAddress = localAddress = - (InetSocketAddress) channel.getLocalSocketAddress(); + (InetSocketAddress) unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; @@ -141,12 +84,12 @@ public abstract class AbstractNioChannel extends AbstractChannel implements NioC } @Override - public InetSocketAddress getRemoteAddress() { + public InetSocketAddress remoteAddress() { InetSocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { try { this.remoteAddress = remoteAddress = - (InetSocketAddress) channel.getRemoteSocketAddress(); + (InetSocketAddress) unsafe().remoteAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; @@ -154,245 +97,7 @@ public abstract class AbstractNioChannel extends AbstractChannel implements NioC } return remoteAddress; } - - @Override - public abstract NioChannelConfig getConfig(); - - int getRawInterestOps() { - return super.getInterestOps(); - } - - void setRawInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } @Override - public int getInterestOps() { - if (!isOpen()) { - return Channel.OP_WRITE; - } - - int interestOps = getRawInterestOps(); - int writeBufferSize = this.writeBufferSize.get(); - if (writeBufferSize != 0) { - if (highWaterMarkCounter.get() > 0) { - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - if (writeBufferSize >= lowWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } else { - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - if (writeBufferSize >= highWaterMark) { - interestOps |= Channel.OP_WRITE; - } else { - interestOps &= ~Channel.OP_WRITE; - } - } - } else { - interestOps &= ~Channel.OP_WRITE; - } - - return interestOps; - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - protected WriteRequestQueue createRequestQueue() { - return new WriteRequestQueue(); - } - - public class WriteRequestQueue implements BlockingQueue { - private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); - - private final BlockingQueue queue; - - public WriteRequestQueue() { - this.queue = QueueFactory.createQueue(MessageEvent.class); - } - - @Override - public MessageEvent remove() { - return queue.remove(); - } - - @Override - public MessageEvent element() { - return queue.element(); - } - - @Override - public MessageEvent peek() { - return queue.peek(); - } - - @Override - public int size() { - return queue.size(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - @Override - public Iterator iterator() { - return queue.iterator(); - } - - @Override - public Object[] toArray() { - return queue.toArray(); - } - - @Override - public T[] toArray(T[] a) { - return queue.toArray(a); - } - - @Override - public boolean containsAll(Collection c) { - return queue.containsAll(c); - } - - @Override - public boolean addAll(Collection c) { - return queue.addAll(c); - } - - @Override - public boolean removeAll(Collection c) { - return queue.removeAll(c); - } - - @Override - public boolean retainAll(Collection c) { - return queue.retainAll(c); - } - - @Override - public void clear() { - queue.clear(); - } - - @Override - public boolean add(MessageEvent e) { - return queue.add(e); - } - - @Override - public void put(MessageEvent e) throws InterruptedException { - queue.put(e); - } - - @Override - public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException { - return queue.offer(e, timeout, unit); - } - - @Override - public MessageEvent take() throws InterruptedException { - return queue.take(); - } - - @Override - public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - - @Override - public int remainingCapacity() { - return queue.remainingCapacity(); - } - - @Override - public boolean remove(Object o) { - return queue.remove(o); - } - - @Override - public boolean contains(Object o) { - return queue.contains(o); - } - - @Override - public int drainTo(Collection c) { - return queue.drainTo(c); - } - - @Override - public int drainTo(Collection c, int maxElements) { - return queue.drainTo(c, maxElements); - } - - @Override - public boolean offer(MessageEvent e) { - boolean success = queue.offer(e); - assert success; - - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(messageSize); - int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - - if (newWriteBufferSize >= highWaterMark) { - if (newWriteBufferSize - messageSize < highWaterMark) { - highWaterMarkCounter.incrementAndGet(); - if (!notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(AbstractNioChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - return true; - } - - @Override - public MessageEvent poll() { - MessageEvent e = queue.poll(); - if (e != null) { - int messageSize = getMessageSize(e); - int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); - int lowWaterMark = getConfig().getWriteBufferLowWaterMark(); - - if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { - if (newWriteBufferSize + messageSize >= lowWaterMark) { - highWaterMarkCounter.decrementAndGet(); - if (isConnected() && !notifying.get()) { - notifying.set(Boolean.TRUE); - fireChannelInterestChanged(AbstractNioChannel.this); - notifying.set(Boolean.FALSE); - } - } - } - } - return e; - } - - protected int getMessageSize(MessageEvent e) { - Object m = e.getMessage(); - if (m instanceof ChannelBuffer) { - return ((ChannelBuffer) m).readableBytes(); - } - return 0; - } - } - - private final class WriteTask implements Runnable { - - WriteTask() { - } - - @Override - public void run() { - writeTaskInTaskQueue.set(false); - worker.writeFromTaskLoop(AbstractNioChannel.this); - } - } - + public abstract NioChannelConfig config(); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java deleted file mode 100644 index 5a7eff8a9c..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket.nio; - -import io.netty.channel.AbstractChannelSink; -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.ChannelRunnableWrapper; - -public abstract class AbstractNioChannelSink extends AbstractChannelSink { - - @Override - public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { - Channel ch = pipeline.channel(); - if (ch instanceof AbstractNioChannel) { - AbstractNioChannel channel = (AbstractNioChannel) ch; - ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.channel(), task); - channel.getWorker().executeInIoThread(wrapper); - return wrapper; - } - return super.execute(pipeline, task); - - - } - - @Override - protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { - Channel channel = event.getChannel(); - boolean fireLater = false; - if (channel instanceof AbstractNioChannel) { - fireLater = !((AbstractNioChannel) channel).getWorker().isIoThread(); - } - return fireLater; - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java deleted file mode 100644 index fb64811e52..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket.nio; - -import io.netty.channel.Channel; -import io.netty.channel.socket.Worker; -import io.netty.util.ExternalResourceReleasable; -import io.netty.util.internal.ExecutorUtil; - -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling - * {@link #nextWorker()} - * - */ -public abstract class AbstractNioWorkerPool implements WorkerPool , ExternalResourceReleasable { - - private final SelectorEventLoop[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); - private final Executor workerExecutor; - - /** - * Create a new instance - * - * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s - * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it - * @param workerCount the count of {@link Worker}'s to create - */ - protected AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } - workers = new SelectorEventLoop[workerCount]; - - for (int i = 0; i < workers.length; i++) { - workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); - } - this.workerExecutor = workerExecutor; - - } - - /** - * Create a new {@link Worker} which uses the given {@link Executor} to service IO - * - * - * @param executor the {@link Executor} to use - * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it - * @return worker the new {@link Worker} - */ - protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); - - @SuppressWarnings("unchecked") - public E nextWorker() { - return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; - } - - @Override - public void releaseExternalResources() { - ExecutorUtil.terminate(workerExecutor); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java index 8a329b546f..ba06b45222 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java @@ -17,16 +17,12 @@ package io.netty.channel.socket.nio; import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultDatagramChannelConfig; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.ConversionUtil; import io.netty.util.internal.DetectionUtil; -import java.io.IOException; +import java.lang.reflect.Method; import java.net.NetworkInterface; -import java.net.StandardSocketOptions; import java.nio.channels.DatagramChannel; -import java.util.Map; /** * The default {@link NioSocketChannelConfig} implementation. @@ -34,48 +30,61 @@ import java.util.Map; class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig implements NioDatagramChannelConfig { - private static final InternalLogger logger = - InternalLoggerFactory - .getInstance(DefaultNioDatagramChannelConfig.class); + private static final Object IP_MULTICAST_IF; + private static final Method GET_OPTION; + private static final Method SET_OPTION; - private volatile int writeBufferHighWaterMark = 64 * 1024; - private volatile int writeBufferLowWaterMark = 32 * 1024; - private volatile int writeSpinCount = 16; + static { + ClassLoader classLoader = DatagramChannel.class.getClassLoader(); + Class socketOptionType = null; + try { + socketOptionType = Class.forName("java.net.SocketOption", true, classLoader); + } catch (Exception e) { + // Not Java 7+ + } + + Object ipMulticastIf = null; + if (socketOptionType != null) { + try { + ipMulticastIf = Class.forName("java.net.StandardSocketOptions", true, classLoader).getDeclaredField("IP_MULTICAST_IF").get(null); + } catch (Exception e) { + throw new Error("cannot locate the IP_MULTICAST_IF field", e); + } + } + IP_MULTICAST_IF = ipMulticastIf; + + Method getOption; + try { + getOption = DatagramChannel.class.getDeclaredMethod("getOption", socketOptionType); + } catch (Exception e) { + throw new Error("cannot locate the getOption() method", e); + } + GET_OPTION = getOption; + + Method setOption; + try { + setOption = DatagramChannel.class.getDeclaredMethod("setOption", socketOptionType, Object.class); + } catch (Exception e) { + throw new Error("cannot locate the setOption() method", e); + } + SET_OPTION = setOption; + } private final DatagramChannel channel; + private volatile int writeSpinCount = 16; DefaultNioDatagramChannelConfig(DatagramChannel channel) { super(channel.socket()); this.channel = channel; } - @Override - public void setOptions(Map options) { - super.setOptions(options); - if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) { - // Recover the integrity of the configuration with a sensible value. - setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1); - if (logger.isWarnEnabled()) { - // Notify the user about misconfiguration. - logger.warn("writeBufferLowWaterMark cannot be greater than " - + "writeBufferHighWaterMark; setting to the half of the " - + "writeBufferHighWaterMark."); - } - - } - } - @Override public boolean setOption(String key, Object value) { if (super.setOption(key, value)) { return true; } - if (key.equals("writeBufferHighWaterMark")) { - setWriteBufferHighWaterMark0(ConversionUtil.toInt(value)); - } else if (key.equals("writeBufferLowWaterMark")) { - setWriteBufferLowWaterMark0(ConversionUtil.toInt(value)); - } else if (key.equals("writeSpinCount")) { + if (key.equals("writeSpinCount")) { setWriteSpinCount(ConversionUtil.toInt(value)); } else { return false; @@ -83,56 +92,6 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig return true; } - @Override - public int getWriteBufferHighWaterMark() { - return writeBufferHighWaterMark; - } - - @Override - public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { - if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) { - throw new IllegalArgumentException( - "writeBufferHighWaterMark cannot be less than " + - "writeBufferLowWaterMark (" + - getWriteBufferLowWaterMark() + "): " + - writeBufferHighWaterMark); - } - setWriteBufferHighWaterMark0(writeBufferHighWaterMark); - } - - private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) { - if (writeBufferHighWaterMark < 0) { - throw new IllegalArgumentException("writeBufferHighWaterMark: " + - writeBufferHighWaterMark); - } - this.writeBufferHighWaterMark = writeBufferHighWaterMark; - } - - @Override - public int getWriteBufferLowWaterMark() { - return writeBufferLowWaterMark; - } - - @Override - public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { - if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) { - throw new IllegalArgumentException( - "writeBufferLowWaterMark cannot be greater than " + - "writeBufferHighWaterMark (" + - getWriteBufferHighWaterMark() + "): " + - writeBufferLowWaterMark); - } - setWriteBufferLowWaterMark0(writeBufferLowWaterMark); - } - - private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) { - if (writeBufferLowWaterMark < 0) { - throw new IllegalArgumentException("writeBufferLowWaterMark: " + - writeBufferLowWaterMark); - } - this.writeBufferLowWaterMark = writeBufferLowWaterMark; - } - @Override public int getWriteSpinCount() { return writeSpinCount; @@ -146,15 +105,15 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig } this.writeSpinCount = writeSpinCount; } - + @Override public void setNetworkInterface(NetworkInterface networkInterface) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); } else { try { - channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface); - } catch (IOException e) { + SET_OPTION.invoke(channel, IP_MULTICAST_IF, networkInterface); + } catch (Exception e) { throw new ChannelException(e); } } @@ -166,11 +125,10 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig throw new UnsupportedOperationException(); } else { try { - return (NetworkInterface) channel.getOption(StandardSocketOptions.IP_MULTICAST_IF); - } catch (IOException e) { + return (NetworkInterface) GET_OPTION.invoke(channel, IP_MULTICAST_IF); + } catch (Exception e) { throw new ChannelException(e); } } } - } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java index b637f012ae..e218b46242 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java @@ -15,127 +15,37 @@ */ package io.netty.channel.socket.nio; -import java.net.Socket; -import java.util.Map; - -import io.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; -import io.netty.channel.ChannelException; -import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.ReceiveBufferSizePredictorFactory; import io.netty.channel.socket.DefaultSocketChannelConfig; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.ConversionUtil; +import java.net.Socket; + /** * The default {@link NioSocketChannelConfig} implementation. */ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig implements NioSocketChannelConfig { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class); - - private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY = - new AdaptiveReceiveBufferSizePredictorFactory(); - - private volatile int writeBufferHighWaterMark = 64 * 1024; - private volatile int writeBufferLowWaterMark = 32 * 1024; - private volatile ReceiveBufferSizePredictor predictor; - private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY; private volatile int writeSpinCount = 16; DefaultNioSocketChannelConfig(Socket socket) { super(socket); } - @Override - public void setOptions(Map options) { - super.setOptions(options); - if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) { - // Recover the integrity of the configuration with a sensible value. - setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1); - if (logger.isWarnEnabled()) { - // Notify the user about misconfiguration. - logger.warn( - "writeBufferLowWaterMark cannot be greater than " + - "writeBufferHighWaterMark; setting to the half of the " + - "writeBufferHighWaterMark."); - } - - } - } - @Override public boolean setOption(String key, Object value) { if (super.setOption(key, value)) { return true; } - if (key.equals("writeBufferHighWaterMark")) { - setWriteBufferHighWaterMark0(ConversionUtil.toInt(value)); - } else if (key.equals("writeBufferLowWaterMark")) { - setWriteBufferLowWaterMark0(ConversionUtil.toInt(value)); - } else if (key.equals("writeSpinCount")) { + if (key.equals("writeSpinCount")) { setWriteSpinCount(ConversionUtil.toInt(value)); - } else if (key.equals("receiveBufferSizePredictorFactory")) { - setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value); - } else if (key.equals("receiveBufferSizePredictor")) { - setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value); } else { return false; } return true; } - @Override - public int getWriteBufferHighWaterMark() { - return writeBufferHighWaterMark; - } - - @Override - public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { - if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) { - throw new IllegalArgumentException( - "writeBufferHighWaterMark cannot be less than " + - "writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " + - writeBufferHighWaterMark); - } - setWriteBufferHighWaterMark0(writeBufferHighWaterMark); - } - - private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) { - if (writeBufferHighWaterMark < 0) { - throw new IllegalArgumentException( - "writeBufferHighWaterMark: " + writeBufferHighWaterMark); - } - this.writeBufferHighWaterMark = writeBufferHighWaterMark; - } - - @Override - public int getWriteBufferLowWaterMark() { - return writeBufferLowWaterMark; - } - - @Override - public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { - if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) { - throw new IllegalArgumentException( - "writeBufferLowWaterMark cannot be greater than " + - "writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " + - writeBufferLowWaterMark); - } - setWriteBufferLowWaterMark0(writeBufferLowWaterMark); - } - - private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) { - if (writeBufferLowWaterMark < 0) { - throw new IllegalArgumentException( - "writeBufferLowWaterMark: " + writeBufferLowWaterMark); - } - this.writeBufferLowWaterMark = writeBufferLowWaterMark; - } - @Override public int getWriteSpinCount() { return writeSpinCount; @@ -149,42 +59,4 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig } this.writeSpinCount = writeSpinCount; } - - @Override - public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() { - ReceiveBufferSizePredictor predictor = this.predictor; - if (predictor == null) { - try { - this.predictor = predictor = getReceiveBufferSizePredictorFactory().getPredictor(); - } catch (Exception e) { - throw new ChannelException( - "Failed to create a new " + - ReceiveBufferSizePredictor.class.getSimpleName() + '.', - e); - } - } - return predictor; - } - - @Override - public void setReceiveBufferSizePredictor( - ReceiveBufferSizePredictor predictor) { - if (predictor == null) { - throw new NullPointerException("predictor"); - } - this.predictor = predictor; - } - - @Override - public ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory() { - return predictorFactory; - } - - @Override - public void setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory predictorFactory) { - if (predictorFactory == null) { - throw new NullPointerException("predictorFactory"); - } - this.predictorFactory = predictorFactory; - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java deleted file mode 100644 index 3d3bb41f2b..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.channels.Channel; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.WritableByteChannel; - -public interface JdkChannel extends Channel, WritableByteChannel { - - SelectionKey keyFor(Selector selector); - - SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException; - - boolean isRegistered(); - - SocketAddress getRemoteSocketAddress(); - - SocketAddress getLocalSocketAddress(); - - boolean isConnected(); - - boolean isSocketBound(); - - boolean finishConnect() throws IOException; - - void disconnectSocket() throws IOException; - - void closeSocket() throws IOException; - - void bind(SocketAddress local) throws IOException; - - void connect(SocketAddress remote) throws IOException; - - void configureBlocking(boolean block) throws IOException; -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java index 251d38ddab..bdc54ffa39 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java @@ -15,50 +15,16 @@ */ package io.netty.channel.socket.nio; +import io.netty.channel.ChannelConfig; + import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; - /** * Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO. * */ public interface NioChannelConfig extends ChannelConfig { - - /** - * Returns the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link Channel#isWritable()} - * will start to return {@code true}. - */ - int getWriteBufferHighWaterMark(); - - /** - * Sets the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link Channel#isWritable()} - * will start to return {@code true}. - */ - void setWriteBufferHighWaterMark(int writeBufferHighWaterMark); - - /** - * Returns the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link Channel#isWritable()} will return - * {@code false} again. - */ - int getWriteBufferLowWaterMark(); - - /** - * Sets the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link Channel#isWritable()} will return - * {@code false} again. - */ - void setWriteBufferLowWaterMark(int writeBufferLowWaterMark); - /** * Returns the maximum loop count for a write operation until * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java index bb0fd64e62..2f8da9a026 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java @@ -15,11 +15,7 @@ */ package io.netty.channel.socket.nio; -import io.netty.channel.AdaptiveReceiveBufferSizePredictor; -import io.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; import io.netty.channel.ChannelConfig; -import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.ReceiveBufferSizePredictorFactory; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannelConfig; @@ -36,53 +32,11 @@ import io.netty.channel.socket.SocketChannelConfig; * * NameAssociated setter method * - * {@code "writeBufferHighWaterMark"}{@link #setWriteBufferHighWaterMark(int)} - * - * {@code "writeBufferLowWaterMark"}{@link #setWriteBufferLowWaterMark(int)} - * * {@code "writeSpinCount"}{@link #setWriteSpinCount(int)} - * - * {@code "receiveBufferSizePredictor"}{@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)} - * - * {@code "receiveBufferSizePredictorFactory"}{@link #setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory)} * * */ public interface NioSocketChannelConfig extends SocketChannelConfig, NioChannelConfig { - - - /** - * Returns the {@link ReceiveBufferSizePredictor} which predicts the - * number of readable bytes in the socket receive buffer. The default - * predictor is {@link AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536). - */ - ReceiveBufferSizePredictor getReceiveBufferSizePredictor(); - - /** - * Sets the {@link ReceiveBufferSizePredictor} which predicts the - * number of readable bytes in the socket receive buffer. The default - * predictor is {@link AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536). - */ - void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor); - - /** - * Returns the {@link ReceiveBufferSizePredictorFactory} which creates a new - * {@link ReceiveBufferSizePredictor} when a new channel is created and - * no {@link ReceiveBufferSizePredictor} was set. If no predictor was set - * for the channel, {@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)} - * will be called with the new predictor. The default factory is - * {@link AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536). - */ - ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory(); - - /** - * Sets the {@link ReceiveBufferSizePredictor} which creates a new - * {@link ReceiveBufferSizePredictor} when a new channel is created and - * no {@link ReceiveBufferSizePredictor} was set. If no predictor was set - * for the channel, {@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)} - * will be called with the new predictor. The default factory is - * {@link AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536). - */ - void setReceiveBufferSizePredictorFactory( - ReceiveBufferSizePredictorFactory predictorFactory); + // This method does not provide a configuration property by itself. + // It just combined SocketChannelConfig and NioChannelConfig for user's sake. } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index 575e515d1d..13e72d727f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -19,7 +19,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; -import io.netty.channel.MessageEvent; +import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; @@ -47,8 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static io.netty.channel.Channels.*; - abstract class SelectorEventLoop extends SingleThreadEventLoop { /** * Internal Netty logger. @@ -60,7 +58,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - + /** * The NIO {@link Selector}. */ @@ -117,7 +115,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } @Override - public void register(final Channel channel, final ChannelFuture future) { + public EventLoop register(final Channel channel, final ChannelFuture future) { try { if (channel instanceof NioServerSocketChannel) { final NioServerSocketChannel ch = (NioServerSocketChannel) channel; @@ -134,7 +132,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { }); } else if (channel instanceof NioClientSocketChannel) { final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel; - + execute(new Runnable() { @Override public void run() { @@ -164,7 +162,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { future.setFailure(t); fireExceptionCaught(channel, t); } - + } }); } else { @@ -229,7 +227,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { if (wakenUp.get()) { selector.wakeup(); } - + cancelledKeys = 0; processTaskQueue(); processSelectedKeys(selector.selectedKeys()); @@ -279,7 +277,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { "Failed to close a selector.", e); } } - + private void processTaskQueue() throws IOException { for (;;) { final Runnable task = pollTask(); @@ -308,7 +306,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } - + if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { removeKey = accept(k); } @@ -323,7 +321,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { i.remove(); } } - + if (cleanUpCancelledKeys()) { @@ -336,7 +334,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment(); try { boolean handled = false; - + // accept all sockets that are waiting atm for (;;) { SocketChannel acceptedSocket = channel.socket.accept(); @@ -347,7 +345,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); NioWorker worker = channel.workers.nextWorker(); - + worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, channel.getPipeline().getSink(), acceptedSocket, worker), null); handled = true; @@ -370,8 +368,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } return true; } - - + + protected void processConnectTimeout(Set keys, long currentTimeNanos) { ConnectException cause = null; for (SelectionKey k: keys) { @@ -385,7 +383,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { //close(k); continue; } - + // Something is ready so skip it if (k.readyOps() != 0) { continue; @@ -405,9 +403,9 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { ch.getWorker().close(ch, succeededFuture(ch)); } } - - - + + + } } @@ -425,7 +423,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { ch.getWorker().close(ch, succeededFuture(ch)); } } - + private boolean cleanUpCancelledKeys() throws IOException { if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; @@ -434,9 +432,9 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } return false; } - - + + protected void close(SelectionKey k) { Object attachment = k.attachment(); if (attachment instanceof AbstractNioChannel) { @@ -458,7 +456,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } if (scheduleWriteIfNecessary(channel)) { return; - } + } // From here, we are sure Thread.currentThread() == workerThread. @@ -478,14 +476,14 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { write0(ch); } } - + void writeFromSelectorLoop(final SelectionKey k) { AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); ch.writeSuspended = false; write0(ch); } - + protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { if (!inEventLoop()) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { @@ -498,12 +496,12 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { workerSelector.wakeup(); } } - + return true; } return false; - } + } protected void write0(AbstractNioChannel channel) { boolean open = true; @@ -514,7 +512,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { long writtenBytes = 0; final SendBufferPool sendBufferPool = this.sendBufferPool; - + final WritableByteChannel ch = channel.getJdkChannel(); final Queue writeBuffer = channel.writeBufferQueue; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); @@ -660,11 +658,11 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } } } - + public void close(NioServerSocketChannel channel, ChannelFuture future) { boolean inEventLoop = inEventLoop(); - + boolean bound = channel.isBound(); try { if (channel.socket.isOpen()) { @@ -705,16 +703,16 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { fireExceptionCaught(channel, t); } else { fireExceptionCaughtLater(channel, t); - + } } } - + public void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); boolean inEventLoop = inEventLoop(); - + try { channel.getJdkChannel().close(); cancelledKeys ++; @@ -825,16 +823,16 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - + if (key == null || selector == null) { if (channel.getRawInterestOps() != interestOps) { changed = true; } - + // Not registered to the worker yet. // Set the rawInterestOps immediately; RegisterTask will pick it up. channel.setRawInterestOpsNow(interestOps); - + future.setSuccess(); if (changed) { if (inEventLoop) { @@ -843,10 +841,10 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { fireChannelInterestChangedLater(channel); } } - + return; } - + switch (CONSTRAINT_LEVEL) { case 0: if (channel.getRawInterestOps() != interestOps) { @@ -913,7 +911,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } } } - + /** * Read is called when a Selector has been notified that the underlying channel * was something to be read. The channel would previously have registered its interest @@ -924,5 +922,5 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { protected abstract boolean read(SelectionKey k); protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future); - + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java b/transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java deleted file mode 100644 index 5a40272792..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/SendBufferPool.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.lang.ref.SoftReference; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.WritableByteChannel; - -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.FileRegion; - -public class SendBufferPool { - - private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer(); - - public static final int DEFAULT_PREALLOCATION_SIZE = 65536; - public static final int ALIGN_SHIFT = 4; - public static final int ALIGN_MASK = 15; - - protected PreallocationRef poolHead; - protected Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE); - - public SendBufferPool() { - } - - - public SendBuffer acquire(Object message) { - if (message instanceof ChannelBuffer) { - return acquire((ChannelBuffer) message); - } else if (message instanceof FileRegion) { - return acquire((FileRegion) message); - } - - throw new IllegalArgumentException( - "unsupported message type: " + message.getClass()); - } - - protected SendBuffer acquire(FileRegion src) { - if (src.getCount() == 0) { - return EMPTY_BUFFER; - } - return new FileSendBuffer(src); - } - - private SendBuffer acquire(ChannelBuffer src) { - final int size = src.readableBytes(); - if (size == 0) { - return EMPTY_BUFFER; - } - - if (src.isDirect()) { - return new UnpooledSendBuffer(src.toByteBuffer()); - } - if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) { - return new UnpooledSendBuffer(src.toByteBuffer()); - } - - Preallocation current = this.current; - ByteBuffer buffer = current.buffer; - int remaining = buffer.remaining(); - PooledSendBuffer dst; - - if (size < remaining) { - int nextPos = buffer.position() + size; - ByteBuffer slice = buffer.duplicate(); - buffer.position(align(nextPos)); - slice.limit(nextPos); - current.refCnt ++; - dst = new PooledSendBuffer(current, slice); - } else if (size > remaining) { - this.current = current = getPreallocation(); - buffer = current.buffer; - ByteBuffer slice = buffer.duplicate(); - buffer.position(align(size)); - slice.limit(size); - current.refCnt ++; - dst = new PooledSendBuffer(current, slice); - } else { // size == remaining - current.refCnt ++; - this.current = getPreallocation0(); - dst = new PooledSendBuffer(current, current.buffer); - } - - ByteBuffer dstbuf = dst.buffer; - dstbuf.mark(); - src.getBytes(src.readerIndex(), dstbuf); - dstbuf.reset(); - return dst; - } - - protected Preallocation getPreallocation() { - Preallocation current = this.current; - if (current.refCnt == 0) { - current.buffer.clear(); - return current; - } - - return getPreallocation0(); - } - - protected Preallocation getPreallocation0() { - PreallocationRef ref = poolHead; - if (ref != null) { - do { - Preallocation p = ref.get(); - ref = ref.next; - - if (p != null) { - poolHead = ref; - return p; - } - } while (ref != null); - - poolHead = ref; - } - - return new Preallocation(DEFAULT_PREALLOCATION_SIZE); - } - - protected static int align(int pos) { - int q = pos >>> ALIGN_SHIFT; - int r = pos & ALIGN_MASK; - if (r != 0) { - q ++; - } - return q << ALIGN_SHIFT; - } - - public static final class Preallocation { - public final ByteBuffer buffer; - public int refCnt; - - public Preallocation(int capacity) { - buffer = ByteBuffer.allocateDirect(capacity); - } - } - - public final class PreallocationRef extends SoftReference { - final PreallocationRef next; - - public PreallocationRef(Preallocation prealloation, PreallocationRef next) { - super(prealloation); - this.next = next; - } - } - - public interface SendBuffer { - boolean finished(); - long writtenBytes(); - long totalBytes(); - - long transferTo(WritableByteChannel ch) throws IOException; - long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException; - - void release(); - } - - public class UnpooledSendBuffer implements SendBuffer { - - protected final ByteBuffer buffer; - final int initialPos; - - public UnpooledSendBuffer(ByteBuffer buffer) { - this.buffer = buffer; - initialPos = buffer.position(); - } - - @Override - public final boolean finished() { - return !buffer.hasRemaining(); - } - - @Override - public final long writtenBytes() { - return buffer.position() - initialPos; - } - - @Override - public final long totalBytes() { - return buffer.limit() - initialPos; - } - - @Override - public final long transferTo(WritableByteChannel ch) throws IOException { - return ch.write(buffer); - } - - @Override - public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { - return ch.send(buffer, raddr); - } - - @Override - public void release() { - // Unpooled. - } - } - - public class PooledSendBuffer implements SendBuffer { - - protected final Preallocation parent; - public final ByteBuffer buffer; - final int initialPos; - - public PooledSendBuffer(Preallocation parent, ByteBuffer buffer) { - this.parent = parent; - this.buffer = buffer; - initialPos = buffer.position(); - } - - @Override - public boolean finished() { - return !buffer.hasRemaining(); - } - - @Override - public long writtenBytes() { - return buffer.position() - initialPos; - } - - @Override - public long totalBytes() { - return buffer.limit() - initialPos; - } - - @Override - public long transferTo(WritableByteChannel ch) throws IOException { - return ch.write(buffer); - } - - @Override - public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { - return ch.send(buffer, raddr); - } - - @Override - public void release() { - final Preallocation parent = this.parent; - if (-- parent.refCnt == 0) { - parent.buffer.clear(); - if (parent != current) { - poolHead = new PreallocationRef(parent, poolHead); - } - } - } - } - - static final class FileSendBuffer implements SendBuffer { - - private final FileRegion file; - private long writtenBytes; - - - FileSendBuffer(FileRegion file) { - this.file = file; - } - - @Override - public boolean finished() { - return writtenBytes >= file.getCount(); - } - - @Override - public long writtenBytes() { - return writtenBytes; - } - - @Override - public long totalBytes() { - return file.getCount(); - } - - @Override - public long transferTo(WritableByteChannel ch) throws IOException { - long localWrittenBytes = file.transferTo(ch, writtenBytes); - writtenBytes += localWrittenBytes; - return localWrittenBytes; - } - - @Override - public long transferTo(DatagramChannel ch, SocketAddress raddr) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void release() { - if (file.releaseAfterTransfer()) { - // Make sure the FileRegion resource are released otherwise it may cause a FD leak or something similar - file.releaseExternalResources(); - } - } - } - - static final class EmptySendBuffer implements SendBuffer { - - EmptySendBuffer() { - } - - @Override - public boolean finished() { - return true; - } - - @Override - public long writtenBytes() { - return 0; - } - - @Override - public long totalBytes() { - return 0; - } - - @Override - public long transferTo(WritableByteChannel ch) throws IOException { - return 0; - } - - @Override - public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { - return 0; - } - - @Override - public void release() { - // Unpooled. - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java deleted file mode 100644 index c3fb95b89e..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.socket.Worker; -import io.netty.util.ExternalResourceReleasable; - -/** - * This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once - * you want to release any resources of it. - * - * - */ -public final class ShareableWorkerPool implements WorkerPool { - - private final WorkerPool wrapped; - - public ShareableWorkerPool(WorkerPool wrapped) { - this.wrapped = wrapped; - } - - @Override - public E nextWorker() { - return wrapped.nextWorker(); - } - - /** - * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore - */ - public void destroy() { - if (wrapped instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) wrapped).releaseExternalResources(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java deleted file mode 100644 index f99f936544..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket.nio; - -import io.netty.channel.socket.Worker; - -/** - * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand - * - */ -public interface WorkerPool { - - /** - * Return the next {@link Worker} to use - * - * @return worker - */ - E nextWorker(); - - -}