Retrofit/overhaul the NIO transport with the new API

- Remove large portion of code thanks to the new API
- SocketChannel implementations are instantiated without factories
- Retrofit the existing code with the new API
This commit is contained in:
Trustin Lee 2012-05-02 21:05:53 +09:00
parent 9e6f8b46df
commit 607d784e5e
26 changed files with 438 additions and 2868 deletions

View File

@ -58,7 +58,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private volatile EventLoop eventLoop;
private volatile boolean registered;
private volatile boolean notifiedClosureListeners;
private ChannelFuture connectFuture;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -116,6 +118,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return eventLoop;
}
@Override
public boolean isRegistered() {
return registered;
}
@Override
public void bind(SocketAddress localAddress, ChannelFuture future) {
pipeline().bind(localAddress, future);
@ -210,10 +217,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
/**
* A {@link Channel} implementation must call this method when it is closed.
*/
protected void notifyClosureListeners() {
private void notifyClosureListeners() {
final ChannelFutureListener[] array;
synchronized (closureListeners) {
if (notifiedClosureListeners) {
@ -382,6 +386,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
assert eventLoop().inEventLoop();
doRegister(future);
assert future.isDone();
if (registered = future.isSuccess()) {
pipeline().fireChannelRegistered();
}
}
@Override
@ -400,18 +408,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) {
// XXX: What if a user makes a connection attempt twice?
if (eventLoop().inEventLoop()) {
doConnect(remoteAddress, localAddress, future);
if (!future.isDone()) {
connectFuture = future;
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doConnect(remoteAddress, localAddress, future);
if (!future.isDone()) {
connectFuture = future;
}
}
});
}
}
@Override
public void finishConnect() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
doFinishConnect(connectFuture);
}
@Override
public void disconnect(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
@ -430,11 +452,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
public void close(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
doClose(future);
notifyClosureListeners();
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doClose(future);
notifyClosureListeners();
}
});
}
@ -446,6 +470,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try {
doDeregister(future);
} finally {
registered = false;
pipeline().fireChannelUnregistered();
eventLoop = null;
}
} else {
@ -455,6 +481,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try {
doDeregister(future);
} finally {
registered = false;
pipeline().fireChannelUnregistered();
eventLoop = null;
}
}
@ -493,6 +521,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract void doRegister(ChannelFuture future);
protected abstract void doBind(SocketAddress localAddress, ChannelFuture future);
protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
protected abstract void doFinishConnect(ChannelFuture future);
protected abstract void doDisconnect(ChannelFuture future);
protected abstract void doClose(ChannelFuture future);
protected abstract void doDeregister(ChannelFuture future);

View File

@ -51,11 +51,26 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
return null;
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return out;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
}
@Override
protected void doFinishConnect(ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
}
@Override
protected void doDisconnect(ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());

View File

@ -175,6 +175,8 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<
void flush(ChannelFuture future);
void write(Object message, ChannelFuture future);
// FIXME: Introduce more flexible channel state notification mechanism
// - notify me when channel becomes (un)registered, (in)active
void addClosureListener(ChannelFutureListener listener);
void removeClosureListener(ChannelFutureListener remover);
@ -190,6 +192,7 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<
void register(EventLoop eventLoop, ChannelFuture future);
void bind(SocketAddress localAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
void finishConnect();
void disconnect(ChannelFuture future);
void close(ChannelFuture future);
void deregister(ChannelFuture future);

View File

@ -1,81 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.concurrent.Executor;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.util.ExternalResourceReleasable;
/**
* The main interface to a transport that creates a {@link Channel} associated
* with a certain communication entity such as a network socket. For example,
* the {@link NioServerSocketChannelFactory} creates a channel which has a
* NIO-based server socket as its underlying communication entity.
* <p>
* Once a new {@link Channel} is created, the {@link ChannelPipeline} which
* was specified as a parameter in the {@link #newChannel(ChannelPipeline)}
* is attached to the new {@link Channel}, and starts to handle all associated
* {@link ChannelEvent}s.
*
* <h3>Graceful shutdown</h3>
* <p>
* To shut down a network application service which is managed by a factory.
* you should follow the following steps:
* <ol>
* <li>close all channels created by the factory and their child channels
* usually using {@link ChannelGroup#close()}, and</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
* <p>
* For detailed transport-specific information on shutting down a factory,
* please refer to the Javadoc of {@link ChannelFactory}'s subtypes, such as
* {@link NioServerSocketChannelFactory}.
* @apiviz.landmark
* @apiviz.has io.netty.channel.Channel oneway - - creates
*
* @apiviz.exclude ^io\.netty\.channel\.([a-z]+\.)+.*ChannelFactory$
*/
public interface ChannelFactory extends ExternalResourceReleasable {
/**
* Creates and opens a new {@link Channel} and attaches the specified
* {@link ChannelPipeline} to the new {@link Channel}.
*
* @param pipeline the {@link ChannelPipeline} which is going to be
* attached to the new {@link Channel}
*
* @return the newly open channel
*
* @throws ChannelException if failed to create and open a new channel
*/
Channel newChannel(ChannelPipeline pipeline);
/**
* Releases the external resources that this factory depends on to function.
* An external resource is a resource that this factory didn't create by
* itself. For example, {@link Executor}s that you specified in the factory
* constructor are external resources. You can call this method to release
* all external resources conveniently when the resources are not used by
* this factory or any other part of your application. An unexpected
* behavior will be resulted in if the resources are released when there's
* an open channel which is managed by this factory.
*/
@Override
void releaseExternalResources();
}

View File

@ -1,25 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
/**
* A {@link ChannelFactory} that creates a {@link ServerChannel}.
* @apiviz.has io.netty.channel.ServerChannel oneway - - creates
*/
public interface ServerChannelFactory extends ChannelFactory {
@Override
ServerChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
/**
* A {@link ChannelFactory} which creates a client-side {@link SocketChannel}.
* @apiviz.has io.netty.channel.socket.SocketChannel oneway - - creates
*/
public interface ClientSocketChannelFactory extends ChannelFactory {
@Override
SocketChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
/**
* A {@link ChannelFactory} which creates a {@link DatagramChannel}.
* @apiviz.has io.netty.channel.socket.DatagramChannel oneway - - creates
*/
public interface DatagramChannelFactory extends ChannelFactory {
@Override
DatagramChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -1,29 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannelFactory;
/**
* A {@link ChannelFactory} which creates a {@link ServerSocketChannel}.
* @apiviz.has io.netty.channel.socket.ServerSocketChannel oneway - - creates
*/
public interface ServerSocketChannelFactory extends ServerChannelFactory {
@Override
ServerSocketChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -17,41 +17,21 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.nio.channels.SelectionKey;
public abstract class AbstractNioChannel extends AbstractChannel {
/**
* Indicates if there is a {@link WriteTask} in the task queue.
*/
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
/**
* Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
* contains.
*/
final AtomicInteger writeBufferSize = new AtomicInteger();
/**
* Keeps track of the highWaterMark.
*/
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
/**
* Boolean that indicates that write operation is in progress.
*/
protected boolean inWriteNowLoop;
protected boolean writeSuspended;
private final SelectableChannel ch;
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
private volatile InetSocketAddress remoteAddress;
private final SelectableChannel ch;
private volatile SelectionKey selectionKey;
protected AbstractNioChannel(Integer id, Channel parent, SelectableChannel ch) {
super(id, parent);
@ -68,6 +48,11 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return ch;
}
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;
}
@Override
public InetSocketAddress localAddress() {
InetSocketAddress localAddress = this.localAddress;
@ -100,4 +85,18 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
public abstract NioChannelConfig config();
@Override
protected void doRegister(ChannelFuture future) {
if (!(eventLoop() instanceof SelectorEventLoop)) {
throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName());
}
SelectorEventLoop loop = (SelectorEventLoop) eventLoop();
try {
selectionKey = javaChannel().register(loop.selector, javaChannel().validOps() & ~SelectionKey.OP_WRITE, this);
} catch (Exception e) {
throw new ChannelException("failed to register a channel", e);
}
}
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*;
import java.nio.channels.SocketChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
final class NioAcceptedSocketChannel extends NioSocketChannel {
static NioAcceptedSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
NioAcceptedSocketChannel instance = new NioAcceptedSocketChannel(
factory, pipeline, parent, sink, socket, worker);
instance.setConnected();
fireChannelOpen(instance);
return instance;
}
private NioAcceptedSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
super(parent, factory, pipeline, sink, socket, worker);
}
}

View File

@ -1,88 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
final class NioClientSocketChannel extends NioSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketChannel.class);
private static SocketChannel newSocket() {
SocketChannel socket;
try {
socket = SocketChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
boolean success = false;
try {
socket.configureBlocking(false);
success = true;
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.",
e);
}
}
}
}
return socket;
}
volatile ChannelFuture connectFuture;
volatile boolean boundManually;
// Does not need to be volatile as it's accessed by only one thread.
long connectDeadlineNanos;
static NioClientSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) {
NioClientSocketChannel instance =
new NioClientSocketChannel(factory, pipeline, sink, worker);
fireChannelOpen(instance);
return instance;
}
private NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) {
super(null, factory, pipeline, sink, newSocket(), worker);
}
}

