Merge branch 'nio2'

This commit is contained in:
Trustin Lee 2012-07-07 14:33:57 +09:00
commit 4f9b6c5bad
135 changed files with 1300 additions and 7 deletions

View File

@ -276,11 +276,19 @@
<ignores> <ignores>
<ignore>sun.misc.Unsafe</ignore> <ignore>sun.misc.Unsafe</ignore>
<ignore>java.util.zip.Deflater</ignore> <ignore>java.util.zip.Deflater</ignore>
<!-- Used for NIO UDP multicast --> <!-- Used for NIO UDP multicast -->
<ignore>java.nio.channels.DatagramChannel</ignore> <ignore>java.nio.channels.DatagramChannel</ignore>
<ignore>java.nio.channels.MembershipKey</ignore> <ignore>java.nio.channels.MembershipKey</ignore>
<ignore>java.net.StandardSocketOptions</ignore> <ignore>java.net.StandardSocketOptions</ignore>
<ignore>java.net.StandardProtocolFamily</ignore> <ignore>java.net.StandardProtocolFamily</ignore>
<!-- Used for NIO. 2 -->
<ignore>java.nio.channels.AsynchronousChannel</ignore>
<ignore>java.nio.channels.AsynchronousSocketChannel</ignore>
<ignore>java.nio.channels.AsynchronousServerSocketChannel</ignore>
<ignore>java.nio.channels.AsynchronousChannelGroup</ignore>
<ignore>java.nio.channels.NetworkChannel</ignore>
</ignores> </ignores>
</configuration> </configuration>
<executions> <executions>

View File

@ -17,7 +17,11 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.aio.AioEventLoop;
import io.netty.channel.socket.aio.AioServerSocketChannel;
import io.netty.channel.socket.aio.AioSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioEventLoop; import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -48,6 +52,15 @@ final class SocketTestPermutation {
channel(new NioServerSocketChannel()); channel(new NioServerSocketChannel());
} }
}); });
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
EventLoop loop = new AioEventLoop();
return new ServerBootstrap().
eventLoop(loop, loop).
channel(new AioServerSocketChannel());
}
});
sbfs.add(new Factory<ServerBootstrap>() { sbfs.add(new Factory<ServerBootstrap>() {
@Override @Override
public ServerBootstrap newInstance() { public ServerBootstrap newInstance() {
@ -66,6 +79,12 @@ final class SocketTestPermutation {
return new Bootstrap().eventLoop(new NioEventLoop()).channel(new NioSocketChannel()); return new Bootstrap().eventLoop(new NioEventLoop()).channel(new NioSocketChannel());
} }
}); });
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().eventLoop(new AioEventLoop()).channel(new AioSocketChannel());
}
});
cbfs.add(new Factory<Bootstrap>() { cbfs.add(new Factory<Bootstrap>() {
@Override @Override
public Bootstrap newInstance() { public Bootstrap newInstance() {

View File

View File

@ -167,13 +167,6 @@ public class ServerBootstrap {
return future; return future;
} }
try {
channel.config().setOptions(parentOptions);
} catch (Exception e) {
future.setFailure(e);
return future;
}
ChannelPipeline p = channel.pipeline(); ChannelPipeline p = channel.pipeline();
if (handler != null) { if (handler != null) {
p.addLast(handler); p.addLast(handler);
@ -185,6 +178,12 @@ public class ServerBootstrap {
future.setFailure(f.cause()); future.setFailure(f.cause());
return future; return future;
} }
try {
channel.config().setOptions(parentOptions);
} catch (Exception e) {
future.setFailure(e);
return future;
}
if (!channel.isOpen()) { if (!channel.isOpen()) {
// Registration was successful but the channel was closed due to some failure in // Registration was successful but the channel was closed due to some failure in

View File

View File

View File

0
transport/src/main/java/io/netty/channel/Channel.java Normal file → Executable file
View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

@ -0,0 +1,168 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractAioChannel extends AbstractChannel {
protected volatile AsynchronousChannel ch;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
protected ChannelFuture connectFuture;
protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractAioChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
public InetSocketAddress localAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.remoteAddress();
}
protected AsynchronousChannel javaChannel() {
return ch;
}
@Override
public boolean isOpen() {
return ch == null || ch.isOpen();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected AsyncUnsafe newUnsafe() {
return new AsyncUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof AioChildEventLoop;
}
protected class AsyncUnsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
try {
if (connectFuture != null) {
throw new IllegalStateException("connection attempt already made");
}
connectFuture = future;
doConnect(remoteAddress, localAddress, future);
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (connectTimeoutException == null) {
connectTimeoutException = new ConnectException("connection timed out");
}
ChannelFuture connectFuture = AbstractAioChannel.this.connectFuture;
if (connectFuture != null &&
connectFuture.setFailure(connectTimeoutException)) {
pipeline().fireExceptionCaught(connectTimeoutException);
close(voidFuture());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
protected final void connectFailed(Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
protected final void connectSuccess() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
try {
boolean wasActive = isActive();
connectFuture.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
} finally {
connectTimeoutFuture.cancel(false);
connectFuture = null;
}
}
}
protected abstract void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture connectFuture);
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
final class AioChildEventLoop extends SingleThreadEventLoop {
AioChildEventLoop(ThreadFactory threadFactory) {
super(threadFactory);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
interruptThread();
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.nio.channels.CompletionHandler;
/**
* Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop}
*
*
*/
abstract class AioCompletionHandler<V, A extends Channel> implements CompletionHandler<V, A> {
/**
* See {@link CompletionHandler#completed(Object, Object)}
*/
protected abstract void completed0(V result, A channel);
/**
* Set {@link CompletionHandler#failed(Throwable, Object)}
*/
protected abstract void failed0(Throwable exc, A channel);
@Override
public final void completed(final V result, final A channel) {
if (channel.eventLoop().inEventLoop()) {
completed0(result, channel);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
completed0(result, channel);
}
});
}
}
@Override
public final void failed(final Throwable exc, final A channel) {
if (channel.eventLoop().inEventLoop()) {
failed0(exc, channel);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
failed0(exc, channel);
}
});
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoop;
import java.util.concurrent.ThreadFactory;
public class AioEventLoop extends MultithreadEventLoop {
public AioEventLoop() {
this(0);
}
public AioEventLoop(int nThreads) {
this(nThreads, null);
}
public AioEventLoop(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new AioChildEventLoop(threadFactory);
}
}

Some files were not shown because too many files have changed in this diff Show More