Retrofit the NIO transport with the new API / improve the new API

- Remove the classes and properties that are not necessary anymore
- Remove SingleThreadEventLoop.newRegistrationTask() and let
  Channel.Unsafe handle registration by itself
- Channel.Unsafe.localAddress() and remoteAddress()
  - JdkChannel is replaced by Channel.Unsafe.
This commit is contained in:
Trustin Lee 2012-05-02 15:01:58 +09:00
parent 5dda9d1840
commit 9e6f8b46df
16 changed files with 142 additions and 1355 deletions

View File

@ -359,6 +359,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return firstOut(); return firstOut();
} }
@Override
public SocketAddress localAddress() {
return localAddress0();
}
@Override
public SocketAddress remoteAddress() {
// TODO Auto-generated method stub
return remoteAddress0();
}
@Override @Override
public void register(EventLoop eventLoop, ChannelFuture future) { public void register(EventLoop eventLoop, ChannelFuture future) {
if (eventLoop == null) { if (eventLoop == null) {
@ -476,6 +487,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract java.nio.channels.Channel javaChannel(); protected abstract java.nio.channels.Channel javaChannel();
protected abstract ChannelBufferHolder<Object> firstOut(); protected abstract ChannelBufferHolder<Object> firstOut();
protected abstract SocketAddress localAddress0();
protected abstract SocketAddress remoteAddress0();
protected abstract void doRegister(ChannelFuture future); protected abstract void doRegister(ChannelFuture future);
protected abstract void doBind(SocketAddress localAddress, ChannelFuture future); protected abstract void doBind(SocketAddress localAddress, ChannelFuture future);
protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);

View File

@ -184,6 +184,9 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<
java.nio.channels.Channel ch(); java.nio.channels.Channel ch();
ChannelBufferHolder<Object> out(); ChannelBufferHolder<Object> out();
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelFuture future); void register(EventLoop eventLoop, ChannelFuture future);
void bind(SocketAddress localAddress, ChannelFuture future); void bind(SocketAddress localAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);

View File

@ -57,8 +57,17 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
} }
@Override @Override
public EventLoop register(Channel channel, ChannelFuture future) { public EventLoop register(final Channel channel, final ChannelFuture future) {
execute(newRegistrationTask(channel, future)); if (inEventLoop()) {
channel.unsafe().register(this, future);
} else {
execute(new Runnable() {
@Override
public void run() {
channel.unsafe().register(SingleThreadEventLoop.this, future);
}
});
}
return this; return this;
} }
@ -111,8 +120,6 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
protected abstract void wakeup(boolean inEventLoop); protected abstract void wakeup(boolean inEventLoop);
protected abstract Runnable newRegistrationTask(Channel channel, ChannelFuture future);
@Override @Override
public boolean inEventLoop() { public boolean inEventLoop() {
return Thread.currentThread() == thread; return Thread.currentThread() == thread;

View File

@ -1,73 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
public abstract class AbstractJdkChannel implements JdkChannel {
final AbstractSelectableChannel channel;
protected AbstractJdkChannel(AbstractSelectableChannel channel) {
this.channel = channel;
}
protected AbstractSelectableChannel getChannel() {
return channel;
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public void close() throws IOException {
channel.close();
}
@Override
public SelectionKey keyFor(Selector selector) {
return channel.keyFor(selector);
}
@Override
public SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException {
return channel.register(selector, interestedOps, attachment);
}
@Override
public boolean isRegistered() {
return channel.isRegistered();
}
@Override
public void configureBlocking(boolean block) throws IOException {
channel.configureBlocking(block);
}
@Override
public boolean finishConnect() throws IOException {
return true;
}
}

View File

@ -15,59 +15,21 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.fireChannelInterestChanged;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.MessageEvent;
import io.netty.channel.socket.nio.SendBufferPool.SendBuffer;
import io.netty.util.internal.QueueFactory;
import io.netty.util.internal.ThreadLocalBoolean;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection; import java.nio.channels.SelectableChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { public abstract class AbstractNioChannel extends AbstractChannel {
/**
* The {@link SelectorEventLoop}.
*/
private final SelectorEventLoop worker;
/**
* Monitor object to synchronize access to InterestedOps.
*/
protected final Object interestOpsLock = new Object();
/**
* Monitor object for synchronizing access to the {@link WriteRequestQueue}.
*/
protected final Object writeLock = new Object();
/**
* WriteTask that performs write operations.
*/
final Runnable writeTask = new WriteTask();
/** /**
* Indicates if there is a {@link WriteTask} in the task queue. * Indicates if there is a {@link WriteTask} in the task queue.
*/ */
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
/**
* Queue of write {@link MessageEvent}s.
*/
protected final Queue<MessageEvent> writeBufferQueue = createRequestQueue();
/** /**
* Keeps track of the number of bytes that the {@link WriteRequestQueue} currently * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
* contains. * contains.
@ -79,59 +41,40 @@ public abstract class AbstractNioChannel extends AbstractChannel implements NioC
*/ */
final AtomicInteger highWaterMarkCounter = new AtomicInteger(); final AtomicInteger highWaterMarkCounter = new AtomicInteger();
/**
* The current write {@link MessageEvent}
*/
protected MessageEvent currentWriteEvent;
protected SendBuffer currentWriteBuffer;
/** /**
* Boolean that indicates that write operation is in progress. * Boolean that indicates that write operation is in progress.
*/ */
protected boolean inWriteNowLoop; protected boolean inWriteNowLoop;
protected boolean writeSuspended; protected boolean writeSuspended;
private volatile InetSocketAddress localAddress; private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress; volatile InetSocketAddress remoteAddress;
private final JdkChannel channel; private final SelectableChannel ch;
protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, SelectorEventLoop worker, JdkChannel ch) { protected AbstractNioChannel(Integer id, Channel parent, SelectableChannel ch) {
super(id, parent, factory, pipeline, sink); super(id, parent);
this.worker = worker; this.ch = ch;
this.channel = ch;
}
protected AbstractNioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, SelectorEventLoop worker, JdkChannel ch) {
super(parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
} }
protected JdkChannel getJdkChannel() { protected AbstractNioChannel(Channel parent, SelectableChannel ch) {
return channel; super(parent);
this.ch = ch;
} }
/**
* Return the {@link SelectorEventLoop} that handle the IO of the {@link AbstractNioChannel}
*
* @return worker
*/
public SelectorEventLoop getWorker() {
return worker;
}
@Override @Override
public InetSocketAddress getLocalAddress() { protected SelectableChannel javaChannel() {
return ch;
}
@Override
public InetSocketAddress localAddress() {
InetSocketAddress localAddress = this.localAddress; InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) { if (localAddress == null) {
try { try {
this.localAddress = localAddress = this.localAddress = localAddress =
(InetSocketAddress) channel.getLocalSocketAddress(); (InetSocketAddress) unsafe().localAddress();
} catch (Throwable t) { } catch (Throwable t) {
// Sometimes fails on a closed socket in Windows. // Sometimes fails on a closed socket in Windows.
return null; return null;
@ -141,12 +84,12 @@ public abstract class AbstractNioChannel extends AbstractChannel implements NioC
} }
@Override @Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress remoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress; InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) { if (remoteAddress == null) {
try { try {
this.remoteAddress = remoteAddress = this.remoteAddress = remoteAddress =
(InetSocketAddress) channel.getRemoteSocketAddress(); (InetSocketAddress) unsafe().remoteAddress();
} catch (Throwable t) { } catch (Throwable t) {
// Sometimes fails on a closed socket in Windows. // Sometimes fails on a closed socket in Windows.
return null; return null;
@ -154,245 +97,7 @@ public abstract class AbstractNioChannel extends AbstractChannel implements NioC
} }
return remoteAddress; return remoteAddress;
} }
@Override
public abstract NioChannelConfig getConfig();
int getRawInterestOps() {
return super.getInterestOps();
}
void setRawInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override @Override
public int getInterestOps() { public abstract NioChannelConfig config();
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
} else {
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
protected WriteRequestQueue createRequestQueue() {
return new WriteRequestQueue();
}
public class WriteRequestQueue implements BlockingQueue<MessageEvent> {
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
private final BlockingQueue<MessageEvent> queue;
public WriteRequestQueue() {
this.queue = QueueFactory.createQueue(MessageEvent.class);
}
@Override
public MessageEvent remove() {
return queue.remove();
}
@Override
public MessageEvent element() {
return queue.element();
}
@Override
public MessageEvent peek() {
return queue.peek();
}
@Override
public int size() {
return queue.size();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public Iterator<MessageEvent> iterator() {
return queue.iterator();
}
@Override
public Object[] toArray() {
return queue.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return queue.toArray(a);
}
@Override
public boolean containsAll(Collection<?> c) {
return queue.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends MessageEvent> c) {
return queue.addAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
return queue.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
return queue.retainAll(c);
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean add(MessageEvent e) {
return queue.add(e);
}
@Override
public void put(MessageEvent e) throws InterruptedException {
queue.put(e);
}
@Override
public boolean offer(MessageEvent e, long timeout, TimeUnit unit) throws InterruptedException {
return queue.offer(e, timeout, unit);
}
@Override
public MessageEvent take() throws InterruptedException {
return queue.take();
}
@Override
public MessageEvent poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
@Override
public boolean remove(Object o) {
return queue.remove(o);
}
@Override
public boolean contains(Object o) {
return queue.contains(o);
}
@Override
public int drainTo(Collection<? super MessageEvent> c) {
return queue.drainTo(c);
}
@Override
public int drainTo(Collection<? super MessageEvent> c, int maxElements) {
return queue.drainTo(c, maxElements);
}
@Override
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
@Override
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
protected int getMessageSize(MessageEvent e) {
Object m = e.getMessage();
if (m instanceof ChannelBuffer) {
return ((ChannelBuffer) m).readableBytes();
}
return 0;
}
}
private final class WriteTask implements Runnable {
WriteTask() {
}
@Override
public void run() {
writeTaskInTaskQueue.set(false);
worker.writeFromTaskLoop(AbstractNioChannel.this);
}
}
} }

View File

@ -1,52 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.channel();
if (ch instanceof AbstractNioChannel) {
AbstractNioChannel channel = (AbstractNioChannel) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.channel(), task);
channel.getWorker().executeInIoThread(wrapper);
return wrapper;
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractNioChannel) {
fireLater = !((AbstractNioChannel) channel).getWorker().isIoThread();
}
return fireLater;
}
}

View File

@ -1,83 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.socket.Worker;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling
* {@link #nextWorker()}
*
*/
public abstract class AbstractNioWorkerPool<E extends SelectorEventLoop> implements WorkerPool<E> , ExternalResourceReleasable {
private final SelectorEventLoop[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
private final Executor workerExecutor;
/**
* Create a new instance
*
* @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
* @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it
* @param workerCount the count of {@link Worker}'s to create
*/
protected AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
workers = new SelectorEventLoop[workerCount];
for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor, allowShutDownOnIdle);
}
this.workerExecutor = workerExecutor;
}
/**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO
*
*
* @param executor the {@link Executor} to use
* @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it
* @return worker the new {@link Worker}
*/
protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle);
@SuppressWarnings("unchecked")
public E nextWorker() {
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -17,16 +17,12 @@ package io.netty.channel.socket.nio;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultDatagramChannelConfig; import io.netty.channel.socket.DefaultDatagramChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.ConversionUtil; import io.netty.util.internal.ConversionUtil;
import io.netty.util.internal.DetectionUtil; import io.netty.util.internal.DetectionUtil;
import java.io.IOException; import java.lang.reflect.Method;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Map;
/** /**
* The default {@link NioSocketChannelConfig} implementation. * The default {@link NioSocketChannelConfig} implementation.
@ -34,48 +30,61 @@ import java.util.Map;
class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
implements NioDatagramChannelConfig { implements NioDatagramChannelConfig {
private static final InternalLogger logger = private static final Object IP_MULTICAST_IF;
InternalLoggerFactory private static final Method GET_OPTION;
.getInstance(DefaultNioDatagramChannelConfig.class); private static final Method SET_OPTION;
private volatile int writeBufferHighWaterMark = 64 * 1024; static {
private volatile int writeBufferLowWaterMark = 32 * 1024; ClassLoader classLoader = DatagramChannel.class.getClassLoader();
private volatile int writeSpinCount = 16; Class<?> socketOptionType = null;
try {
socketOptionType = Class.forName("java.net.SocketOption", true, classLoader);
} catch (Exception e) {
// Not Java 7+
}
Object ipMulticastIf = null;
if (socketOptionType != null) {
try {
ipMulticastIf = Class.forName("java.net.StandardSocketOptions", true, classLoader).getDeclaredField("IP_MULTICAST_IF").get(null);
} catch (Exception e) {
throw new Error("cannot locate the IP_MULTICAST_IF field", e);
}
}
IP_MULTICAST_IF = ipMulticastIf;
Method getOption;
try {
getOption = DatagramChannel.class.getDeclaredMethod("getOption", socketOptionType);
} catch (Exception e) {
throw new Error("cannot locate the getOption() method", e);
}
GET_OPTION = getOption;
Method setOption;
try {
setOption = DatagramChannel.class.getDeclaredMethod("setOption", socketOptionType, Object.class);
} catch (Exception e) {
throw new Error("cannot locate the setOption() method", e);
}
SET_OPTION = setOption;
}
private final DatagramChannel channel; private final DatagramChannel channel;
private volatile int writeSpinCount = 16;
DefaultNioDatagramChannelConfig(DatagramChannel channel) { DefaultNioDatagramChannelConfig(DatagramChannel channel) {
super(channel.socket()); super(channel.socket());
this.channel = channel; this.channel = channel;
} }
@Override
public void setOptions(Map<String, Object> options) {
super.setOptions(options);
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
if (logger.isWarnEnabled()) {
// Notify the user about misconfiguration.
logger.warn("writeBufferLowWaterMark cannot be greater than "
+ "writeBufferHighWaterMark; setting to the half of the "
+ "writeBufferHighWaterMark.");
}
}
}
@Override @Override
public boolean setOption(String key, Object value) { public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) { if (super.setOption(key, value)) {
return true; return true;
} }
if (key.equals("writeBufferHighWaterMark")) { if (key.equals("writeSpinCount")) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeBufferLowWaterMark")) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value)); setWriteSpinCount(ConversionUtil.toInt(value));
} else { } else {
return false; return false;
@ -83,56 +92,6 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
return true; return true;
} }
@Override
public int getWriteBufferHighWaterMark() {
return writeBufferHighWaterMark;
}
@Override
public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark cannot be less than " +
"writeBufferLowWaterMark (" +
getWriteBufferLowWaterMark() + "): " +
writeBufferHighWaterMark);
}
setWriteBufferHighWaterMark0(writeBufferHighWaterMark);
}
private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < 0) {
throw new IllegalArgumentException("writeBufferHighWaterMark: " +
writeBufferHighWaterMark);
}
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
@Override
public int getWriteBufferLowWaterMark() {
return writeBufferLowWaterMark;
}
@Override
public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark (" +
getWriteBufferHighWaterMark() + "): " +
writeBufferLowWaterMark);
}
setWriteBufferLowWaterMark0(writeBufferLowWaterMark);
}
private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark < 0) {
throw new IllegalArgumentException("writeBufferLowWaterMark: " +
writeBufferLowWaterMark);
}
this.writeBufferLowWaterMark = writeBufferLowWaterMark;
}
@Override @Override
public int getWriteSpinCount() { public int getWriteSpinCount() {
return writeSpinCount; return writeSpinCount;
@ -146,15 +105,15 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
} }
this.writeSpinCount = writeSpinCount; this.writeSpinCount = writeSpinCount;
} }
@Override @Override
public void setNetworkInterface(NetworkInterface networkInterface) { public void setNetworkInterface(NetworkInterface networkInterface) {
if (DetectionUtil.javaVersion() < 7) { if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} else { } else {
try { try {
channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface); SET_OPTION.invoke(channel, IP_MULTICAST_IF, networkInterface);
} catch (IOException e) { } catch (Exception e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
@ -166,11 +125,10 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} else { } else {
try { try {
return (NetworkInterface) channel.getOption(StandardSocketOptions.IP_MULTICAST_IF); return (NetworkInterface) GET_OPTION.invoke(channel, IP_MULTICAST_IF);
} catch (IOException e) { } catch (Exception e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
} }
} }

View File

@ -15,127 +15,37 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import java.net.Socket;
import java.util.Map;
import io.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import io.netty.channel.ChannelException;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.ReceiveBufferSizePredictorFactory;
import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.ConversionUtil; import io.netty.util.internal.ConversionUtil;
import java.net.Socket;
/** /**
* The default {@link NioSocketChannelConfig} implementation. * The default {@link NioSocketChannelConfig} implementation.
*/ */
class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
implements NioSocketChannelConfig { implements NioSocketChannelConfig {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class);
private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY =
new AdaptiveReceiveBufferSizePredictorFactory();
private volatile int writeBufferHighWaterMark = 64 * 1024;
private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile ReceiveBufferSizePredictor predictor;
private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY;
private volatile int writeSpinCount = 16; private volatile int writeSpinCount = 16;
DefaultNioSocketChannelConfig(Socket socket) { DefaultNioSocketChannelConfig(Socket socket) {
super(socket); super(socket);
} }
@Override
public void setOptions(Map<String, Object> options) {
super.setOptions(options);
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
if (logger.isWarnEnabled()) {
// Notify the user about misconfiguration.
logger.warn(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark; setting to the half of the " +
"writeBufferHighWaterMark.");
}
}
}
@Override @Override
public boolean setOption(String key, Object value) { public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) { if (super.setOption(key, value)) {
return true; return true;
} }
if (key.equals("writeBufferHighWaterMark")) { if (key.equals("writeSpinCount")) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeBufferLowWaterMark")) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value)); setWriteSpinCount(ConversionUtil.toInt(value));
} else if (key.equals("receiveBufferSizePredictorFactory")) {
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
} else if (key.equals("receiveBufferSizePredictor")) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else { } else {
return false; return false;
} }
return true; return true;
} }
@Override
public int getWriteBufferHighWaterMark() {
return writeBufferHighWaterMark;
}
@Override
public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark cannot be less than " +
"writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " +
writeBufferHighWaterMark);
}
setWriteBufferHighWaterMark0(writeBufferHighWaterMark);
}
private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark: " + writeBufferHighWaterMark);
}
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
@Override
public int getWriteBufferLowWaterMark() {
return writeBufferLowWaterMark;
}
@Override
public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " +
writeBufferLowWaterMark);
}
setWriteBufferLowWaterMark0(writeBufferLowWaterMark);
}
private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark: " + writeBufferLowWaterMark);
}
this.writeBufferLowWaterMark = writeBufferLowWaterMark;
}
@Override @Override
public int getWriteSpinCount() { public int getWriteSpinCount() {
return writeSpinCount; return writeSpinCount;
@ -149,42 +59,4 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
} }
this.writeSpinCount = writeSpinCount; this.writeSpinCount = writeSpinCount;
} }
@Override
public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() {
ReceiveBufferSizePredictor predictor = this.predictor;
if (predictor == null) {
try {
this.predictor = predictor = getReceiveBufferSizePredictorFactory().getPredictor();
} catch (Exception e) {
throw new ChannelException(
"Failed to create a new " +
ReceiveBufferSizePredictor.class.getSimpleName() + '.',
e);
}
}
return predictor;
}
@Override
public void setReceiveBufferSizePredictor(
ReceiveBufferSizePredictor predictor) {
if (predictor == null) {
throw new NullPointerException("predictor");
}
this.predictor = predictor;
}
@Override
public ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory() {
return predictorFactory;
}
@Override
public void setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory predictorFactory) {
if (predictorFactory == null) {
throw new NullPointerException("predictorFactory");
}
this.predictorFactory = predictorFactory;
}
} }