View File

@ -1,148 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ClientSocketChannelFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ExternalResourceReleasable;
/**
* A {@link ClientSocketChannelFactory} which creates a client-side NIO-based
* {@link SocketChannel}. It utilizes the non-blocking I/O mode which was
* introduced with NIO to serve many number of concurrent connections
* efficiently.
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link NioClientSocketChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss thread</h4>
* <p>
* One {@link NioClientSocketChannelFactory} has one boss thread. It makes
* a connection attempt on request. Once a connection attempt succeeds,
* the boss thread passes the connected {@link Channel} to one of the worker
* threads that the {@link NioClientSocketChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* One {@link NioClientSocketChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link Channel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link Executor}s which were specified
* when a {@link NioClientSocketChannelFactory} was created. A boss thread is
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link Executor}s are able to lend the sufficient number of threads.
* It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
* <p>
* Both boss and worker threads are acquired lazily, and then released when
* there's nothing left to process. All the related resources such as
* {@link Selector} are also released when the boss and worker threads are
* released. Therefore, to shut down a service gracefully, you should do the
* following:
*
* <ol>
* <li>close all channels created by the factory usually using
* {@link ChannelGroup#close()}, and</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
* @apiviz.landmark
*/
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink;
/**
* Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker executor.
*
* See {@link #NioClientSocketChannelFactory(Executor, Executor)}
*/
public NioClientSocketChannelFactory() {
this(Executors.newCachedThreadPool());
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
* 1 and (2 * the number of available processors in the machine) for
* <tt>bossCount</tt> and <tt>workerCount</tt> respectively. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
* @param workerExecutor
* the {@link Executor} which will execute the worker threads
*/
public NioClientSocketChannelFactory(Executor workerExecutor) {
this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioClientSocketChannelFactory(Executor, int, int)} with
* 1 as <tt>bossCount</tt>.
*
* @param workerExecutor
* the {@link Executor} which will execute the worker threads
* @param workerCount
* the maximum number of worker threads
*/
public NioClientSocketChannelFactory(Executor workerExecutor,
int workerCount) {
this(new NioWorkerPool(workerExecutor, workerCount, true));
}
public NioClientSocketChannelFactory(WorkerPool<NioWorker> workerPool) {
if (workerPool == null) {
throw new NullPointerException("workerPool");
}
this.workerPool = workerPool;
sink = new NioClientSocketPipelineSink();
}
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) {
return NioClientSocketChannel.create(this, pipeline, sink, workerPool.nextWorker());
}
@Override
public void releaseExternalResources() {
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
}
}

View File

@ -1,124 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.fireChannelBound;
import static io.netty.channel.Channels.fireExceptionCaught;
import static io.netty.channel.Channels.succeededFuture;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
class NioClientSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
@Override
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioClientSocketChannel channel =
(NioClientSocketChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.getWorker().close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
channel.getWorker().close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
channel.getWorker().close(channel, future);
}
break;
case INTEREST_OPS:
channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.channel();
boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
channel.getWorker().writeFromUserCode(channel);
}
}
private void bind(
NioClientSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.getJdkChannel().bind(localAddress);
channel.boundManually = true;
channel.setBound();
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void connect(
final NioClientSocketChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
channel.getJdkChannel().connect(remoteAddress);
channel.getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
if (!cf.isDone()) {
cf.setFailure(new ClosedChannelException());
}
}
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
channel.getWorker().registerWithWorker(channel, cf);
} catch (Throwable t) {
t.printStackTrace();
cf.setFailure(t);
fireExceptionCaught(channel, t);
channel.getWorker().close(channel, succeededFuture(channel));
}
}
}

View File

