diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 88c8d1ae28..8fe62ea711 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -58,7 +58,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); private volatile EventLoop eventLoop; + private volatile boolean registered; private volatile boolean notifiedClosureListeners; + private ChannelFuture connectFuture; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -116,6 +118,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return eventLoop; } + @Override + public boolean isRegistered() { + return registered; + } + @Override public void bind(SocketAddress localAddress, ChannelFuture future) { pipeline().bind(localAddress, future); @@ -210,10 +217,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - /** - * A {@link Channel} implementation must call this method when it is closed. - */ - protected void notifyClosureListeners() { + private void notifyClosureListeners() { final ChannelFutureListener[] array; synchronized (closureListeners) { if (notifiedClosureListeners) { @@ -382,6 +386,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha assert eventLoop().inEventLoop(); doRegister(future); + assert future.isDone(); + if (registered = future.isSuccess()) { + pipeline().fireChannelRegistered(); + } } @Override @@ -400,18 +408,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) { + // XXX: What if a user makes a connection attempt twice? if (eventLoop().inEventLoop()) { doConnect(remoteAddress, localAddress, future); + if (!future.isDone()) { + connectFuture = future; + } } else { eventLoop().execute(new Runnable() { @Override public void run() { doConnect(remoteAddress, localAddress, future); + if (!future.isDone()) { + connectFuture = future; + } } }); } } + @Override + public void finishConnect() { + assert eventLoop().inEventLoop(); + assert connectFuture != null; + doFinishConnect(connectFuture); + } + @Override public void disconnect(final ChannelFuture future) { if (eventLoop().inEventLoop()) { @@ -430,11 +452,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha public void close(final ChannelFuture future) { if (eventLoop().inEventLoop()) { doClose(future); + notifyClosureListeners(); } else { eventLoop().execute(new Runnable() { @Override public void run() { doClose(future); + notifyClosureListeners(); } }); } @@ -446,6 +470,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doDeregister(future); } finally { + registered = false; + pipeline().fireChannelUnregistered(); eventLoop = null; } } else { @@ -455,6 +481,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doDeregister(future); } finally { + registered = false; + pipeline().fireChannelUnregistered(); eventLoop = null; } } @@ -493,6 +521,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doRegister(ChannelFuture future); protected abstract void doBind(SocketAddress localAddress, ChannelFuture future); protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); + protected abstract void doFinishConnect(ChannelFuture future); protected abstract void doDisconnect(ChannelFuture future); protected abstract void doClose(ChannelFuture future); protected abstract void doDeregister(ChannelFuture future); diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 4a192cb7ca..8c9a48d541 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -51,11 +51,26 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S return null; } + @Override + protected ChannelBufferHolder firstOut() { + return out; + } + + @Override + protected SocketAddress remoteAddress0() { + return null; + } + @Override protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { future.setFailure(new UnsupportedOperationException()); } + @Override + protected void doFinishConnect(ChannelFuture future) { + future.setFailure(new UnsupportedOperationException()); + } + @Override protected void doDisconnect(ChannelFuture future) { future.setFailure(new UnsupportedOperationException()); diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index bab0aaf291..b867a97fce 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -175,6 +175,8 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable< void flush(ChannelFuture future); void write(Object message, ChannelFuture future); + // FIXME: Introduce more flexible channel state notification mechanism + // - notify me when channel becomes (un)registered, (in)active void addClosureListener(ChannelFutureListener listener); void removeClosureListener(ChannelFutureListener remover); @@ -190,6 +192,7 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable< void register(EventLoop eventLoop, ChannelFuture future); void bind(SocketAddress localAddress, ChannelFuture future); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); + void finishConnect(); void disconnect(ChannelFuture future); void close(ChannelFuture future); void deregister(ChannelFuture future); diff --git a/transport/src/main/java/io/netty/channel/ChannelFactory.java b/transport/src/main/java/io/netty/channel/ChannelFactory.java deleted file mode 100644 index 4d2e9bf2d4..0000000000 --- a/transport/src/main/java/io/netty/channel/ChannelFactory.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import java.util.concurrent.Executor; - -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.socket.nio.NioServerSocketChannelFactory; -import io.netty.util.ExternalResourceReleasable; - - -/** - * The main interface to a transport that creates a {@link Channel} associated - * with a certain communication entity such as a network socket. For example, - * the {@link NioServerSocketChannelFactory} creates a channel which has a - * NIO-based server socket as its underlying communication entity. - *

- * Once a new {@link Channel} is created, the {@link ChannelPipeline} which - * was specified as a parameter in the {@link #newChannel(ChannelPipeline)} - * is attached to the new {@link Channel}, and starts to handle all associated - * {@link ChannelEvent}s. - * - *

Graceful shutdown

- *

- * To shut down a network application service which is managed by a factory. - * you should follow the following steps: - *

    - *
  1. close all channels created by the factory and their child channels - * usually using {@link ChannelGroup#close()}, and
  2. - *
  3. call {@link #releaseExternalResources()}.
  4. - *
- *

- * For detailed transport-specific information on shutting down a factory, - * please refer to the Javadoc of {@link ChannelFactory}'s subtypes, such as - * {@link NioServerSocketChannelFactory}. - * @apiviz.landmark - * @apiviz.has io.netty.channel.Channel oneway - - creates - * - * @apiviz.exclude ^io\.netty\.channel\.([a-z]+\.)+.*ChannelFactory$ - */ -public interface ChannelFactory extends ExternalResourceReleasable { - - /** - * Creates and opens a new {@link Channel} and attaches the specified - * {@link ChannelPipeline} to the new {@link Channel}. - * - * @param pipeline the {@link ChannelPipeline} which is going to be - * attached to the new {@link Channel} - * - * @return the newly open channel - * - * @throws ChannelException if failed to create and open a new channel - */ - Channel newChannel(ChannelPipeline pipeline); - - /** - * Releases the external resources that this factory depends on to function. - * An external resource is a resource that this factory didn't create by - * itself. For example, {@link Executor}s that you specified in the factory - * constructor are external resources. You can call this method to release - * all external resources conveniently when the resources are not used by - * this factory or any other part of your application. An unexpected - * behavior will be resulted in if the resources are released when there's - * an open channel which is managed by this factory. - */ - @Override - void releaseExternalResources(); -} diff --git a/transport/src/main/java/io/netty/channel/ServerChannelFactory.java b/transport/src/main/java/io/netty/channel/ServerChannelFactory.java deleted file mode 100644 index 3f8dfd6279..0000000000 --- a/transport/src/main/java/io/netty/channel/ServerChannelFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -/** - * A {@link ChannelFactory} that creates a {@link ServerChannel}. - * @apiviz.has io.netty.channel.ServerChannel oneway - - creates - */ -public interface ServerChannelFactory extends ChannelFactory { - @Override - ServerChannel newChannel(ChannelPipeline pipeline); -} diff --git a/transport/src/main/java/io/netty/channel/socket/ClientSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/ClientSocketChannelFactory.java deleted file mode 100644 index 94143bd8a5..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/ClientSocketChannelFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket; - -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; - -/** - * A {@link ChannelFactory} which creates a client-side {@link SocketChannel}. - * @apiviz.has io.netty.channel.socket.SocketChannel oneway - - creates - */ -public interface ClientSocketChannelFactory extends ChannelFactory { - @Override - SocketChannel newChannel(ChannelPipeline pipeline); -} diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannelFactory.java deleted file mode 100644 index b6f5a9707e..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/DatagramChannelFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket; - -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; - -/** - * A {@link ChannelFactory} which creates a {@link DatagramChannel}. - * @apiviz.has io.netty.channel.socket.DatagramChannel oneway - - creates - */ -public interface DatagramChannelFactory extends ChannelFactory { - @Override - DatagramChannel newChannel(ChannelPipeline pipeline); -} diff --git a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelFactory.java deleted file mode 100644 index 2f4dd87c21..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket; - -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ServerChannelFactory; - -/** - * A {@link ChannelFactory} which creates a {@link ServerSocketChannel}. - * @apiviz.has io.netty.channel.socket.ServerSocketChannel oneway - - creates - */ -public interface ServerSocketChannelFactory extends ServerChannelFactory { - @Override - ServerSocketChannel newChannel(ChannelPipeline pipeline); -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index d34bcee481..7b6f469359 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -17,41 +17,21 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; import java.net.InetSocketAddress; import java.nio.channels.SelectableChannel; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.nio.channels.SelectionKey; public abstract class AbstractNioChannel extends AbstractChannel { - /** - * Indicates if there is a {@link WriteTask} in the task queue. - */ - final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); - - /** - * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently - * contains. - */ - final AtomicInteger writeBufferSize = new AtomicInteger(); - - /** - * Keeps track of the highWaterMark. - */ - final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - - /** - * Boolean that indicates that write operation is in progress. - */ - protected boolean inWriteNowLoop; - protected boolean writeSuspended; - + private final SelectableChannel ch; private volatile InetSocketAddress localAddress; - volatile InetSocketAddress remoteAddress; + private volatile InetSocketAddress remoteAddress; - private final SelectableChannel ch; + private volatile SelectionKey selectionKey; protected AbstractNioChannel(Integer id, Channel parent, SelectableChannel ch) { super(id, parent); @@ -68,6 +48,11 @@ public abstract class AbstractNioChannel extends AbstractChannel { return ch; } + protected SelectionKey selectionKey() { + assert selectionKey != null; + return selectionKey; + } + @Override public InetSocketAddress localAddress() { InetSocketAddress localAddress = this.localAddress; @@ -100,4 +85,18 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override public abstract NioChannelConfig config(); + + @Override + protected void doRegister(ChannelFuture future) { + if (!(eventLoop() instanceof SelectorEventLoop)) { + throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName()); + } + + SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); + try { + selectionKey = javaChannel().register(loop.selector, javaChannel().validOps() & ~SelectionKey.OP_WRITE, this); + } catch (Exception e) { + throw new ChannelException("failed to register a channel", e); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java deleted file mode 100644 index bec5114f8b..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.*; - -import java.nio.channels.SocketChannel; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; - -final class NioAcceptedSocketChannel extends NioSocketChannel { - - - static NioAcceptedSocketChannel create(ChannelFactory factory, - ChannelPipeline pipeline, Channel parent, ChannelSink sink, - SocketChannel socket, NioWorker worker) { - NioAcceptedSocketChannel instance = new NioAcceptedSocketChannel( - factory, pipeline, parent, sink, socket, worker); - instance.setConnected(); - fireChannelOpen(instance); - return instance; - } - - private NioAcceptedSocketChannel( - ChannelFactory factory, ChannelPipeline pipeline, - Channel parent, ChannelSink sink, - SocketChannel socket, NioWorker worker) { - - super(parent, factory, pipeline, sink, socket, worker); - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannel.java deleted file mode 100644 index 756f9d0cc5..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannel.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.nio.channels.SocketChannel; - -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -final class NioClientSocketChannel extends NioSocketChannel { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioClientSocketChannel.class); - - private static SocketChannel newSocket() { - SocketChannel socket; - try { - socket = SocketChannel.open(); - } catch (IOException e) { - throw new ChannelException("Failed to open a socket.", e); - } - - boolean success = false; - try { - socket.configureBlocking(false); - success = true; - } catch (IOException e) { - throw new ChannelException("Failed to enter non-blocking mode.", e); - } finally { - if (!success) { - try { - socket.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially initialized socket.", - e); - } - - } - } - } - - return socket; - } - - volatile ChannelFuture connectFuture; - volatile boolean boundManually; - - // Does not need to be volatile as it's accessed by only one thread. - long connectDeadlineNanos; - - static NioClientSocketChannel create(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { - NioClientSocketChannel instance = - new NioClientSocketChannel(factory, pipeline, sink, worker); - fireChannelOpen(instance); - return instance; - } - - private NioClientSocketChannel( - ChannelFactory factory, ChannelPipeline pipeline, - ChannelSink sink, NioWorker worker) { - - super(null, factory, pipeline, sink, newSocket(), worker); - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java deleted file mode 100644 index 0b9af5f713..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.nio.channels.Selector; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.socket.ClientSocketChannelFactory; -import io.netty.channel.socket.SocketChannel; -import io.netty.util.ExternalResourceReleasable; - -/** - * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based - * {@link SocketChannel}. It utilizes the non-blocking I/O mode which was - * introduced with NIO to serve many number of concurrent connections - * efficiently. - * - *

How threads work

- *

- * There are two types of threads in a {@link NioClientSocketChannelFactory}; - * one is boss thread and the other is worker thread. - * - *

Boss thread

- *

- * One {@link NioClientSocketChannelFactory} has one boss thread. It makes - * a connection attempt on request. Once a connection attempt succeeds, - * the boss thread passes the connected {@link Channel} to one of the worker - * threads that the {@link NioClientSocketChannelFactory} manages. - * - *

Worker threads

- *

- * One {@link NioClientSocketChannelFactory} can have one or more worker - * threads. A worker thread performs non-blocking read and write for one or - * more {@link Channel}s in a non-blocking mode. - * - *

Life cycle of threads and graceful shutdown

- *

- * All threads are acquired from the {@link Executor}s which were specified - * when a {@link NioClientSocketChannelFactory} was created. A boss thread is - * acquired from the {@code bossExecutor}, and worker threads are acquired from - * the {@code workerExecutor}. Therefore, you should make sure the specified - * {@link Executor}s are able to lend the sufficient number of threads. - * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}. - *

- * Both boss and worker threads are acquired lazily, and then released when - * there's nothing left to process. All the related resources such as - * {@link Selector} are also released when the boss and worker threads are - * released. Therefore, to shut down a service gracefully, you should do the - * following: - * - *

    - *
  1. close all channels created by the factory usually using - * {@link ChannelGroup#close()}, and
  2. - *
  3. call {@link #releaseExternalResources()}.
  4. - *
- * - * Please make sure not to shut down the executor until all channels are - * closed. Otherwise, you will end up with a {@link RejectedExecutionException} - * and the related resources might not be released properly. - * @apiviz.landmark - */ -public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { - - - private final WorkerPool workerPool; - private final ChannelSink sink; - - /** - * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker executor. - * - * See {@link #NioClientSocketChannelFactory(Executor, Executor)} - */ - public NioClientSocketChannelFactory() { - this(Executors.newCachedThreadPool()); - } - - /** - * Creates a new instance. Calling this constructor is same with calling - * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with - * 1 and (2 * the number of available processors in the machine) for - * bossCount and workerCount respectively. The number of - * available processors is obtained by {@link Runtime#availableProcessors()}. - * - * @param workerExecutor - * the {@link Executor} which will execute the worker threads - */ - public NioClientSocketChannelFactory(Executor workerExecutor) { - this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); - } - - /** - * Creates a new instance. Calling this constructor is same with calling - * {@link #NioClientSocketChannelFactory(Executor, int, int)} with - * 1 as bossCount. - * - * @param workerExecutor - * the {@link Executor} which will execute the worker threads - * @param workerCount - * the maximum number of worker threads - */ - public NioClientSocketChannelFactory(Executor workerExecutor, - int workerCount) { - this(new NioWorkerPool(workerExecutor, workerCount, true)); - } - - public NioClientSocketChannelFactory(WorkerPool workerPool) { - - if (workerPool == null) { - throw new NullPointerException("workerPool"); - } - - - this.workerPool = workerPool; - sink = new NioClientSocketPipelineSink(); - } - - - @Override - public SocketChannel newChannel(ChannelPipeline pipeline) { - return NioClientSocketChannel.create(this, pipeline, sink, workerPool.nextWorker()); - } - - @Override - public void releaseExternalResources() { - if (workerPool instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) workerPool).releaseExternalResources(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java deleted file mode 100644 index 394fa93d42..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.fireChannelBound; -import static io.netty.channel.Channels.fireExceptionCaught; -import static io.netty.channel.Channels.succeededFuture; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelState; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.MessageEvent; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; - -class NioClientSocketPipelineSink extends AbstractNioChannelSink { - - static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); - - @Override - public void eventSunk( - ChannelPipeline pipeline, ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent event = (ChannelStateEvent) e; - NioClientSocketChannel channel = - (NioClientSocketChannel) event.channel(); - ChannelFuture future = event.getFuture(); - ChannelState state = event.getState(); - Object value = event.getValue(); - - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - channel.getWorker().close(channel, future); - } - break; - case BOUND: - if (value != null) { - bind(channel, future, (SocketAddress) value); - } else { - channel.getWorker().close(channel, future); - } - break; - case CONNECTED: - if (value != null) { - connect(channel, future, (SocketAddress) value); - } else { - channel.getWorker().close(channel, future); - } - break; - case INTEREST_OPS: - channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); - break; - } - } else if (e instanceof MessageEvent) { - MessageEvent event = (MessageEvent) e; - NioSocketChannel channel = (NioSocketChannel) event.channel(); - boolean offered = channel.writeBufferQueue.offer(event); - assert offered; - channel.getWorker().writeFromUserCode(channel); - } - } - - private void bind( - NioClientSocketChannel channel, ChannelFuture future, - SocketAddress localAddress) { - try { - channel.getJdkChannel().bind(localAddress); - channel.boundManually = true; - channel.setBound(); - future.setSuccess(); - fireChannelBound(channel, channel.getLocalAddress()); - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private void connect( - final NioClientSocketChannel channel, final ChannelFuture cf, - SocketAddress remoteAddress) { - try { - channel.getJdkChannel().connect(remoteAddress); - channel.getCloseFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) - throws Exception { - if (!cf.isDone()) { - cf.setFailure(new ClosedChannelException()); - } - } - }); - cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - channel.connectFuture = cf; - - channel.getWorker().registerWithWorker(channel, cf); - - } catch (Throwable t) { - t.printStackTrace(); - cf.setFailure(t); - fireExceptionCaught(channel, t); - channel.getWorker().close(channel, succeededFuture(channel)); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java deleted file mode 100644 index 3f6c79d6ec..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.nio.channels.Selector; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; - -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramChannelFactory; -import io.netty.channel.socket.Worker; -import io.netty.channel.socket.nio.NioDatagramChannel.ProtocolFamily; -import io.netty.channel.socket.oio.OioDatagramChannelFactory; -import io.netty.util.ExternalResourceReleasable; - -/** - * A {@link DatagramChannelFactory} that creates a NIO-based connectionless - * {@link DatagramChannel}. It utilizes the non-blocking I/O mode which - * was introduced with NIO to serve many number of concurrent connections - * efficiently. - * - *

How threads work

- *

- * There is only one thread type in a {@link NioDatagramChannelFactory}; - * worker threads. - * - *

Worker threads

- *

- * One {@link NioDatagramChannelFactory} can have one or more worker - * threads. A worker thread performs non-blocking read and write for one or - * more {@link DatagramChannel}s in a non-blocking mode. - * - *

Life cycle of threads and graceful shutdown

- *

- * All worker threads are acquired from the {@link Executor} which was specified - * when a {@link NioDatagramChannelFactory} was created. Therefore, you should - * make sure the specified {@link Executor} is able to lend the sufficient - * number of threads. It is the best bet to specify - * {@linkplain Executors#newCachedThreadPool() a cached thread pool}. - *

- * All worker threads are acquired lazily, and then released when there's - * nothing left to process. All the related resources such as {@link Selector} - * are also released when the worker threads are released. Therefore, to shut - * down a service gracefully, you should do the following: - * - *

    - *
  1. close all channels created by the factory usually using - * {@link ChannelGroup#close()}, and
  2. - *
  3. call {@link #releaseExternalResources()}.
  4. - *
- * - * Please make sure not to shut down the executor until all channels are - * closed. Otherwise, you will end up with a {@link RejectedExecutionException} - * and the related resources might not be released properly. - * - *

Limitation

- *

- * Multicast is not supported. Please use {@link OioDatagramChannelFactory} - * instead. - * @apiviz.landmark - */ -public class NioDatagramChannelFactory implements DatagramChannelFactory { - - private final ChannelSink sink; - private final WorkerPool workerPool; - private final NioDatagramChannel.ProtocolFamily family; - - /** - * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}. - * - * See {@link #NioDatagramChannelFactory(Executor)} - */ - public NioDatagramChannelFactory() { - this(Executors.newCachedThreadPool()); - } - - - /** - * Creates a new instance. Calling this constructor is same with calling - * {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of - * available processors in the machine. The number of available processors - * is obtained by {@link Runtime#availableProcessors()}. - * - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - */ - public NioDatagramChannelFactory(final Executor workerExecutor) { - this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); - } - - /** - * Creates a new instance. - * - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - * @param workerCount - * the maximum number of I/O worker threads - */ - public NioDatagramChannelFactory(final Executor workerExecutor, - final int workerCount) { - this(new NioDatagramWorkerPool(workerExecutor, workerCount, true)); - } - - /** - * Creates a new instance. - * - * @param workerPool - * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads - */ - public NioDatagramChannelFactory(WorkerPool workerPool) { - this(workerPool, null); - } - - - - /** - * Creates a new instance. Calling this constructor is same with calling - * {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of - * available processors in the machine. The number of available processors - * is obtained by {@link Runtime#availableProcessors()}. - * - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - * @param family - * the {@link ProtocolFamily} to use. This should be used for UDP multicast. - * Be aware that this option is only considered when running on java7+ - */ - public NioDatagramChannelFactory(final Executor workerExecutor, ProtocolFamily family) { - this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS, family); - } - - /** - * Creates a new instance. - * - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - * @param workerCount - * the maximum number of I/O worker threads - * @param family - * the {@link ProtocolFamily} to use. This should be used for UDP multicast. - * Be aware that this option is only considered when running on java7+ - */ - public NioDatagramChannelFactory(final Executor workerExecutor, - final int workerCount, ProtocolFamily family) { - this(new NioDatagramWorkerPool(workerExecutor, workerCount, true), family); - } - - /** - * Creates a new instance. - * - * @param workerPool - * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads - * @param family - * the {@link ProtocolFamily} to use. This should be used for UDP multicast. - * Be aware that this option is only considered when running on java7+ - */ - public NioDatagramChannelFactory(WorkerPool workerPool, ProtocolFamily family) { - this.workerPool = workerPool; - this.family = family; - sink = new NioDatagramPipelineSink(); - } - - - - @Override - public DatagramChannel newChannel(final ChannelPipeline pipeline) { - return NioDatagramChannel.create(this, pipeline, sink, workerPool.nextWorker(), family); - } - - @Override - public void releaseExternalResources() { - if (workerPool instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) workerPool).releaseExternalResources(); - } - - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramJdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramJdkChannel.java deleted file mode 100644 index 2461b3ad4c..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramJdkChannel.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; - -class NioDatagramJdkChannel extends AbstractJdkChannel { - - NioDatagramJdkChannel(DatagramChannel channel) { - super(channel); - } - - @Override - protected DatagramChannel getChannel() { - return (DatagramChannel) super.getChannel(); - } - - @Override - public InetSocketAddress getRemoteSocketAddress() { - return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress(); - } - - @Override - public InetSocketAddress getLocalSocketAddress() { - return (InetSocketAddress) getChannel().socket().getLocalSocketAddress(); - } - - @Override - public boolean isSocketBound() { - return getChannel().socket().isBound(); - } - - - - @Override - public void bind(SocketAddress local) throws IOException { - getChannel().socket().bind(local); - } - - @Override - public void connect(SocketAddress remote) throws IOException { - getChannel().connect(remote); - } - - - @Override - public boolean isConnected() { - return getChannel().isConnected(); - } - - @Override - public void disconnectSocket() throws IOException { - getChannel().disconnect(); - } - - @Override - public void closeSocket() throws IOException { - getChannel().socket().close(); - } - - @Override - public int write(ByteBuffer src) throws IOException { - return getChannel().write(src); - } - - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java deleted file mode 100644 index 5f410000b2..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.util.concurrent.Executor; - - -/** - * Default implementation which hands of {@link NioDatagramWorker}'s - * - * - */ -public class NioDatagramWorkerPool extends AbstractNioWorkerPool { - - public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { - super(executor, workerCount, allowShutdownOnIdle); - } - - @Override - protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { - return new NioDatagramWorker(executor, allowShutdownOnIdle); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioProviderMetadata.java b/transport/src/main/java/io/netty/channel/socket/nio/NioProviderMetadata.java deleted file mode 100644 index f2993f52b2..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioProviderMetadata.java +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.spi.SelectorProvider; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.SystemPropertyUtil; - -/** - * Provides information which is specific to a NIO service provider - * implementation. - */ -final class NioProviderMetadata { - static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioProviderMetadata.class); - - private static final String CONSTRAINT_LEVEL_PROPERTY = - "io.netty.channel.socket.nio.constraintLevel"; - - /** - * 0 - no need to wake up to get / set interestOps (most cases) - * 1 - no need to wake up to get interestOps, but need to wake up to set. - * 2 - need to wake up to get / set interestOps (old providers) - */ - static final int CONSTRAINT_LEVEL; - - static { - int constraintLevel = -1; - - // Use the system property if possible. - constraintLevel = SystemPropertyUtil.get(CONSTRAINT_LEVEL_PROPERTY, -1); - if (constraintLevel < 0 || constraintLevel > 2) { - constraintLevel = -1; - } - - if (constraintLevel >= 0) { - logger.debug( - "Setting the NIO constraint level to: " + constraintLevel); - } - - if (constraintLevel < 0) { - constraintLevel = detectConstraintLevelFromSystemProperties(); - - if (constraintLevel < 0) { - constraintLevel = 2; - if (logger.isDebugEnabled()) { - logger.debug( - "Couldn't determine the NIO constraint level from " + - "the system properties; using the safest level (2)"); - } - } else if (constraintLevel != 0) { - if (logger.isInfoEnabled()) { - logger.info( - "Using the autodetected NIO constraint level: " + - constraintLevel + - " (Use better NIO provider for better performance)"); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug( - "Using the autodetected NIO constraint level: " + - constraintLevel); - } - - } - } - - CONSTRAINT_LEVEL = constraintLevel; - - if (CONSTRAINT_LEVEL < 0 || CONSTRAINT_LEVEL > 2) { - throw new Error( - "Unexpected NIO constraint level: " + - CONSTRAINT_LEVEL + ", please report this error."); - } - } - - private static int detectConstraintLevelFromSystemProperties() { - String version = SystemPropertyUtil.get("java.specification.version"); - String vminfo = SystemPropertyUtil.get("java.vm.info", ""); - String os = SystemPropertyUtil.get("os.name"); - String vendor = SystemPropertyUtil.get("java.vm.vendor"); - String provider; - try { - provider = SelectorProvider.provider().getClass().getName(); - } catch (Exception e) { - // Perhaps security exception. - provider = null; - } - - if (version == null || os == null || vendor == null || provider == null) { - return -1; - } - - os = os.toLowerCase(); - vendor = vendor.toLowerCase(); - -// System.out.println(version); -// System.out.println(vminfo); -// System.out.println(os); -// System.out.println(vendor); -// System.out.println(provider); - - // Sun JVM - if (vendor.indexOf("sun") >= 0) { - // Linux - if (os.indexOf("linux") >= 0) { - if (provider.equals("sun.nio.ch.EPollSelectorProvider") || - provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 0; - } - - // Windows - } else if (os.indexOf("windows") >= 0) { - if (provider.equals("sun.nio.ch.WindowsSelectorProvider")) { - return 0; - } - - // Solaris - } else if (os.indexOf("sun") >= 0 || os.indexOf("solaris") >= 0) { - if (provider.equals("sun.nio.ch.DevPollSelectorProvider")) { - return 0; - } - } - // Apple JVM - } else if (vendor.indexOf("apple") >= 0) { - // Mac OS - if (os.indexOf("mac") >= 0 && os.indexOf("os") >= 0) { - if (provider.equals("sun.nio.ch.KQueueSelectorProvider")) { - return 0; - } - } - // IBM - } else if (vendor.indexOf("ibm") >= 0) { - // Linux or AIX - if (os.indexOf("linux") >= 0 || os.indexOf("aix") >= 0) { - if (version.equals("1.5") || version.matches("^1\\.5\\D.*$")) { - if (provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 1; - } - } else if (version.equals("1.6") || version.matches("^1\\.6\\D.*$")) { - // IBM JDK 1.6 has different constraint level for different - // version. The exact version can be determined only by its - // build date. - Pattern datePattern = Pattern.compile( - "(?:^|[^0-9])(" + - "[2-9][0-9]{3}" + // year - "(?:0[1-9]|1[0-2])" + // month - "(?:0[1-9]|[12][0-9]|3[01])" + // day of month - ")(?:$|[^0-9])"); - - Matcher dateMatcher = datePattern.matcher(vminfo); - if (dateMatcher.find()) { - long dateValue = Long.parseLong(dateMatcher.group(1)); - if (dateValue < 20081105L) { - // SR0, 1, and 2 - return 2; - } else { - // SR3 and later - if (provider.equals("sun.nio.ch.EPollSelectorProvider")) { - return 0; - } else if (provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 1; - } - } - } - } - } - // BEA - } else if (vendor.indexOf("bea") >= 0 || vendor.indexOf("oracle") >= 0) { - // Linux - if (os.indexOf("linux") >= 0) { - if (provider.equals("sun.nio.ch.EPollSelectorProvider") || - provider.equals("sun.nio.ch.PollSelectorProvider")) { - return 0; - } - - // Windows - } else if (os.indexOf("windows") >= 0) { - if (provider.equals("sun.nio.ch.WindowsSelectorProvider")) { - return 0; - } - } - // Apache Software Foundation - } else if (vendor.indexOf("apache") >= 0) { - if (provider.equals("org.apache.harmony.nio.internal.SelectorProviderImpl")) { - return 1; - } - } - - // Others (untested) - return -1; - } - - private static final class ConstraintLevelAutodetector { - - ConstraintLevelAutodetector() { - } - - int autodetect() { - final int constraintLevel; - ExecutorService executor = Executors.newCachedThreadPool(); - boolean success; - long startTime; - int interestOps; - - ServerSocketChannel ch = null; - SelectorLoop loop = null; - - try { - // Open a channel. - ch = ServerSocketChannel.open(); - - // Configure the channel - try { - ch.socket().bind(new InetSocketAddress(0)); - ch.configureBlocking(false); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to configure a temporary socket.", e); - } - return -1; - } - - // Prepare the selector loop. - try { - loop = new SelectorLoop(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to open a temporary selector.", e); - } - return -1; - } - - // Register the channel - try { - ch.register(loop.selector, 0); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to register a temporary selector.", e); - } - return -1; - } - - SelectionKey key = ch.keyFor(loop.selector); - - // Start the selector loop. - executor.execute(loop); - - // Level 0 - success = true; - for (int i = 0; i < 10; i ++) { - - // Increase the probability of calling interestOps - // while select() is running. - do { - while (!loop.selecting) { - Thread.yield(); - } - - // Wait a little bit more. - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } while (!loop.selecting); - - startTime = System.nanoTime(); - key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT); - key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT); - - if (System.nanoTime() - startTime >= 500000000L) { - success = false; - break; - } - } - - if (success) { - constraintLevel = 0; - } else { - // Level 1 - success = true; - for (int i = 0; i < 10; i ++) { - - // Increase the probability of calling interestOps - // while select() is running. - do { - while (!loop.selecting) { - Thread.yield(); - } - - // Wait a little bit more. - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } while (!loop.selecting); - - startTime = System.nanoTime(); - interestOps = key.interestOps(); - synchronized (loop) { - loop.selector.wakeup(); - key.interestOps(interestOps | SelectionKey.OP_ACCEPT); - key.interestOps(interestOps & ~SelectionKey.OP_ACCEPT); - } - - if (System.nanoTime() - startTime >= 500000000L) { - success = false; - break; - } - } - if (success) { - constraintLevel = 1; - } else { - constraintLevel = 2; - } - } - } catch (Throwable e) { - return -1; - } finally { - if (ch != null) { - try { - ch.close(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a temporary socket.", e); - } - } - } - - if (loop != null) { - loop.done = true; - try { - executor.shutdownNow(); - } catch (NullPointerException ex) { - // Some JDK throws NPE here, but shouldn't. - } - - try { - for (;;) { - loop.selector.wakeup(); - try { - if (executor.awaitTermination(1, TimeUnit.SECONDS)) { - break; - } - } catch (InterruptedException e) { - // Ignore - } - } - } catch (Throwable e) { - // Perhaps security exception. - } - - try { - loop.selector.close(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a temporary selector.", e); - } - } - } - } - - return constraintLevel; - } - } - - private static final class SelectorLoop implements Runnable { - final Selector selector; - volatile boolean done; - volatile boolean selecting; // Just an approximation - - SelectorLoop() throws IOException { - selector = Selector.open(); - } - - @Override - public void run() { - while (!done) { - synchronized (this) { - // Guard - } - try { - selecting = true; - try { - selector.select(1000); - } finally { - selecting = false; - } - - Set keys = selector.selectedKeys(); - for (SelectionKey k: keys) { - k.interestOps(0); - } - keys.clear(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to wait for a temporary selector.", e); - } - } - } - } - } - - public static void main(String[] args) throws Exception { - for (Entry e: System.getProperties().entrySet()) { - System.out.println(e.getKey() + ": " + e.getValue()); - } - System.out.println(); - System.out.println("Hard-coded Constraint Level: " + CONSTRAINT_LEVEL); - System.out.println( - "Auto-detected Constraint Level: " + - new ConstraintLevelAutodetector().autodetect()); - } - - private NioProviderMetadata() { - // Unused - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index db7bfc9a80..b2d89e5f10 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -15,54 +15,32 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import io.netty.channel.AbstractServerChannel; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; +import io.netty.channel.ChannelFuture; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; + final class NioServerSocketChannel extends AbstractServerChannel - implements io.netty.channel.socket.ServerSocketChannel, NioChannel { + implements io.netty.channel.socket.ServerSocketChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); - final ServerSocketChannel socket; - final Lock shutdownLock = new ReentrantLock(); - final NioWorker worker; - final WorkerPool workers; - - + private final ServerSocketChannel socket; private final ServerSocketChannelConfig config; + private volatile InetSocketAddress localAddress; + private volatile SelectionKey selectionKey; - static NioServerSocketChannel create(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, NioWorker worker, WorkerPool workers) { - NioServerSocketChannel instance = - new NioServerSocketChannel(factory, pipeline, sink, worker, workers); - fireChannelOpen(instance); - return instance; - } - - private NioServerSocketChannel( - ChannelFactory factory, - ChannelPipeline pipeline, - ChannelSink sink, NioWorker worker, WorkerPool workers) { - - super(factory, pipeline, sink); - this.worker = worker; - this.workers = workers; + public NioServerSocketChannel() { try { socket = ServerSocketChannel.open(); } catch (IOException e) { @@ -90,32 +68,114 @@ final class NioServerSocketChannel extends AbstractServerChannel } @Override - public ServerSocketChannelConfig getConfig() { + public ServerSocketChannelConfig config() { return config; } @Override - public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) socket.socket().getLocalSocketAddress(); + public boolean isActive() { + // TODO Auto-generated method stub + return false; } @Override - public InetSocketAddress getRemoteAddress() { + public InetSocketAddress localAddress() { + InetSocketAddress localAddress = this.localAddress; + if (localAddress == null) { + try { + this.localAddress = localAddress = + (InetSocketAddress) unsafe().localAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return localAddress; + } + + @Override + public InetSocketAddress remoteAddress() { return null; } @Override - public boolean isBound() { - return isOpen() && socket.socket().isBound(); + protected java.nio.channels.ServerSocketChannel javaChannel() { + return socket; } @Override - protected boolean setClosed() { - return super.setClosed(); + protected SocketAddress localAddress0() { + return socket.socket().getLocalSocketAddress(); } @Override - public NioWorker getWorker() { - return worker; + protected void doRegister(ChannelFuture future) { + if (!(eventLoop() instanceof SelectorEventLoop)) { + throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName()); + } + + SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); + try { + selectionKey = javaChannel().register(loop.selector, javaChannel().validOps(), this); + } catch (Exception e) { + throw new ChannelException("failed to register a channel", e); + } + } + + @Override + protected void doBind(SocketAddress localAddress, ChannelFuture future) { + try { + javaChannel().socket().bind(localAddress); + future.setSuccess(); + pipeline().fireChannelActive(); + } catch (Exception e) { + future.setFailure(e); + } + } + + @Override + protected void doClose(ChannelFuture future) { + try { + javaChannel().close(); + } catch (Exception e) { + logger.warn("Failed to close a channel.", e); + } + + future.setSuccess(); + pipeline().fireChannelInactive(); + + if (isRegistered()) { + deregister(null); + } + } + + @Override + protected void doDeregister(ChannelFuture future) { + try { + selectionKey.cancel(); + future.setSuccess(); + pipeline().fireChannelUnregistered(); + } catch (Exception e) { + future.setFailure(e); + } + } + + @Override + protected int doRead() { + int acceptedConns = 0; + for (;;) { + try { + java.nio.channels.SocketChannel ch = javaChannel().accept(); + if (ch == null) { + break; + } + pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, ch)); + } catch (ChannelException e) { + pipeline().fireExceptionCaught(e); + } catch (Exception e) { + pipeline().fireExceptionCaught(new ChannelException("failed to accept a connection", e)); + } + } + return acceptedConns; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java deleted file mode 100644 index aa94a6a5f2..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.nio.channels.Selector; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.ServerSocketChannelFactory; -import io.netty.channel.socket.Worker; -import io.netty.util.ExternalResourceReleasable; - -/** - * A {@link ServerSocketChannelFactory} which creates a server-side NIO-based - * {@link ServerSocketChannel}. It utilizes the non-blocking I/O mode which - * was introduced with NIO to serve many number of concurrent connections - * efficiently. - * - *

How threads work

- *

- * There are two types of threads in a {@link NioServerSocketChannelFactory}; - * one is boss thread and the other is worker thread. - * - *

Boss threads

- *

- * Each bound {@link ServerSocketChannel} has its own boss thread. - * For example, if you opened two server ports such as 80 and 443, you will - * have two boss threads. A boss thread accepts incoming connections until - * the port is unbound. Once a connection is accepted successfully, the boss - * thread passes the accepted {@link Channel} to one of the worker - * threads that the {@link NioServerSocketChannelFactory} manages. - * - *

Worker threads

- *

- * One {@link NioServerSocketChannelFactory} can have one or more worker - * threads. A worker thread performs non-blocking read and write for one or - * more {@link Channel}s in a non-blocking mode. - * - *

Life cycle of threads and graceful shutdown

- *

- * All threads are acquired from the {@link Executor}s which were specified - * when a {@link NioServerSocketChannelFactory} was created. Boss threads are - * acquired from the {@code bossExecutor}, and worker threads are acquired from - * the {@code workerExecutor}. Therefore, you should make sure the specified - * {@link Executor}s are able to lend the sufficient number of threads. - * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}. - *

- * Both boss and worker threads are acquired lazily, and then released when - * there's nothing left to process. All the related resources such as - * {@link Selector} are also released when the boss and worker threads are - * released. Therefore, to shut down a service gracefully, you should do the - * following: - * - *

    - *
  1. unbind all channels created by the factory, - *
  2. close all child channels accepted by the unbound channels, and - * (these two steps so far is usually done using {@link ChannelGroup#close()})
  3. - *
  4. call {@link #releaseExternalResources()}.
  5. - *
- * - * Please make sure not to shut down the executor until all channels are - * closed. Otherwise, you will end up with a {@link RejectedExecutionException} - * and the related resources might not be released properly. - * @apiviz.landmark - */ -public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { - - private final WorkerPool workerPool; - private final ChannelSink sink; - private final WorkerPool bossWorkerPool; - - /** - * Create a new {@link NioServerSocketChannelFactory} using - * {@link Executors#newCachedThreadPool()} for the workers. - * - * See {@link #NioServerSocketChannelFactory(Executor, Executor)} - */ - public NioServerSocketChannelFactory() { - this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - } - - - /** - * Creates a new instance. Calling this constructor is same with calling - * {@link #NioServerSocketChannelFactory(Executor, Executor, int, int)} with 1 - * as boss count and 2 * the number of available processors in the machine. The number of - * available processors is obtained by {@link Runtime#availableProcessors()}. - * - * @param bossExecutor - * the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - */ - public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor) { - this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS, SelectorUtil.DEFAULT_IO_THREADS); - } - - /** - * Creates a new instance. - * - * @param bossExecutor - * the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - * @param bossCount - * the maximum number of I/O worker threads that handling the accepting of connections - * @param workerCount - * the maximum number of I/O worker threads - */ - public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int bossCount, - int workerCount) { - this(new NioWorkerPool(bossExecutor, bossCount, true), new NioWorkerPool(workerExecutor, workerCount, true)); - } - - /** - * Creates a new instance. - * - * @param bossWorkerPool - * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads that handle the accepting of new connections - * @param workerPool - * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads - */ - public NioServerSocketChannelFactory(WorkerPool bossWorkerPool, WorkerPool workerPool) { - if (bossWorkerPool == null) { - throw new NullPointerException("bossWorkerPool"); - } - if (workerPool == null) { - throw new NullPointerException("workerPool"); - } - this.bossWorkerPool = bossWorkerPool; - this.workerPool = workerPool; - sink = new NioServerSocketPipelineSink(); - } - - /** - * Creates a new instance which use the given {@link WorkerPool} for everything. - * - * @param genericExecutor - * the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections) - * @param workerCount - * the maximum number of I/O worker threads - * - */ - public NioServerSocketChannelFactory(Executor genericExecutor, int workerCount) { - this(new NioWorkerPool(genericExecutor, workerCount, true)); - } - - /** - * Creates a new instance which use the given {@link WorkerPool} for everything. - * - * @param genericExecutor - * the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections) - * - */ - public NioServerSocketChannelFactory(Executor genericExecutor) { - this(genericExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS + SelectorUtil.DEFAULT_IO_THREADS); - } - - - /** - * Creates a new instance which use the given {@link WorkerPool} for everything. - * - * @param genericWorkerPool - * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads (that included accepting of new connections) - */ - public NioServerSocketChannelFactory(WorkerPool genericWorkerPool) { - this(genericWorkerPool, genericWorkerPool); - } - - - @Override - public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return NioServerSocketChannel.create(this, pipeline, sink, bossWorkerPool.nextWorker(), workerPool); - } - - @Override - public void releaseExternalResources() { - if (workerPool instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) workerPool).releaseExternalResources(); - } - - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java deleted file mode 100644 index ec8059ca76..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.*; - -import java.net.SocketAddress; - - -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelState; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.MessageEvent; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -class NioServerSocketPipelineSink extends AbstractNioChannelSink { - - static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); - - @Override - public void eventSunk( - ChannelPipeline pipeline, ChannelEvent e) throws Exception { - Channel channel = e.getChannel(); - if (channel instanceof NioServerSocketChannel) { - handleServerSocket(e); - } else if (channel instanceof NioSocketChannel) { - handleAcceptedSocket(e); - } - } - - private void handleServerSocket(ChannelEvent e) { - if (!(e instanceof ChannelStateEvent)) { - return; - } - - ChannelStateEvent event = (ChannelStateEvent) e; - NioServerSocketChannel channel = - (NioServerSocketChannel) event.channel(); - ChannelFuture future = event.getFuture(); - ChannelState state = event.getState(); - Object value = event.getValue(); - - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - channel.getWorker().close(channel, future); - } - break; - case BOUND: - if (value != null) { - bind(channel, future, (SocketAddress) value); - } else { - channel.getWorker().close(channel, future); - } - break; - } - } - - private void handleAcceptedSocket(ChannelEvent e) { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent event = (ChannelStateEvent) e; - NioSocketChannel channel = (NioSocketChannel) event.channel(); - ChannelFuture future = event.getFuture(); - ChannelState state = event.getState(); - Object value = event.getValue(); - - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - channel.getWorker().close(channel, future); - } - break; - case BOUND: - case CONNECTED: - if (value == null) { - channel.getWorker().close(channel, future); - } - break; - case INTEREST_OPS: - channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); - break; - } - } else if (e instanceof MessageEvent) { - MessageEvent event = (MessageEvent) e; - NioSocketChannel channel = (NioSocketChannel) event.channel(); - boolean offered = channel.writeBufferQueue.offer(event); - assert offered; - channel.getWorker().writeFromUserCode(channel); - } - } - - private void bind( - NioServerSocketChannel channel, ChannelFuture future, - SocketAddress localAddress) { - boolean bound = false; - try { - channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); - bound = true; - - future.setSuccess(); - fireChannelBound(channel, channel.getLocalAddress()); - - channel.getWorker().registerWithWorker(channel, future); - - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } finally { - if (!bound) { - channel.getWorker().close(channel, future); - } - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index c484dd7ee5..d08d5596b7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -15,84 +15,274 @@ */ package io.netty.channel.socket.nio; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import java.io.IOException; import java.net.SocketAddress; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -public abstract class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { +public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { - private static final int ST_OPEN = 0; - private static final int ST_BOUND = 1; - private static final int ST_CONNECTED = 2; - private static final int ST_CLOSED = -1; - volatile int state = ST_OPEN; + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class); private final NioSocketChannelConfig config; + private final ChannelBufferHolder out = ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); - public NioSocketChannel( - Channel parent, ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, - SocketChannel socket, NioWorker worker) { - super(parent, factory, pipeline, sink, worker, new NioSocketJdkChannel(socket)); + private static SocketChannel newSocket() { + SocketChannel socket; + try { + socket = SocketChannel.open(); + } catch (IOException e) { + throw new ChannelException("Failed to open a socket.", e); + } + boolean success = false; + try { + socket.configureBlocking(false); + success = true; + } catch (IOException e) { + throw new ChannelException("Failed to enter non-blocking mode.", e); + } finally { + if (!success) { + try { + socket.close(); + } catch (IOException e) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to close a partially initialized socket.", + e); + } + + } + } + } + + return socket; + } + + public NioSocketChannel(Channel parent) { + this(parent, newSocket()); + } + + public NioSocketChannel(Channel parent, SocketChannel socket) { + super(parent, socket); config = new DefaultNioSocketChannelConfig(socket.socket()); } @Override - public NioWorker getWorker() { - return (NioWorker) super.getWorker(); - } - - @Override - public NioSocketChannelConfig getConfig() { + public NioSocketChannelConfig config() { return config; } @Override - public boolean isOpen() { - return state >= ST_OPEN; + protected SocketChannel javaChannel() { + return (SocketChannel) super.javaChannel(); } @Override - public boolean isBound() { - return state >= ST_BOUND; + public boolean isActive() { + return javaChannel().isConnected(); + } + + + @Override + @SuppressWarnings("unchecked") + protected ChannelBufferHolder firstOut() { + return (ChannelBufferHolder) out; } @Override - public boolean isConnected() { - return state == ST_CONNECTED; + protected SocketAddress localAddress0() { + return javaChannel().socket().getLocalSocketAddress(); } - final void setBound() { - assert state == ST_OPEN : "Invalid state: " + state; - state = ST_BOUND; + @Override + protected SocketAddress remoteAddress0() { + return javaChannel().socket().getRemoteSocketAddress(); } - final void setConnected() { - if (state != ST_CLOSED) { - state = ST_CONNECTED; + @Override + protected void doBind(SocketAddress localAddress, ChannelFuture future) { + try { + javaChannel().socket().bind(localAddress); + future.setSuccess(); + } catch (Exception e) { + future.setFailure(e); } } @Override - protected boolean setClosed() { - state = ST_CLOSED; - return super.setClosed(); - } - + protected void doConnect(SocketAddress remoteAddress, + SocketAddress localAddress, ChannelFuture future) { + if (localAddress != null) { + try { + javaChannel().socket().bind(localAddress); + } catch (Exception e) { + future.setFailure(e); + } + } - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return getUnsupportedOperationFuture(); + try { + if (javaChannel().connect(remoteAddress)) { + future.setSuccess(); + pipeline().fireChannelActive(); + } + } catch (Exception e) { + future.setFailure(e); + close(null); } } - + + @Override + protected void doFinishConnect(ChannelFuture future) { + try { + if (javaChannel().finishConnect()) { + future.setSuccess(); + pipeline().fireChannelActive(); + } + } catch (Exception e) { + future.setFailure(e); + close(null); + } + } + + @Override + protected void doDisconnect(ChannelFuture future) { + doClose(future); + } + + @Override + protected void doClose(ChannelFuture future) { + try { + javaChannel().close(); + } catch (Exception e) { + logger.warn("Failed to close a channel.", e); + } + + future.setSuccess(); + pipeline().fireChannelInactive(); + + if (isRegistered()) { + deregister(null); + } + } + + @Override + protected void doDeregister(ChannelFuture future) { + try { + selectionKey().cancel(); + future.setSuccess(); + pipeline().fireChannelUnregistered(); + } catch (Exception e) { + future.setFailure(e); + } + } + + @Override + protected int doRead() { + final SocketChannel ch = javaChannel(); + + int ret = 0; + int readBytes = 0; + boolean failure = true; + + ChannelBuffer buf = pipeline().nextIn().byteBuffer(); + try { + while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) { + readBytes += ret; + if (!buf.writable()) { + break; + } + } + failure = false; + } catch (ClosedChannelException e) { + // Can happen, and does not need a user attention. + } catch (Throwable t) { + pipeline().fireExceptionCaught(t); + } + + if (readBytes > 0) { + pipeline().fireInboundBufferUpdated(); + } + + if (ret < 0 || failure) { + selectionKey().cancel(); // Some JDK implementations run into an infinite loop without this. + close(null); + return -1; + } + + return readBytes; + } + + @Override + protected int doFlush(ChannelFuture future) { + boolean open = true; + boolean addOpWrite = false; + boolean removeOpWrite = false; + + final SocketChannel ch = javaChannel(); + final int writeSpinCount = config().getWriteSpinCount(); + final ChannelBuffer buf = unsafe().out().byteBuffer(); + int bytesLeft = buf.readableBytes(); + if (bytesLeft == 0) { + future.setSuccess(); + return 0; + } + + int readerIndex = buf.readerIndex(); + int localWrittenBytes = 0; + int writtenBytes = 0; + + try { + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = buf.getBytes(readerIndex, ch, bytesLeft); + if (localWrittenBytes > 0) { + bytesLeft -= localWrittenBytes; + if (bytesLeft <= 0) { + removeOpWrite = true; + future.setSuccess(); + break; + } + + readerIndex += localWrittenBytes; + writtenBytes += localWrittenBytes; + } else { + addOpWrite = true; + break; + } + } + } catch (AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + open = false; + close(null); + } + } + + if (open) { + if (addOpWrite) { + SelectionKey key = selectionKey(); + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } else if (removeOpWrite) { + SelectionKey key = selectionKey(); + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + } + } + + return writtenBytes; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java deleted file mode 100644 index bca65294b4..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -class NioSocketJdkChannel extends AbstractJdkChannel { - - - public NioSocketJdkChannel(SocketChannel channel) { - super(channel); - } - - @Override - protected SocketChannel getChannel() { - return (SocketChannel) super.getChannel(); - } - - @Override - public InetSocketAddress getRemoteSocketAddress() { - return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress(); - } - - @Override - public InetSocketAddress getLocalSocketAddress() { - return (InetSocketAddress) getChannel().socket().getLocalSocketAddress(); - } - - @Override - public boolean isSocketBound() { - return getChannel().socket().isBound(); - } - - @Override - public void bind(SocketAddress local) throws IOException { - getChannel().socket().bind(local); - } - - @Override - public void connect(SocketAddress remote) throws IOException { - getChannel().connect(remote); - } - - @Override - public boolean isConnected() { - return getChannel().isConnected(); - } - - @Override - public void disconnectSocket() throws IOException { - getChannel().socket().close(); - } - - @Override - public void closeSocket() throws IOException { - getChannel().socket().close(); - } - - @Override - public int write(ByteBuffer src) throws IOException { - return getChannel().write(src); - } - - @Override - public boolean finishConnect() throws IOException { - return getChannel().finishConnect(); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java deleted file mode 100644 index da9b10bf0b..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.fireChannelBound; -import static io.netty.channel.Channels.fireChannelConnected; -import static io.netty.channel.Channels.fireExceptionCaught; -import static io.netty.channel.Channels.fireMessageReceived; -import static io.netty.channel.Channels.succeededFuture; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ReceiveBufferSizePredictor; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.concurrent.Executor; - -public class NioWorker extends SelectorEventLoop { - - protected final ReceiveBufferPool recvBufferPool = new ReceiveBufferPool(); - - public NioWorker(Executor executor) { - super(executor); - } - - public NioWorker(Executor executor, boolean allowShutdownOnIdle) { - super(executor, allowShutdownOnIdle); - } - - - @Override - protected boolean read(SelectionKey k) { - final SocketChannel ch = (SocketChannel) k.channel(); - final NioSocketChannel channel = (NioSocketChannel) k.attachment(); - - final ReceiveBufferSizePredictor predictor = - channel.getConfig().getReceiveBufferSizePredictor(); - final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); - - int ret = 0; - int readBytes = 0; - boolean failure = true; - - ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); - try { - while ((ret = ch.read(bb)) > 0) { - readBytes += ret; - if (!bb.hasRemaining()) { - break; - } - } - failure = false; - } catch (ClosedChannelException e) { - // Can happen, and does not need a user attention. - } catch (Throwable t) { - fireExceptionCaught(channel, t); - } - - if (readBytes > 0) { - bb.flip(); - - final ChannelBufferFactory bufferFactory = - channel.getConfig().getBufferFactory(); - final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); - buffer.setBytes(0, bb); - buffer.writerIndex(readBytes); - - recvBufferPool.release(bb); - - // Update the predictor. - predictor.previousReceiveBufferSize(readBytes); - - // Fire the event. - fireMessageReceived(channel, buffer); - } else { - recvBufferPool.release(bb); - } - - if (ret < 0 || failure) { - k.cancel(); // Some JDK implementations run into an infinite loop without this. - close(channel, succeededFuture(channel)); - return false; - } - - return true; - } - - - @Override - protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { - boolean server = !(channel instanceof NioClientSocketChannel); - SocketAddress localAddress = channel.getLocalAddress(); - SocketAddress remoteAddress = channel.getRemoteAddress(); - - if (localAddress == null || remoteAddress == null) { - if (future != null) { - future.setFailure(new ClosedChannelException()); - } - close(channel, succeededFuture(channel)); - return; - } - - try { - if (server) { - channel.getJdkChannel().configureBlocking(false); - } - - boolean registered = channel.getJdkChannel().isRegistered(); - if (!registered) { - synchronized (channel.interestOpsLock) { - channel.getJdkChannel().register(selector, channel.getRawInterestOps(), channel); - } - - } else { - setInterestOps(channel, succeededFuture(channel), channel.getRawInterestOps()); - } - if (future != null) { - if (channel instanceof NioSocketChannel) { - ((NioSocketChannel) channel).setConnected(); - } - future.setSuccess(); - } - if (server || !((NioClientSocketChannel) channel).boundManually) { - fireChannelBound(channel, localAddress); - } - fireChannelConnected(channel, remoteAddress); - - } catch (IOException e) { - if (future != null) { - future.setFailure(e); - } - close(channel, succeededFuture(channel)); - if (!(e instanceof ClosedChannelException)) { - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } - } - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java deleted file mode 100644 index a8eb002234..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.util.concurrent.Executor; - - -/** - * Default implementation which hands of {@link NioWorker}'s - * - * - */ -public class NioWorkerPool extends AbstractNioWorkerPool { - - public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { - super(executor, workerCount, allowShutdownOnIdle); - } - - @Override - protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { - return new NioWorker(executor, allowShutdownOnIdle); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/ReceiveBufferPool.java b/transport/src/main/java/io/netty/channel/socket/nio/ReceiveBufferPool.java deleted file mode 100644 index 701cd171cc..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/ReceiveBufferPool.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; - -public final class ReceiveBufferPool { - - private static final int POOL_SIZE = 8; - - @SuppressWarnings("unchecked") - private final SoftReference[] pool = new SoftReference[POOL_SIZE]; - - public ByteBuffer acquire(int size) { - final SoftReference[] pool = this.pool; - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - if (ref == null) { - continue; - } - - ByteBuffer buf = ref.get(); - if (buf == null) { - pool[i] = null; - continue; - } - - if (buf.capacity() < size) { - continue; - } - - pool[i] = null; - - buf.clear(); - return buf; - } - - ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size)); - return buf; - } - - public void release(ByteBuffer buffer) { - final SoftReference[] pool = this.pool; - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - if (ref == null || ref.get() == null) { - pool[i] = new SoftReference(buffer); - return; - } - } - - // pool is full - replace one - final int capacity = buffer.capacity(); - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - ByteBuffer pooled = ref.get(); - if (pooled == null) { - pool[i] = null; - continue; - } - - if (pooled.capacity() < capacity) { - pool[i] = new SoftReference(buffer); - return; - } - } - } - - private static int normalizeCapacity(int capacity) { - // Normalize to multiple of 1024 - int q = capacity >>> 10; - int r = capacity & 1023; - if (r != 0) { - q ++; - } - return q << 10; - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index 13e72d727f..61683ceedb 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -17,35 +17,21 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; -import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.nio.channels.WritableByteChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; -import java.util.Queue; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; abstract class SelectorEventLoop extends SingleThreadEventLoop { /** @@ -54,11 +40,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { protected static final InternalLogger logger = InternalLoggerFactory .getInstance(SelectorEventLoop.class); - private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - /** * The NIO {@link Selector}. */ @@ -72,20 +55,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { */ protected final AtomicBoolean wakenUp = new AtomicBoolean(); - /** - * Lock for this workers Selector. - */ - private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - - /** - * Monitor object used to synchronize selector open/close. - */ - private final Object startStopLock = new Object(); - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - protected final SendBufferPool sendBufferPool = new SendBufferPool(); - protected SelectorEventLoop() { this(Executors.defaultThreadFactory()); } @@ -114,69 +85,6 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } } - @Override - public EventLoop register(final Channel channel, final ChannelFuture future) { - try { - if (channel instanceof NioServerSocketChannel) { - final NioServerSocketChannel ch = (NioServerSocketChannel) channel; - execute(new Runnable() { - @Override - public void run() { - try { - ch.socket.register(selector, SelectionKey.OP_ACCEPT, ch); - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - }); - } else if (channel instanceof NioClientSocketChannel) { - final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel; - - execute(new Runnable() { - @Override - public void run() { - try { - try { - clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel); - } catch (ClosedChannelException ignored) { - clientChannel.getWorker().close(clientChannel, succeededFuture(channel)); - } - int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); - if (connectTimeout > 0) { - clientChannel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - }); - } else if (channel instanceof AbstractNioChannel) { - execute(new Runnable() { - @Override - public void run() { - try { - registerTask((AbstractNioChannel) channel, future); - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - - } - }); - } else { - throw new UnsupportedOperationException("Unable to handle channel " + channel); - } - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - @Override protected void run() { long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); @@ -186,13 +94,6 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { wakenUp.set(false); - if (CONSTRAINT_LEVEL != 0) { - selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector - // from acquiring its lock. - selectorGuard.writeLock().unlock(); - } - try { SelectorUtil.select(selector); @@ -239,19 +140,9 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { processConnectTimeout(selector.keys(), currentTimeNanos); } - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (isShutdown()) { - synchronized (startStopLock) { - if (!hasTasks() && selector.keys().isEmpty()) { - break; - } - } - } + if (isShutdown()) { + // FIXME: Close all channels immediately and break the loop. + break; } } catch (Throwable t) { logger.warn( @@ -293,83 +184,42 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { private void processSelectedKeys(Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); + Channel ch = (Channel) k.attachment(); boolean removeKey = true; try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - if (!read(k)) { + if (ch.unsafe().read() < 0) { // Connection already closed - no need to handle write. continue; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - writeFromSelectorLoop(k); + ch.unsafe().flush(null); } if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { - removeKey = accept(k); + ch.unsafe().read(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { - connect(k); + ch.unsafe().finishConnect(); } } catch (CancelledKeyException ignored) { - close(k); + ch.unsafe().close(null); } finally { if (removeKey) { i.remove(); } } - - if (cleanUpCancelledKeys()) { break; // break the loop to avoid ConcurrentModificationException } } } - protected boolean accept(SelectionKey key) { - NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment(); - try { - boolean handled = false; - - // accept all sockets that are waiting atm - for (;;) { - SocketChannel acceptedSocket = channel.socket.accept(); - if (acceptedSocket == null) { - break; - } - // TODO: Remove the casting stuff - ChannelPipeline pipeline = - channel.getConfig().getPipelineFactory().getPipeline(); - NioWorker worker = channel.workers.nextWorker(); - - worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, - channel.getPipeline().getSink(), acceptedSocket, worker), null); - handled = true; - } - return handled; - } catch (SocketTimeoutException e) { - // Thrown every second to get ClosedChannelException - // raised. - } catch (CancelledKeyException e) { - // Raised by accept() when the server socket was closed. - } catch (ClosedSelectorException e) { - // Raised by accept() when the server socket was closed. - } catch (ClosedChannelException e) { - // Closed as requested. - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to accept a connection.", e); - } - } - return true; - } - - protected void processConnectTimeout(Set keys, long currentTimeNanos) { ConnectException cause = null; for (SelectionKey k: keys) { @@ -389,38 +239,21 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { continue; } // check if the channel is in - Object attachment = k.attachment(); - if (attachment instanceof NioClientSocketChannel) { - NioClientSocketChannel ch = (NioClientSocketChannel) attachment; - if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) { - - if (cause == null) { - cause = new ConnectException("connection timed out"); - } - - ch.connectFuture.setFailure(cause); - fireExceptionCaught(ch, cause); - ch.getWorker().close(ch, succeededFuture(ch)); - } - } - - - - } - } - - protected void connect(SelectionKey k) { - final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - try { - // TODO: Remove cast - if (ch.getJdkChannel().finishConnect()) { - registerTask(ch, ch.connectFuture); - } - } catch (Throwable t) { - ch.connectFuture.setFailure(t); - fireExceptionCaught(ch, t); - k.cancel(); // Some JDK implementations run into an infinite loop without this. - ch.getWorker().close(ch, succeededFuture(ch)); + // FIXME: Implement connect timeout. +// Channel ch = (Channel) k.attachment(); +// if (attachment instanceof NioClientSocketChannel) { +// NioClientSocketChannel ch = (NioClientSocketChannel) attachment; +// if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) { +// +// if (cause == null) { +// cause = new ConnectException("connection timed out"); +// } +// +// ch.connectFuture.setFailure(cause); +// fireExceptionCaught(ch, cause); +// ch.getWorker().close(ch, succeededFuture(ch)); +// } +// } } } @@ -432,495 +265,4 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } return false; } - - - - protected void close(SelectionKey k) { - Object attachment = k.attachment(); - if (attachment instanceof AbstractNioChannel) { - AbstractNioChannel ch = (AbstractNioChannel) attachment; - close(ch, succeededFuture(ch)); - } else if (attachment instanceof NioServerSocketChannel) { - NioServerSocketChannel ch = (NioServerSocketChannel) attachment; - close(ch, succeededFuture(ch)); - } else { - // TODO: What todo ? - } - } - - public void writeFromUserCode(final AbstractNioChannel channel) { - - if (!channel.isConnected()) { - cleanUpWriteBuffer(channel); - return; - } - if (scheduleWriteIfNecessary(channel)) { - return; - } - - // From here, we are sure Thread.currentThread() == workerThread. - - if (channel.writeSuspended) { - return; - } - - if (channel.inWriteNowLoop) { - return; - } - - write0(channel); - } - - public void writeFromTaskLoop(AbstractNioChannel ch) { - if (!ch.writeSuspended) { - write0(ch); - } - } - - void writeFromSelectorLoop(final SelectionKey k) { - AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - ch.writeSuspended = false; - write0(ch); - } - - - protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { - if (!inEventLoop()) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - execute(channel.writeTask); - } - - final Selector workerSelector = selector; - if (workerSelector != null) { - if (wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } - } - - return true; - } - - return false; - } - - protected void write0(AbstractNioChannel channel) { - boolean open = true; - boolean addOpWrite = false; - boolean removeOpWrite = false; - boolean inEventLoop = inEventLoop(); - - long writtenBytes = 0; - - final SendBufferPool sendBufferPool = this.sendBufferPool; - - final WritableByteChannel ch = channel.getJdkChannel(); - final Queue writeBuffer = channel.writeBufferQueue; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); - synchronized (channel.writeLock) { - channel.inWriteNowLoop = true; - for (;;) { - MessageEvent evt = channel.currentWriteEvent; - SendBuffer buf; - if (evt == null) { - if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { - removeOpWrite = true; - channel.writeSuspended = false; - break; - } - - channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); - } else { - buf = channel.currentWriteBuffer; - } - - ChannelFuture future = evt.getFuture(); - try { - long localWrittenBytes = 0; - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - - if (buf.finished()) { - // Successful write - proceed to the next message. - buf.release(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - evt = null; - buf = null; - future.setSuccess(); - } else { - // Not written fully - perhaps the kernel buffer is full. - addOpWrite = true; - channel.writeSuspended = true; - - if (localWrittenBytes > 0) { - // Notify progress listeners if necessary. - future.setProgress( - localWrittenBytes, - buf.writtenBytes(), buf.totalBytes()); - } - break; - } - } catch (AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (Throwable t) { - if (buf != null) { - buf.release(); - } - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - buf = null; - evt = null; - future.setFailure(t); - if (inEventLoop) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - if (t instanceof IOException) { - open = false; - close(channel, succeededFuture(channel)); - } - } - } - channel.inWriteNowLoop = false; - - // Initially, the following block was executed after releasing - // the writeLock, but there was a race condition, and it has to be - // executed before releasing the writeLock: - // - // https://issues.jboss.org/browse/NETTY-410 - // - if (open) { - if (addOpWrite) { - setOpWrite(channel); - } else if (removeOpWrite) { - clearOpWrite(channel); - } - } - } - if (inEventLoop) { - fireWriteComplete(channel, writtenBytes); - } else { - fireWriteCompleteLater(channel, writtenBytes); - } - } - - protected void setOpWrite(AbstractNioChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.getJdkChannel().keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - protected void clearOpWrite(AbstractNioChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.getJdkChannel().keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - } - } - } - - - public void close(NioServerSocketChannel channel, ChannelFuture future) { - boolean inEventLoop = inEventLoop(); - - boolean bound = channel.isBound(); - try { - if (channel.socket.isOpen()) { - channel.socket.close(); - if (selector != null) { - selector.wakeup(); - } - } - - // Make sure the boss thread is not running so that that the future - // is notified after a new connection cannot be accepted anymore. - // See NETTY-256 for more information. - channel.shutdownLock.lock(); - try { - if (channel.setClosed()) { - future.setSuccess(); - if (bound) { - if (inEventLoop) { - fireChannelUnbound(channel); - } else { - fireChannelUnboundLater(channel); - } - } - if (inEventLoop) { - fireChannelClosed(channel); - } else { - fireChannelClosedLater(channel); - } - } else { - future.setSuccess(); - } - } finally { - channel.shutdownLock.unlock(); - } - } catch (Throwable t) { - future.setFailure(t); - if (inEventLoop) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - - } - } - } - - public void close(AbstractNioChannel channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - boolean inEventLoop = inEventLoop(); - - try { - channel.getJdkChannel().close(); - cancelledKeys ++; - - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - if (inEventLoop) { - fireChannelDisconnected(channel); - } else { - fireChannelDisconnectedLater(channel); - } - } - if (bound) { - if (inEventLoop) { - fireChannelUnbound(channel); - } else { - fireChannelUnboundLater(channel); - } - } - - cleanUpWriteBuffer(channel); - if (inEventLoop) { - fireChannelClosed(channel); - } else { - fireChannelClosedLater(channel); - } - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - if (inEventLoop) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } - - protected void cleanUpWriteBuffer(AbstractNioChannel channel) { - Exception cause = null; - boolean fireExceptionCaught = false; - - // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { - MessageEvent evt = channel.currentWriteEvent; - if (evt != null) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - - ChannelFuture future = evt.getFuture(); - channel.currentWriteBuffer.release(); - channel.currentWriteBuffer = null; - channel.currentWriteEvent = null; - evt = null; - future.setFailure(cause); - fireExceptionCaught = true; - } - - Queue writeBuffer = channel.writeBufferQueue; - if (!writeBuffer.isEmpty()) { - // Create the exception only once to avoid the excessive overhead - // caused by fillStackTrace. - if (cause == null) { - if (channel.isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - } - - for (;;) { - evt = writeBuffer.poll(); - if (evt == null) { - break; - } - evt.getFuture().setFailure(cause); - fireExceptionCaught = true; - } - } - } - - if (fireExceptionCaught) { - if (inEventLoop()) { - fireExceptionCaught(channel, cause); - } else { - fireExceptionCaughtLater(channel, cause); - } - } - } - - public void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { - boolean changed = false; - boolean inEventLoop = inEventLoop(); - try { - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - Selector selector = this.selector; - SelectionKey key = channel.getJdkChannel().keyFor(selector); - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - if (key == null || selector == null) { - if (channel.getRawInterestOps() != interestOps) { - changed = true; - } - - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - - future.setSuccess(); - if (changed) { - if (inEventLoop) { - fireChannelInterestChanged(channel); - } else { - fireChannelInterestChangedLater(channel); - } - } - - return; - } - - switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (!inEventLoop && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (inEventLoop) { - key.interestOps(interestOps); - changed = true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - future.setSuccess(); - if (changed) { - if (inEventLoop) { - fireChannelInterestChanged(channel); - } else { - fireChannelInterestChangedLater(channel); - } - } - } catch (CancelledKeyException e) { - // setInterestOps() was called on a closed channel. - ClosedChannelException cce = new ClosedChannelException(); - future.setFailure(cce); - if (inEventLoop) { - fireExceptionCaught(channel, cce); - } else { - fireExceptionCaughtLater(channel, cce); - } - } catch (Throwable t) { - future.setFailure(t); - if (inEventLoop) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } - - /** - * Read is called when a Selector has been notified that the underlying channel - * was something to be read. The channel would previously have registered its interest - * in read operations. - * - * @param k The selection key which contains the Selector registration information. - */ - protected abstract boolean read(SelectionKey k); - - protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future); - }