View File

@ -1,53 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
public interface JdkChannel extends Channel, WritableByteChannel {
SelectionKey keyFor(Selector selector);
SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException;
boolean isRegistered();
SocketAddress getRemoteSocketAddress();
SocketAddress getLocalSocketAddress();
boolean isConnected();
boolean isSocketBound();
boolean finishConnect() throws IOException;
void disconnectSocket() throws IOException;
void closeSocket() throws IOException;
void bind(SocketAddress local) throws IOException;
void connect(SocketAddress remote) throws IOException;
void configureBlocking(boolean block) throws IOException;
}

View File

@ -15,50 +15,16 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.channel.ChannelConfig;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
/** /**
* Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO. * Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO.
* *
*/ */
public interface NioChannelConfig extends ChannelConfig { public interface NioChannelConfig extends ChannelConfig {
/**
* Returns the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link Channel#isWritable()}
* will start to return {@code true}.
*/
int getWriteBufferHighWaterMark();
/**
* Sets the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link Channel#isWritable()}
* will start to return {@code true}.
*/
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
/**
* Returns the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link Channel#isWritable()} will return
* {@code false} again.
*/
int getWriteBufferLowWaterMark();
/**
* Sets the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link Channel#isWritable()} will return
* {@code false} again.
*/
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/** /**
* Returns the maximum loop count for a write operation until * Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.

View File

@ -15,11 +15,7 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.channel.AdaptiveReceiveBufferSizePredictor;
import io.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.ReceiveBufferSizePredictorFactory;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
@ -36,53 +32,11 @@ import io.netty.channel.socket.SocketChannelConfig;
* <tr> * <tr>
* <th>Name</th><th>Associated setter method</th> * <th>Name</th><th>Associated setter method</th>
* </tr><tr> * </tr><tr>
* <td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td> * <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictor"}</td><td>{@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictorFactory"}</td><td>{@link #setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory)}</td>
* </tr> * </tr>
* </table> * </table>
*/ */
public interface NioSocketChannelConfig extends SocketChannelConfig, NioChannelConfig { public interface NioSocketChannelConfig extends SocketChannelConfig, NioChannelConfig {
// This method does not provide a configuration property by itself.
// It just combined SocketChannelConfig and NioChannelConfig for user's sake.
/**
* Returns the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
/**
* Sets the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
/**
* Returns the {@link ReceiveBufferSizePredictorFactory} which creates a new
* {@link ReceiveBufferSizePredictor} when a new channel is created and
* no {@link ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory();
/**
* Sets the {@link ReceiveBufferSizePredictor} which creates a new
* {@link ReceiveBufferSizePredictor} when a new channel is created and
* no {@link ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictorFactory(
ReceiveBufferSizePredictorFactory predictorFactory);
} }

View File

@ -19,7 +19,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent; import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import io.netty.channel.socket.nio.SendBufferPool.SendBuffer;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
@ -47,8 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import static io.netty.channel.Channels.*;
abstract class SelectorEventLoop extends SingleThreadEventLoop { abstract class SelectorEventLoop extends SingleThreadEventLoop {
/** /**
* Internal Netty logger. * Internal Netty logger.
@ -60,7 +58,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/** /**
* The NIO {@link Selector}. * The NIO {@link Selector}.
*/ */
@ -117,7 +115,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
} }
@Override @Override
public void register(final Channel channel, final ChannelFuture future) { public EventLoop register(final Channel channel, final ChannelFuture future) {
try { try {
if (channel instanceof NioServerSocketChannel) { if (channel instanceof NioServerSocketChannel) {
final NioServerSocketChannel ch = (NioServerSocketChannel) channel; final NioServerSocketChannel ch = (NioServerSocketChannel) channel;
@ -134,7 +132,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
}); });
} else if (channel instanceof NioClientSocketChannel) { } else if (channel instanceof NioClientSocketChannel) {
final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel; final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel;
execute(new Runnable() { execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -164,7 +162,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
future.setFailure(t); future.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} }
} }
}); });
} else { } else {
@ -229,7 +227,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
if (wakenUp.get()) { if (wakenUp.get()) {
selector.wakeup(); selector.wakeup();
} }
cancelledKeys = 0; cancelledKeys = 0;
processTaskQueue(); processTaskQueue();
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
@ -279,7 +277,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
"Failed to close a selector.", e); "Failed to close a selector.", e);
} }
} }
private void processTaskQueue() throws IOException { private void processTaskQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = pollTask(); final Runnable task = pollTask();
@ -308,7 +306,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
if ((readyOps & SelectionKey.OP_WRITE) != 0) { if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k); writeFromSelectorLoop(k);
} }
if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
removeKey = accept(k); removeKey = accept(k);
} }
@ -323,7 +321,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
i.remove(); i.remove();
} }
} }
if (cleanUpCancelledKeys()) { if (cleanUpCancelledKeys()) {
@ -336,7 +334,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment(); NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment();
try { try {
boolean handled = false; boolean handled = false;
// accept all sockets that are waiting atm // accept all sockets that are waiting atm
for (;;) { for (;;) {
SocketChannel acceptedSocket = channel.socket.accept(); SocketChannel acceptedSocket = channel.socket.accept();
@ -347,7 +345,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
ChannelPipeline pipeline = ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline(); channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = channel.workers.nextWorker(); NioWorker worker = channel.workers.nextWorker();
worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
channel.getPipeline().getSink(), acceptedSocket, worker), null); channel.getPipeline().getSink(), acceptedSocket, worker), null);
handled = true; handled = true;
@ -370,8 +368,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
} }
return true; return true;
} }
protected void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) { protected void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null; ConnectException cause = null;
for (SelectionKey k: keys) { for (SelectionKey k: keys) {
@ -385,7 +383,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
//close(k); //close(k);
continue; continue;
} }
// Something is ready so skip it // Something is ready so skip it
if (k.readyOps() != 0) { if (k.readyOps() != 0) {
continue; continue;
@ -405,9 +403,9 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
ch.getWorker().close(ch, succeededFuture(ch)); ch.getWorker().close(ch, succeededFuture(ch));
} }
} }
} }
} }
@ -425,7 +423,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
ch.getWorker().close(ch, succeededFuture(ch)); ch.getWorker().close(ch, succeededFuture(ch));
} }
} }
private boolean cleanUpCancelledKeys() throws IOException { private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) { if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0; cancelledKeys = 0;
@ -434,9 +432,9 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
} }
return false; return false;
} }
protected void close(SelectionKey k) { protected void close(SelectionKey k) {
Object attachment = k.attachment(); Object attachment = k.attachment();
if (attachment instanceof AbstractNioChannel) { if (attachment instanceof AbstractNioChannel) {
@ -458,7 +456,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
} }
if (scheduleWriteIfNecessary(channel)) { if (scheduleWriteIfNecessary(channel)) {
return; return;
} }
// From here, we are sure Thread.currentThread() == workerThread. // From here, we are sure Thread.currentThread() == workerThread.
@ -478,14 +476,14 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
write0(ch); write0(ch);
} }
} }
void writeFromSelectorLoop(final SelectionKey k) { void writeFromSelectorLoop(final SelectionKey k) {
AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
ch.writeSuspended = false; ch.writeSuspended = false;
write0(ch); write0(ch);
} }
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) {
if (!inEventLoop()) { if (!inEventLoop()) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
@ -498,12 +496,12 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
workerSelector.wakeup(); workerSelector.wakeup();
} }
} }
return true; return true;
} }
return false; return false;
} }
protected void write0(AbstractNioChannel channel) { protected void write0(AbstractNioChannel channel) {
boolean open = true; boolean open = true;
@ -514,7 +512,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
long writtenBytes = 0; long writtenBytes = 0;
final SendBufferPool sendBufferPool = this.sendBufferPool; final SendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.getJdkChannel(); final WritableByteChannel ch = channel.getJdkChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue; final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount(); final int writeSpinCount = channel.getConfig().getWriteSpinCount();
@ -660,11 +658,11 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
} }
} }
} }
public void close(NioServerSocketChannel channel, ChannelFuture future) { public void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean inEventLoop = inEventLoop(); boolean inEventLoop = inEventLoop();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
if (channel.socket.isOpen()) { if (channel.socket.isOpen()) {
@ -705,16 +703,16 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} else { } else {
fireExceptionCaughtLater(channel, t); fireExceptionCaughtLater(channel, t);
} }
} }
} }
public void close(AbstractNioChannel channel, ChannelFuture future) { public void close(AbstractNioChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
boolean inEventLoop = inEventLoop(); boolean inEventLoop = inEventLoop();
try { try {
channel.getJdkChannel().close(); channel.getJdkChannel().close();
cancelledKeys ++; cancelledKeys ++;
@ -825,16 +823,16 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
// Override OP_WRITE flag - a user cannot change this flag. // Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE; interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
if (key == null || selector == null) { if (key == null || selector == null) {
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
changed = true; changed = true;
} }
// Not registered to the worker yet. // Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up. // Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(interestOps); channel.setRawInterestOpsNow(interestOps);
future.setSuccess(); future.setSuccess();
if (changed) { if (changed) {
if (inEventLoop) { if (inEventLoop) {
@ -843,10 +841,10 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
fireChannelInterestChangedLater(channel); fireChannelInterestChangedLater(channel);
} }
} }
return; return;
} }
switch (CONSTRAINT_LEVEL) { switch (CONSTRAINT_LEVEL) {
case 0: case 0:
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
@ -913,7 +911,7 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
} }
} }
} }
/** /**
* Read is called when a Selector has been notified that the underlying channel * Read is called when a Selector has been notified that the underlying channel
* was something to be read. The channel would previously have registered its interest * was something to be read. The channel would previously have registered its interest
@ -924,5 +922,5 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
protected abstract boolean read(SelectionKey k); protected abstract boolean read(SelectionKey k);
protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future); protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future);
} }

View File

@ -1,346 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.WritableByteChannel;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.FileRegion;
public class SendBufferPool {
private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
public static final int DEFAULT_PREALLOCATION_SIZE = 65536;
public static final int ALIGN_SHIFT = 4;
public static final int ALIGN_MASK = 15;
protected PreallocationRef poolHead;
protected Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
public SendBufferPool() {
}
public SendBuffer acquire(Object message) {
if (message instanceof ChannelBuffer) {
return acquire((ChannelBuffer) message);
} else if (message instanceof FileRegion) {
return acquire((FileRegion) message);
}
throw new IllegalArgumentException(
"unsupported message type: " + message.getClass());
}
protected SendBuffer acquire(FileRegion src) {
if (src.getCount() == 0) {
return EMPTY_BUFFER;
}
return new FileSendBuffer(src);
}
private SendBuffer acquire(ChannelBuffer src) {
final int size = src.readableBytes();
if (size == 0) {
return EMPTY_BUFFER;
}
if (src.isDirect()) {
return new UnpooledSendBuffer(src.toByteBuffer());
}
if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
return new UnpooledSendBuffer(src.toByteBuffer());
}
Preallocation current = this.current;
ByteBuffer buffer = current.buffer;
int remaining = buffer.remaining();
PooledSendBuffer dst;
if (size < remaining) {
int nextPos = buffer.position() + size;
ByteBuffer slice = buffer.duplicate();
buffer.position(align(nextPos));
slice.limit(nextPos);
current.refCnt ++;
dst = new PooledSendBuffer(current, slice);
} else if (size > remaining) {
this.current = current = getPreallocation();
buffer = current.buffer;
ByteBuffer slice = buffer.duplicate();
buffer.position(align(size));
slice.limit(size);
current.refCnt ++;
dst = new PooledSendBuffer(current, slice);
} else { // size == remaining
current.refCnt ++;
this.current = getPreallocation0();
dst = new PooledSendBuffer(current, current.buffer);
}
ByteBuffer dstbuf = dst.buffer;
dstbuf.mark();
src.getBytes(src.readerIndex(), dstbuf);
dstbuf.reset();
return dst;
}
protected Preallocation getPreallocation() {
Preallocation current = this.current;
if (current.refCnt == 0) {
current.buffer.clear();
return current;
}
return getPreallocation0();
}
protected Preallocation getPreallocation0() {
PreallocationRef ref = poolHead;
if (ref != null) {
do {
Preallocation p = ref.get();
ref = ref.next;
if (p != null) {
poolHead = ref;
return p;
}
} while (ref != null);
poolHead = ref;
}
return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
}
protected static int align(int pos) {
int q = pos >>> ALIGN_SHIFT;
int r = pos & ALIGN_MASK;
if (r != 0) {
q ++;
}
return q << ALIGN_SHIFT;
}
public static final class Preallocation {
public final ByteBuffer buffer;
public int refCnt;
public Preallocation(int capacity) {
buffer = ByteBuffer.allocateDirect(capacity);
}
}
public final class PreallocationRef extends SoftReference<Preallocation> {
final PreallocationRef next;
public PreallocationRef(Preallocation prealloation, PreallocationRef next) {
super(prealloation);
this.next = next;
}
}
public interface SendBuffer {
boolean finished();
long writtenBytes();
long totalBytes();
long transferTo(WritableByteChannel ch) throws IOException;
long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
void release();
}
public class UnpooledSendBuffer implements SendBuffer {
protected final ByteBuffer buffer;
final int initialPos;
public UnpooledSendBuffer(ByteBuffer buffer) {
this.buffer = buffer;
initialPos = buffer.position();
}
@Override
public final boolean finished() {
return !buffer.hasRemaining();
}
@Override
public final long writtenBytes() {
return buffer.position() - initialPos;
}
@Override
public final long totalBytes() {
return buffer.limit() - initialPos;
}
@Override
public final long transferTo(WritableByteChannel ch) throws IOException {
return ch.write(buffer);
}
@Override
public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return ch.send(buffer, raddr);
}
@Override
public void release() {
// Unpooled.
}
}
public class PooledSendBuffer implements SendBuffer {
protected final Preallocation parent;
public final ByteBuffer buffer;
final int initialPos;
public PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
this.parent = parent;
this.buffer = buffer;
initialPos = buffer.position();
}
@Override
public boolean finished() {
return !buffer.hasRemaining();
}
@Override
public long writtenBytes() {
return buffer.position() - initialPos;
}
@Override
public long totalBytes() {
return buffer.limit() - initialPos;
}
@Override
public long transferTo(WritableByteChannel ch) throws IOException {
return ch.write(buffer);
}
@Override
public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return ch.send(buffer, raddr);
}
@Override
public void release() {
final Preallocation parent = this.parent;
if (-- parent.refCnt == 0) {
parent.buffer.clear();
if (parent != current) {
poolHead = new PreallocationRef(parent, poolHead);
}
}
}
}
static final class FileSendBuffer implements SendBuffer {
private final FileRegion file;
private long writtenBytes;
FileSendBuffer(FileRegion file) {
this.file = file;
}
@Override
public boolean finished() {
return writtenBytes >= file.getCount();
}
@Override
public long writtenBytes() {
return writtenBytes;
}
@Override
public long totalBytes() {
return file.getCount();
}
@Override
public long transferTo(WritableByteChannel ch) throws IOException {
long localWrittenBytes = file.transferTo(ch, writtenBytes);
writtenBytes += localWrittenBytes;
return localWrittenBytes;
}
@Override
public long transferTo(DatagramChannel ch, SocketAddress raddr)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void release() {
if (file.releaseAfterTransfer()) {
// Make sure the FileRegion resource are released otherwise it may cause a FD leak or something similar
file.releaseExternalResources();
}
}
}
static final class EmptySendBuffer implements SendBuffer {
EmptySendBuffer() {
}
@Override
public boolean finished() {
return true;
}
@Override
public long writtenBytes() {
return 0;
}
@Override
public long totalBytes() {
return 0;
}
@Override
public long transferTo(WritableByteChannel ch) throws IOException {
return 0;
}
@Override
public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return 0;
}
@Override
public void release() {
// Unpooled.
}
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.socket.Worker;
import io.netty.util.ExternalResourceReleasable;
/**
* This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once
* you want to release any resources of it.
*
*
*/
public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E> {
private final WorkerPool<E> wrapped;
public ShareableWorkerPool(WorkerPool<E> wrapped) {
this.wrapped = wrapped;
}
@Override
public E nextWorker() {
return wrapped.nextWorker();
}
/**
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
*/
public void destroy() {
if (wrapped instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) wrapped).releaseExternalResources();
}
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.socket.Worker;
/**
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
*
*/
public interface WorkerPool<E extends Worker> {
/**
* Return the next {@link Worker} to use
*
* @return worker
*/
E nextWorker();
}