@ -1,194 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelFactory;
import io.netty.channel.socket.Worker;
import io.netty.channel.socket.nio.NioDatagramChannel.ProtocolFamily;
import io.netty.channel.socket.oio.OioDatagramChannelFactory;
import io.netty.util.ExternalResourceReleasable;
/**
* A {@link DatagramChannelFactory} that creates a NIO-based connectionless
* {@link DatagramChannel}. It utilizes the non-blocking I/O mode which
* was introduced with NIO to serve many number of concurrent connections
* efficiently.
*
* <h3>How threads work</h3>
* <p>
* There is only one thread type in a {@link NioDatagramChannelFactory};
* worker threads.
*
* <h4>Worker threads</h4>
* <p>
* One {@link NioDatagramChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link DatagramChannel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All worker threads are acquired from the {@link Executor} which was specified
* when a {@link NioDatagramChannelFactory} was created. Therefore, you should
* make sure the specified {@link Executor} is able to lend the sufficient
* number of threads. It is the best bet to specify
* {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
* <p>
* All worker threads are acquired lazily, and then released when there's
* nothing left to process. All the related resources such as {@link Selector}
* are also released when the worker threads are released. Therefore, to shut
* down a service gracefully, you should do the following:
*
* <ol>
* <li>close all channels created by the factory usually using
* {@link ChannelGroup#close()}, and</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
*
* <h3>Limitation</h3>
* <p>
* Multicast is not supported. Please use {@link OioDatagramChannelFactory}
* instead.
* @apiviz.landmark
*/
public class NioDatagramChannelFactory implements DatagramChannelFactory {
private final ChannelSink sink;
private final WorkerPool<NioDatagramWorker> workerPool;
private final NioDatagramChannel.ProtocolFamily family;
/**
* Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}.
*
* See {@link #NioDatagramChannelFactory(Executor)}
*/
public NioDatagramChannelFactory() {
this(Executors.newCachedThreadPool());
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of
* available processors in the machine. The number of available processors
* is obtained by {@link Runtime#availableProcessors()}.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public NioDatagramChannelFactory(final Executor workerExecutor) {
this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioDatagramChannelFactory(final Executor workerExecutor,
final int workerCount) {
this(new NioDatagramWorkerPool(workerExecutor, workerCount, true));
}
/**
* Creates a new instance.
*
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/
public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool) {
this(workerPool, null);
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of
* available processors in the machine. The number of available processors
* is obtained by {@link Runtime#availableProcessors()}.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param family
* the {@link ProtocolFamily} to use. This should be used for UDP multicast.
* <strong>Be aware that this option is only considered when running on java7+</strong>
*/
public NioDatagramChannelFactory(final Executor workerExecutor, ProtocolFamily family) {
this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS, family);
}
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param workerCount
* the maximum number of I/O worker threads
* @param family
* the {@link ProtocolFamily} to use. This should be used for UDP multicast.
* <strong>Be aware that this option is only considered when running on java7+</strong>
*/
public NioDatagramChannelFactory(final Executor workerExecutor,
final int workerCount, ProtocolFamily family) {
this(new NioDatagramWorkerPool(workerExecutor, workerCount, true), family);
}
/**
* Creates a new instance.
*
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
* @param family
* the {@link ProtocolFamily} to use. This should be used for UDP multicast.
* <strong>Be aware that this option is only considered when running on java7+</strong>
*/
public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool, ProtocolFamily family) {
this.workerPool = workerPool;
this.family = family;
sink = new NioDatagramPipelineSink();
}
@Override
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
return NioDatagramChannel.create(this, pipeline, sink, workerPool.nextWorker(), family);
}
@Override
public void releaseExternalResources() {
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
}
}

View File

@ -1,84 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
class NioDatagramJdkChannel extends AbstractJdkChannel {
NioDatagramJdkChannel(DatagramChannel channel) {
super(channel);
}
@Override
protected DatagramChannel getChannel() {
return (DatagramChannel) super.getChannel();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress();
}
@Override
public InetSocketAddress getLocalSocketAddress() {
return (InetSocketAddress) getChannel().socket().getLocalSocketAddress();
}
@Override
public boolean isSocketBound() {
return getChannel().socket().isBound();
}
@Override
public void bind(SocketAddress local) throws IOException {
getChannel().socket().bind(local);
}
@Override
public void connect(SocketAddress remote) throws IOException {
getChannel().connect(remote);
}
@Override
public boolean isConnected() {
return getChannel().isConnected();
}
@Override
public void disconnectSocket() throws IOException {
getChannel().disconnect();
}
@Override
public void closeSocket() throws IOException {
getChannel().socket().close();
}
@Override
public int write(ByteBuffer src) throws IOException {
return getChannel().write(src);
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioDatagramWorker}'s
*
*
*/
public class NioDatagramWorkerPool extends AbstractNioWorkerPool<NioDatagramWorker> {
public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioDatagramWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -1,447 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.SystemPropertyUtil;
/**
* Provides information which is specific to a NIO service provider
* implementation.
*/
final class NioProviderMetadata {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioProviderMetadata.class);
private static final String CONSTRAINT_LEVEL_PROPERTY =
"io.netty.channel.socket.nio.constraintLevel";
/**
* 0 - no need to wake up to get / set interestOps (most cases)
* 1 - no need to wake up to get interestOps, but need to wake up to set.
* 2 - need to wake up to get / set interestOps (old providers)
*/
static final int CONSTRAINT_LEVEL;
static {
int constraintLevel = -1;
// Use the system property if possible.
constraintLevel = SystemPropertyUtil.get(CONSTRAINT_LEVEL_PROPERTY, -1);
if (constraintLevel < 0 || constraintLevel > 2) {
constraintLevel = -1;
}
if (constraintLevel >= 0) {
logger.debug(
"Setting the NIO constraint level to: " + constraintLevel);
}
if (constraintLevel < 0) {
constraintLevel = detectConstraintLevelFromSystemProperties();
if (constraintLevel < 0) {
constraintLevel = 2;
if (logger.isDebugEnabled()) {
logger.debug(
"Couldn't determine the NIO constraint level from " +
"the system properties; using the safest level (2)");
}
} else if (constraintLevel != 0) {
if (logger.isInfoEnabled()) {
logger.info(
"Using the autodetected NIO constraint level: " +
constraintLevel +
" (Use better NIO provider for better performance)");
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Using the autodetected NIO constraint level: " +
constraintLevel);
}
}
}
CONSTRAINT_LEVEL = constraintLevel;
if (CONSTRAINT_LEVEL < 0 || CONSTRAINT_LEVEL > 2) {
throw new Error(
"Unexpected NIO constraint level: " +
CONSTRAINT_LEVEL + ", please report this error.");
}
}
private static int detectConstraintLevelFromSystemProperties() {
String version = SystemPropertyUtil.get("java.specification.version");
String vminfo = SystemPropertyUtil.get("java.vm.info", "");
String os = SystemPropertyUtil.get("os.name");
String vendor = SystemPropertyUtil.get("java.vm.vendor");
String provider;
try {
provider = SelectorProvider.provider().getClass().getName();
} catch (Exception e) {
// Perhaps security exception.
provider = null;
}
if (version == null || os == null || vendor == null || provider == null) {
return -1;
}
os = os.toLowerCase();
vendor = vendor.toLowerCase();
// System.out.println(version);
// System.out.println(vminfo);
// System.out.println(os);
// System.out.println(vendor);
// System.out.println(provider);
// Sun JVM
if (vendor.indexOf("sun") >= 0) {
// Linux
if (os.indexOf("linux") >= 0) {
if (provider.equals("sun.nio.ch.EPollSelectorProvider") ||
provider.equals("sun.nio.ch.PollSelectorProvider")) {
return 0;
}
// Windows
} else if (os.indexOf("windows") >= 0) {
if (provider.equals("sun.nio.ch.WindowsSelectorProvider")) {
return 0;
}
// Solaris
} else if (os.indexOf("sun") >= 0 || os.indexOf("solaris") >= 0) {
if (provider.equals("sun.nio.ch.DevPollSelectorProvider")) {
return 0;
}
}
// Apple JVM
} else if (vendor.indexOf("apple") >= 0) {
// Mac OS
if (os.indexOf("mac") >= 0 && os.indexOf("os") >= 0) {
if (provider.equals("sun.nio.ch.KQueueSelectorProvider")) {
return 0;
}
}
// IBM
} else if (vendor.indexOf("ibm") >= 0) {
// Linux or AIX
if (os.indexOf("linux") >= 0 || os.indexOf("aix") >= 0) {
if (version.equals("1.5") || version.matches("^1\\.5\\D.*$")) {
if (provider.equals("sun.nio.ch.PollSelectorProvider")) {
return 1;
}
} else if (version.equals("1.6") || version.matches("^1\\.6\\D.*$")) {
// IBM JDK 1.6 has different constraint level for different
// version. The exact version can be determined only by its
// build date.
Pattern datePattern = Pattern.compile(
"(?:^|[^0-9])(" +
"[2-9][0-9]{3}" + // year
"(?:0[1-9]|1[0-2])" + // month
"(?:0[1-9]|[12][0-9]|3[01])" + // day of month
")(?:$|[^0-9])");
Matcher dateMatcher = datePattern.matcher(vminfo);
if (dateMatcher.find()) {
long dateValue = Long.parseLong(dateMatcher.group(1));
if (dateValue < 20081105L) {
// SR0, 1, and 2
return 2;
} else {
// SR3 and later
if (provider.equals("sun.nio.ch.EPollSelectorProvider")) {
return 0;
} else if (provider.equals("sun.nio.ch.PollSelectorProvider")) {
return 1;
}
}
}
}
}
// BEA
} else if (vendor.indexOf("bea") >= 0 || vendor.indexOf("oracle") >= 0) {
// Linux
if (os.indexOf("linux") >= 0) {
if (provider.equals("sun.nio.ch.EPollSelectorProvider") ||
provider.equals("sun.nio.ch.PollSelectorProvider")) {
return 0;
}
// Windows
} else if (os.indexOf("windows") >= 0) {
if (provider.equals("sun.nio.ch.WindowsSelectorProvider")) {
return 0;
}
}
// Apache Software Foundation
} else if (vendor.indexOf("apache") >= 0) {
if (provider.equals("org.apache.harmony.nio.internal.SelectorProviderImpl")) {
return 1;
}
}
// Others (untested)
return -1;
}
private static final class ConstraintLevelAutodetector {
ConstraintLevelAutodetector() {
}
int autodetect() {
final int constraintLevel;
ExecutorService executor = Executors.newCachedThreadPool();
boolean success;
long startTime;
int interestOps;
ServerSocketChannel ch = null;
SelectorLoop loop = null;
try {
// Open a channel.
ch = ServerSocketChannel.open();
// Configure the channel
try {
ch.socket().bind(new InetSocketAddress(0));
ch.configureBlocking(false);
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to configure a temporary socket.", e);
}
return -1;
}
// Prepare the selector loop.
try {
loop = new SelectorLoop();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to open a temporary selector.", e);
}
return -1;
}
// Register the channel
try {
ch.register(loop.selector, 0);
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to register a temporary selector.", e);
}
return -1;
}
SelectionKey key = ch.keyFor(loop.selector);
// Start the selector loop.
executor.execute(loop);
// Level 0
success = true;
for (int i = 0; i < 10; i ++) {
// Increase the probability of calling interestOps
// while select() is running.
do {
while (!loop.selecting) {
Thread.yield();
}
// Wait a little bit more.
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
} while (!loop.selecting);
startTime = System.nanoTime();
key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
if (System.nanoTime() - startTime >= 500000000L) {
success = false;
break;
}
}
if (success) {
constraintLevel = 0;
} else {
// Level 1
success = true;
for (int i = 0; i < 10; i ++) {
// Increase the probability of calling interestOps
// while select() is running.
do {
while (!loop.selecting) {
Thread.yield();
}
// Wait a little bit more.
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
} while (!loop.selecting);
startTime = System.nanoTime();
interestOps = key.interestOps();
synchronized (loop) {
loop.selector.wakeup();
key.interestOps(interestOps | SelectionKey.OP_ACCEPT);
key.interestOps(interestOps & ~SelectionKey.OP_ACCEPT);
}
if (System.nanoTime() - startTime >= 500000000L) {
success = false;
break;
}
}
if (success) {
constraintLevel = 1;
} else {
constraintLevel = 2;
}
}
} catch (Throwable e) {
return -1;
} finally {
if (ch != null) {
try {
ch.close();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a temporary socket.", e);
}
}
}
if (loop != null) {
loop.done = true;
try {
executor.shutdownNow();
} catch (NullPointerException ex) {
// Some JDK throws NPE here, but shouldn't.
}
try {
for (;;) {
loop.selector.wakeup();
try {
if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
break;
}
} catch (InterruptedException e) {
// Ignore
}
}
} catch (Throwable e) {
// Perhaps security exception.
}
try {
loop.selector.close();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a temporary selector.", e);
}
}
}
}
return constraintLevel;
}
}
private static final class SelectorLoop implements Runnable {
final Selector selector;
volatile boolean done;
volatile boolean selecting; // Just an approximation
SelectorLoop() throws IOException {
selector = Selector.open();
}
@Override
public void run() {
while (!done) {
synchronized (this) {
// Guard
}
try {
selecting = true;
try {
selector.select(1000);
} finally {
selecting = false;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey k: keys) {
k.interestOps(0);
}
keys.clear();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to wait for a temporary selector.", e);
}
}
}
}
}
public static void main(String[] args) throws Exception {
for (Entry<Object, Object> e: System.getProperties().entrySet()) {
System.out.println(e.getKey() + ": " + e.getValue());
}
System.out.println();
System.out.println("Hard-coded Constraint Level: " + CONSTRAINT_LEVEL);
System.out.println(
"Auto-detected Constraint Level: " +
new ConstraintLevelAutodetector().autodetect());
}
private NioProviderMetadata() {
// Unused
}
}

View File

@ -15,54 +15,32 @@
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
final class NioServerSocketChannel extends AbstractServerChannel
implements io.netty.channel.socket.ServerSocketChannel, NioChannel {
implements io.netty.channel.socket.ServerSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
final ServerSocketChannel socket;
final Lock shutdownLock = new ReentrantLock();
final NioWorker worker;
final WorkerPool<NioWorker> workers;
private final ServerSocketChannel socket;
private final ServerSocketChannelConfig config;
private volatile InetSocketAddress localAddress;
private volatile SelectionKey selectionKey;
static NioServerSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, NioWorker worker, WorkerPool<NioWorker> workers) {
NioServerSocketChannel instance =
new NioServerSocketChannel(factory, pipeline, sink, worker, workers);
fireChannelOpen(instance);
return instance;
}
private NioServerSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker, WorkerPool<NioWorker> workers) {
super(factory, pipeline, sink);
this.worker = worker;
this.workers = workers;
public NioServerSocketChannel() {
try {
socket = ServerSocketChannel.open();
} catch (IOException e) {
@ -90,32 +68,114 @@ final class NioServerSocketChannel extends AbstractServerChannel
}
@Override
public ServerSocketChannelConfig getConfig() {
public ServerSocketChannelConfig config() {
return config;
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) socket.socket().getLocalSocketAddress();
public boolean isActive() {
// TODO Auto-generated method stub
return false;
}
@Override
public InetSocketAddress getRemoteAddress() {
public InetSocketAddress localAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress =
(InetSocketAddress) unsafe().localAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
@Override
public InetSocketAddress remoteAddress() {
return null;
}
@Override
public boolean isBound() {
return isOpen() && socket.socket().isBound();
protected java.nio.channels.ServerSocketChannel javaChannel() {
return socket;
}
@Override
protected boolean setClosed() {
return super.setClosed();
protected SocketAddress localAddress0() {
return socket.socket().getLocalSocketAddress();
}
@Override
public NioWorker getWorker() {
return worker;
protected void doRegister(ChannelFuture future) {
if (!(eventLoop() instanceof SelectorEventLoop)) {
throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName());
}
SelectorEventLoop loop = (SelectorEventLoop) eventLoop();
try {
selectionKey = javaChannel().register(loop.selector, javaChannel().validOps(), this);
} catch (Exception e) {
throw new ChannelException("failed to register a channel", e);
}
}
@Override
protected void doBind(SocketAddress localAddress, ChannelFuture future) {
try {
javaChannel().socket().bind(localAddress);
future.setSuccess();
pipeline().fireChannelActive();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected void doClose(ChannelFuture future) {
try {
javaChannel().close();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
future.setSuccess();
pipeline().fireChannelInactive();
if (isRegistered()) {
deregister(null);
}
}
@Override
protected void doDeregister(ChannelFuture future) {
try {
selectionKey.cancel();
future.setSuccess();
pipeline().fireChannelUnregistered();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected int doRead() {
int acceptedConns = 0;
for (;;) {
try {
java.nio.channels.SocketChannel ch = javaChannel().accept();
if (ch == null) {
break;
}
pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, ch));
} catch (ChannelException e) {
pipeline().fireExceptionCaught(e);
} catch (Exception e) {
pipeline().fireExceptionCaught(new ChannelException("failed to accept a connection", e));
}
}
return acceptedConns;
}
}

View File

@ -1,202 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelFactory;
import io.netty.channel.socket.Worker;
import io.netty.util.ExternalResourceReleasable;
/**
* A {@link ServerSocketChannelFactory} which creates a server-side NIO-based
* {@link ServerSocketChannel}. It utilizes the non-blocking I/O mode which
* was introduced with NIO to serve many number of concurrent connections
* efficiently.
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link NioServerSocketChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss threads</h4>
* <p>
* Each bound {@link ServerSocketChannel} has its own boss thread.
* For example, if you opened two server ports such as 80 and 443, you will
* have two boss threads. A boss thread accepts incoming connections until
* the port is unbound. Once a connection is accepted successfully, the boss
* thread passes the accepted {@link Channel} to one of the worker
* threads that the {@link NioServerSocketChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* One {@link NioServerSocketChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link Channel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link Executor}s which were specified
* when a {@link NioServerSocketChannelFactory} was created. Boss threads are
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link Executor}s are able to lend the sufficient number of threads.
* It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
* <p>
* Both boss and worker threads are acquired lazily, and then released when
* there's nothing left to process. All the related resources such as
* {@link Selector} are also released when the boss and worker threads are
* released. Therefore, to shut down a service gracefully, you should do the
* following:
*
* <ol>
* <li>unbind all channels created by the factory,
* <li>close all child channels accepted by the unbound channels, and
* (these two steps so far is usually done using {@link ChannelGroup#close()})</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
* @apiviz.landmark
*/
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink;
private final WorkerPool<NioWorker> bossWorkerPool;
/**
* Create a new {@link NioServerSocketChannelFactory} using
* {@link Executors#newCachedThreadPool()} for the workers.
*
* See {@link #NioServerSocketChannelFactory(Executor, Executor)}
*/
public NioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioServerSocketChannelFactory(Executor, Executor, int, int)} with 1
* as boss count and 2 * the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
* @param bossExecutor
* the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS, SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param bossCount
* the maximum number of I/O worker threads that handling the accepting of connections
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int bossCount,
int workerCount) {
this(new NioWorkerPool(bossExecutor, bossCount, true), new NioWorkerPool(workerExecutor, workerCount, true));
}
/**
* Creates a new instance.
*
* @param bossWorkerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads that handle the accepting of new connections
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/
public NioServerSocketChannelFactory(WorkerPool<NioWorker> bossWorkerPool, WorkerPool<NioWorker> workerPool) {
if (bossWorkerPool == null) {
throw new NullPointerException("bossWorkerPool");
}
if (workerPool == null) {
throw new NullPointerException("workerPool");
}
this.bossWorkerPool = bossWorkerPool;
this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink();
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericExecutor
* the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections)
* @param workerCount
* the maximum number of I/O worker threads
*
*/
public NioServerSocketChannelFactory(Executor genericExecutor, int workerCount) {
this(new NioWorkerPool(genericExecutor, workerCount, true));
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericExecutor
* the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections)
*
*/
public NioServerSocketChannelFactory(Executor genericExecutor) {
this(genericExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS + SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericWorkerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads (that included accepting of new connections)
*/
public NioServerSocketChannelFactory(WorkerPool<NioWorker> genericWorkerPool) {
this(genericWorkerPool, genericWorkerPool);
}
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return NioServerSocketChannel.create(this, pipeline, sink, bossWorkerPool.nextWorker(), workerPool);
}
@Override
public void releaseExternalResources() {
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
}
}

View File

@ -1,132 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*;
import java.net.SocketAddress;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
class NioServerSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
@Override
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.getWorker().close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
channel.getWorker().close(channel, future);
}
break;
}
}
private void handleAcceptedSocket(ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.getWorker().close(channel, future);
}
break;
case BOUND:
case CONNECTED:
if (value == null) {
channel.getWorker().close(channel, future);
}
break;
case INTEREST_OPS:
channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.channel();
boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
channel.getWorker().writeFromUserCode(channel);
}
}
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.getWorker().registerWithWorker(channel, future);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bound) {
channel.getWorker().close(channel, future);
}
}
}
}

View File

@ -15,84 +15,274 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public abstract class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel {
public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel {
private static final int ST_OPEN = 0;
private static final int ST_BOUND = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_CLOSED = -1;
volatile int state = ST_OPEN;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
private final NioSocketChannelConfig config;
private final ChannelBufferHolder<?> out = ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer());
public NioSocketChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
super(parent, factory, pipeline, sink, worker, new NioSocketJdkChannel(socket));
private static SocketChannel newSocket() {
SocketChannel socket;
try {
socket = SocketChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
boolean success = false;
try {
socket.configureBlocking(false);
success = true;
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.",
e);
}
}
}
}
return socket;
}
public NioSocketChannel(Channel parent) {
this(parent, newSocket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new DefaultNioSocketChannelConfig(socket.socket());
}
@Override
public NioWorker getWorker() {
return (NioWorker) super.getWorker();
}
@Override
public NioSocketChannelConfig getConfig() {
public NioSocketChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return state >= ST_OPEN;
protected SocketChannel javaChannel() {
return (SocketChannel) super.javaChannel();
}
@Override
public boolean isBound() {
return state >= ST_BOUND;
public boolean isActive() {
return javaChannel().isConnected();
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
return (ChannelBufferHolder<Object>) out;
}
@Override
public boolean isConnected() {
return state == ST_CONNECTED;
protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress();
}
final void setBound() {
assert state == ST_OPEN : "Invalid state: " + state;
state = ST_BOUND;
@Override
protected SocketAddress remoteAddress0() {
return javaChannel().socket().getRemoteSocketAddress();
}
final void setConnected() {
if (state != ST_CLOSED) {
state = ST_CONNECTED;
@Override
protected void doBind(SocketAddress localAddress, ChannelFuture future) {
try {
javaChannel().socket().bind(localAddress);
future.setSuccess();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected boolean setClosed() {
state = ST_CLOSED;
return super.setClosed();
}
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) {
if (localAddress != null) {
try {
javaChannel().socket().bind(localAddress);
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return getUnsupportedOperationFuture();
try {
if (javaChannel().connect(remoteAddress)) {
future.setSuccess();
pipeline().fireChannelActive();
}
} catch (Exception e) {
future.setFailure(e);
close(null);
}
}
@Override
protected void doFinishConnect(ChannelFuture future) {
try {
if (javaChannel().finishConnect()) {
future.setSuccess();
pipeline().fireChannelActive();
}
} catch (Exception e) {
future.setFailure(e);
close(null);
}
}
@Override
protected void doDisconnect(ChannelFuture future) {
doClose(future);
}
@Override
protected void doClose(ChannelFuture future) {
try {
javaChannel().close();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
future.setSuccess();
pipeline().fireChannelInactive();
if (isRegistered()) {
deregister(null);
}
}
@Override
protected void doDeregister(ChannelFuture future) {
try {
selectionKey().cancel();
future.setSuccess();
pipeline().fireChannelUnregistered();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected int doRead() {
final SocketChannel ch = javaChannel();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ChannelBuffer buf = pipeline().nextIn().byteBuffer();
try {
while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) {
readBytes += ret;
if (!buf.writable()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
}
if (readBytes > 0) {
pipeline().fireInboundBufferUpdated();
}
if (ret < 0 || failure) {
selectionKey().cancel(); // Some JDK implementations run into an infinite loop without this.
close(null);
return -1;
}
return readBytes;
}
@Override
protected int doFlush(ChannelFuture future) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
final SocketChannel ch = javaChannel();
final int writeSpinCount = config().getWriteSpinCount();
final ChannelBuffer buf = unsafe().out().byteBuffer();
int bytesLeft = buf.readableBytes();
if (bytesLeft == 0) {
future.setSuccess();
return 0;
}
int readerIndex = buf.readerIndex();
int localWrittenBytes = 0;
int writtenBytes = 0;
try {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.getBytes(readerIndex, ch, bytesLeft);
if (localWrittenBytes > 0) {
bytesLeft -= localWrittenBytes;
if (bytesLeft <= 0) {
removeOpWrite = true;
future.setSuccess();
break;
}
readerIndex += localWrittenBytes;
writtenBytes += localWrittenBytes;
} else {
addOpWrite = true;
break;
}
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
open = false;
close(null);
}
}
if (open) {
if (addOpWrite) {
SelectionKey key = selectionKey();
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else if (removeOpWrite) {
SelectionKey key = selectionKey();
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
}
return writtenBytes;
}
}

View File

@ -1,86 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
class NioSocketJdkChannel extends AbstractJdkChannel {
public NioSocketJdkChannel(SocketChannel channel) {
super(channel);
}
@Override
protected SocketChannel getChannel() {
return (SocketChannel) super.getChannel();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress();
}
@Override
public InetSocketAddress getLocalSocketAddress() {
return (InetSocketAddress) getChannel().socket().getLocalSocketAddress();
}
@Override
public boolean isSocketBound() {
return getChannel().socket().isBound();
}
@Override
public void bind(SocketAddress local) throws IOException {
getChannel().socket().bind(local);
}
@Override
public void connect(SocketAddress remote) throws IOException {
getChannel().connect(remote);
}
@Override
public boolean isConnected() {
return getChannel().isConnected();
}
@Override
public void disconnectSocket() throws IOException {
getChannel().socket().close();
}
@Override
public void closeSocket() throws IOException {
getChannel().socket().close();
}
@Override
public int write(ByteBuffer src) throws IOException {
return getChannel().write(src);
}
@Override
public boolean finishConnect() throws IOException {
return getChannel().finishConnect();
}
}

View File

@ -1,159 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.fireChannelBound;
import static io.netty.channel.Channels.fireChannelConnected;
import static io.netty.channel.Channels.fireExceptionCaught;
import static io.netty.channel.Channels.fireMessageReceived;
import static io.netty.channel.Channels.succeededFuture;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ReceiveBufferSizePredictor;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
public class NioWorker extends SelectorEventLoop {
protected final ReceiveBufferPool recvBufferPool = new ReceiveBufferPool();
public NioWorker(Executor executor) {
super(executor);
}
public NioWorker(Executor executor, boolean allowShutdownOnIdle) {
super(executor, allowShutdownOnIdle);
}
@Override
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
bb.flip();
final ChannelBufferFactory bufferFactory =
channel.getConfig().getBufferFactory();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
recvBufferPool.release(bb);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
} else {
recvBufferPool.release(bb);
}
if (ret < 0 || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}
return true;
}
@Override
protected void registerTask(AbstractNioChannel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
SocketAddress localAddress = channel.getLocalAddress();
SocketAddress remoteAddress = channel.getRemoteAddress();
if (localAddress == null || remoteAddress == null) {
if (future != null) {
future.setFailure(new ClosedChannelException());
}
close(channel, succeededFuture(channel));
return;
}
try {
if (server) {
channel.getJdkChannel().configureBlocking(false);
}
boolean registered = channel.getJdkChannel().isRegistered();
if (!registered) {
synchronized (channel.interestOpsLock) {
channel.getJdkChannel().register(selector, channel.getRawInterestOps(), channel);
}
} else {
setInterestOps(channel, succeededFuture(channel), channel.getRawInterestOps());
}
if (future != null) {
if (channel instanceof NioSocketChannel) {
((NioSocketChannel) channel).setConnected();
}
future.setSuccess();
}
if (server || !((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);
} catch (IOException e) {
if (future != null) {
future.setFailure(e);
}
close(channel, succeededFuture(channel));
if (!(e instanceof ClosedChannelException)) {
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
}
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioWorker}'s
*
*
*/
public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> {
public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
public final class ReceiveBufferPool {
private static final int POOL_SIZE = 8;
@SuppressWarnings("unchecked")
private final SoftReference<ByteBuffer>[] pool = new SoftReference[POOL_SIZE];
public ByteBuffer acquire(int size) {
final SoftReference<ByteBuffer>[] pool = this.pool;
for (int i = 0; i < POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null) {
continue;
}
ByteBuffer buf = ref.get();
if (buf == null) {
pool[i] = null;
continue;
}
if (buf.capacity() < size) {
continue;
}
pool[i] = null;
buf.clear();
return buf;
}
ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size));
return buf;
}
public void release(ByteBuffer buffer) {
final SoftReference<ByteBuffer>[] pool = this.pool;
for (int i = 0; i < POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null || ref.get() == null) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
}
}
// pool is full - replace one
final int capacity = buffer.capacity();
for (int i = 0; i < POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
ByteBuffer pooled = ref.get();
if (pooled == null) {
pool[i] = null;
continue;
}
if (pooled.capacity() < capacity) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
}
}
}
private static int normalizeCapacity(int capacity) {
// Normalize to multiple of 1024
int q = capacity >>> 10;
int r = capacity & 1023;
if (r != 0) {
q ++;
}
return q << 10;
}
}

View File

@ -17,35 +17,21 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.SendBufferPool.SendBuffer;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
abstract class SelectorEventLoop extends SingleThreadEventLoop {
/**
@ -54,11 +40,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
protected static final InternalLogger logger = InternalLoggerFactory
.getInstance(SelectorEventLoop.class);
private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* The NIO {@link Selector}.
*/
@ -72,20 +55,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
/**
* Lock for this workers Selector.
*/
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
protected final SendBufferPool sendBufferPool = new SendBufferPool();
protected SelectorEventLoop() {
this(Executors.defaultThreadFactory());
}
@ -114,69 +85,6 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
}
}
@Override
public EventLoop register(final Channel channel, final ChannelFuture future) {
try {
if (channel instanceof NioServerSocketChannel) {
final NioServerSocketChannel ch = (NioServerSocketChannel) channel;
execute(new Runnable() {
@Override
public void run() {
try {
ch.socket.register(selector, SelectionKey.OP_ACCEPT, ch);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
});
} else if (channel instanceof NioClientSocketChannel) {
final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel;
execute(new Runnable() {
@Override
public void run() {
try {
try {
clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel);
} catch (ClosedChannelException ignored) {
clientChannel.getWorker().close(clientChannel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
clientChannel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
});
} else if (channel instanceof AbstractNioChannel) {
execute(new Runnable() {
@Override
public void run() {
try {
registerTask((AbstractNioChannel) channel, future);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
});
} else {
throw new UnsupportedOperationException("Unable to handle channel " + channel);
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
@Override
protected void run() {
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
@ -186,13 +94,6 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
try {
SelectorUtil.select(selector);
@ -239,19 +140,9 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
processConnectTimeout(selector.keys(), currentTimeNanos);
}
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (isShutdown()) {
synchronized (startStopLock) {
if (!hasTasks() && selector.keys().isEmpty()) {
break;
}
}
}
if (isShutdown()) {
// FIXME: Close all channels immediately and break the loop.
break;
}
} catch (Throwable t) {
logger.warn(
@ -293,83 +184,42 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
Channel ch = (Channel) k.attachment();
boolean removeKey = true;
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
if (ch.unsafe().read() < 0) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
ch.unsafe().flush(null);
}
if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
removeKey = accept(k);
ch.unsafe().read();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
connect(k);
ch.unsafe().finishConnect();
}
} catch (CancelledKeyException ignored) {
close(k);
ch.unsafe().close(null);
} finally {
if (removeKey) {
i.remove();
}
}
if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException
}
}
}
protected boolean accept(SelectionKey key) {
NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment();
try {
boolean handled = false;
// accept all sockets that are waiting atm
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
// TODO: Remove the casting stuff
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = channel.workers.nextWorker();
worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
channel.getPipeline().getSink(), acceptedSocket, worker), null);
handled = true;
}
return handled;
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
}
return true;
}
protected void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null;
for (SelectionKey k: keys) {
@ -389,38 +239,21 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
continue;
}
// check if the channel is in
Object attachment = k.attachment();
if (attachment instanceof NioClientSocketChannel) {
NioClientSocketChannel ch = (NioClientSocketChannel) attachment;
if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) {
if (cause == null) {
cause = new ConnectException("connection timed out");
}
ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause);
ch.getWorker().close(ch, succeededFuture(ch));
}
}
}
}
protected void connect(SelectionKey k) {
final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
// TODO: Remove cast
if (ch.getJdkChannel().finishConnect()) {
registerTask(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.getWorker().close(ch, succeededFuture(ch));
// FIXME: Implement connect timeout.
// Channel ch = (Channel) k.attachment();
// if (attachment instanceof NioClientSocketChannel) {
// NioClientSocketChannel ch = (NioClientSocketChannel) attachment;
// if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) {
//
// if (cause == null) {
// cause = new ConnectException("connection timed out");
// }
//
// ch.connectFuture.setFailure(cause);
// fireExceptionCaught(ch, cause);
// ch.getWorker().close(ch, succeededFuture(ch));
// }
// }
}
}
@ -432,495 +265,4 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
}
return false;
}
protected void close(SelectionKey k) {
Object attachment = k.attachment();
if (attachment instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) attachment;
close(ch, succeededFuture(ch));
} else if (attachment instanceof NioServerSocketChannel) {
NioServerSocketChannel ch = (NioServerSocketChannel) attachment;
close(ch, succeededFuture(ch));
} else {
// TODO: What todo ?
}
}
public void writeFromUserCode(final AbstractNioChannel channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;
}
if (scheduleWriteIfNecessary(channel)) {
return;
}
// From here, we are sure Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
write0(channel);
}
public void writeFromTaskLoop(AbstractNioChannel ch) {
if (!ch.writeSuspended) {
write0(ch);
}
}
void writeFromSelectorLoop(final SelectionKey k) {
AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) {
if (!inEventLoop()) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
execute(channel.writeTask);
}
final Selector workerSelector = selector;
if (workerSelector != null) {
if (wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
}
return true;
}
return false;
}
protected void write0(AbstractNioChannel channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean inEventLoop = inEventLoop();
long writtenBytes = 0;
final SendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.getJdkChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
ChannelFuture future = evt.getFuture();
try {
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (localWrittenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
if (buf != null) {
buf.release();
}
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
if (t instanceof IOException) {
open = false;
close(channel, succeededFuture(channel));
}
}
}
channel.inWriteNowLoop = false;
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}
if (inEventLoop) {
fireWriteComplete(channel, writtenBytes);
} else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
protected void setOpWrite(AbstractNioChannel channel) {
Selector selector = this.selector;
SelectionKey key = channel.getJdkChannel().keyFor(selector);
if (key == null) {
return;
}
if (!key.isValid()) {
close(key);
return;
}
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
}
}
}
protected void clearOpWrite(AbstractNioChannel channel) {
Selector selector = this.selector;
SelectionKey key = channel.getJdkChannel().keyFor(selector);
if (key == null) {
return;
}
if (!key.isValid()) {
close(key);
return;
}
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
int interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
}
}
}
public void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean inEventLoop = inEventLoop();
boolean bound = channel.isBound();
try {
if (channel.socket.isOpen()) {
channel.socket.close();
if (selector != null) {
selector.wakeup();
}
}
// Make sure the boss thread is not running so that that the future
// is notified after a new connection cannot be accepted anymore.
// See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
if (inEventLoop) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (inEventLoop) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} finally {
channel.shutdownLock.unlock();
}
} catch (Throwable t) {
future.setFailure(t);
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
public void close(AbstractNioChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
boolean inEventLoop = inEventLoop();
try {
channel.getJdkChannel().close();
cancelledKeys ++;
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
if (inEventLoop) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (inEventLoop) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
cleanUpWriteBuffer(channel);
if (inEventLoop) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
protected void cleanUpWriteBuffer(AbstractNioChannel channel) {
Exception cause = null;
boolean fireExceptionCaught = false;
// Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent;
if (evt != null) {
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
ChannelFuture future = evt.getFuture();
channel.currentWriteBuffer.release();
channel.currentWriteBuffer = null;
channel.currentWriteEvent = null;
evt = null;
future.setFailure(cause);
fireExceptionCaught = true;
}
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
if (!writeBuffer.isEmpty()) {
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
if (cause == null) {
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
}
for (;;) {
evt = writeBuffer.poll();
if (evt == null) {
break;
}
evt.getFuture().setFailure(cause);
fireExceptionCaught = true;
}
}
}
if (fireExceptionCaught) {
if (inEventLoop()) {
fireExceptionCaught(channel, cause);
} else {
fireExceptionCaughtLater(channel, cause);
}
}
}
public void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) {
boolean changed = false;
boolean inEventLoop = inEventLoop();
try {
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
Selector selector = this.selector;
SelectionKey key = channel.getJdkChannel().keyFor(selector);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
if (key == null || selector == null) {
if (channel.getRawInterestOps() != interestOps) {
changed = true;
}
// Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(interestOps);
future.setSuccess();
if (changed) {
if (inEventLoop) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
return;
}
switch (CONSTRAINT_LEVEL) {
case 0:
if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps);
if (!inEventLoop &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
break;
case 1:
case 2:
if (channel.getRawInterestOps() != interestOps) {
if (inEventLoop) {
key.interestOps(interestOps);
changed = true;
} else {
selectorGuard.readLock().lock();
try {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
key.interestOps(interestOps);
changed = true;
} finally {
selectorGuard.readLock().unlock();
}
}
}
break;
default:
throw new Error();
}
if (changed) {
channel.setRawInterestOpsNow(interestOps);
}
}
future.setSuccess();
if (changed) {
if (inEventLoop) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (CancelledKeyException e) {
// setInterestOps() was called on a closed channel.
ClosedChannelException cce = new ClosedChannelException();
future.setFailure(cce);
if (inEventLoop) {
fireExceptionCaught(channel, cce);
} else {
fireExceptionCaughtLater(channel, cce);
}
} catch (Throwable t) {
future.setFailure(t);
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
/**
* Read is called when a Selector has been notified that the underlying channel
* was something to be read. The channel would previously have registered its interest
* in read operations.
*
* @param k The selection key which contains the Selector registration information.
*/
protected abstract boolean read(SelectionKey k);
protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future);
}