diff --git a/pom.xml b/pom.xml index b0f59c540f..428d3a35c7 100644 --- a/pom.xml +++ b/pom.xml @@ -276,11 +276,19 @@ sun.misc.Unsafe java.util.zip.Deflater + java.nio.channels.DatagramChannel java.nio.channels.MembershipKey java.net.StandardSocketOptions java.net.StandardProtocolFamily + + + java.nio.channels.AsynchronousChannel + java.nio.channels.AsynchronousSocketChannel + java.nio.channels.AsynchronousServerSocketChannel + java.nio.channels.AsynchronousChannelGroup + java.nio.channels.NetworkChannel diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 9e94f64638..188fa2b835 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -17,7 +17,11 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoop; import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.socket.aio.AioEventLoop; +import io.netty.channel.socket.aio.AioServerSocketChannel; +import io.netty.channel.socket.aio.AioSocketChannel; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioEventLoop; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -48,6 +52,15 @@ final class SocketTestPermutation { channel(new NioServerSocketChannel()); } }); + sbfs.add(new Factory() { + @Override + public ServerBootstrap newInstance() { + EventLoop loop = new AioEventLoop(); + return new ServerBootstrap(). + eventLoop(loop, loop). + channel(new AioServerSocketChannel()); + } + }); sbfs.add(new Factory() { @Override public ServerBootstrap newInstance() { @@ -66,6 +79,12 @@ final class SocketTestPermutation { return new Bootstrap().eventLoop(new NioEventLoop()).channel(new NioSocketChannel()); } }); + cbfs.add(new Factory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().eventLoop(new AioEventLoop()).channel(new AioSocketChannel()); + } + }); cbfs.add(new Factory() { @Override public Bootstrap newInstance() { diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java old mode 100644 new mode 100755 index 15c076b64b..098ed8a1a3 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -167,13 +167,6 @@ public class ServerBootstrap { return future; } - try { - channel.config().setOptions(parentOptions); - } catch (Exception e) { - future.setFailure(e); - return future; - } - ChannelPipeline p = channel.pipeline(); if (handler != null) { p.addLast(handler); @@ -185,6 +178,12 @@ public class ServerBootstrap { future.setFailure(f.cause()); return future; } + try { + channel.config().setOptions(parentOptions); + } catch (Exception e) { + future.setFailure(e); + return future; + } if (!channel.isOpen()) { // Registration was successful but the channel was closed due to some failure in diff --git a/transport/src/main/java/io/netty/bootstrap/package-info.java b/transport/src/main/java/io/netty/bootstrap/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/BlockingOperationException.java b/transport/src/main/java/io/netty/channel/BlockingOperationException.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelConfig.java b/transport/src/main/java/io/netty/channel/ChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelException.java b/transport/src/main/java/io/netty/channel/ChannelException.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelFuture.java b/transport/src/main/java/io/netty/channel/ChannelFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelFutureAggregator.java b/transport/src/main/java/io/netty/channel/ChannelFutureAggregator.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelFutureFactory.java b/transport/src/main/java/io/netty/channel/ChannelFutureFactory.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelFutureListener.java b/transport/src/main/java/io/netty/channel/ChannelFutureListener.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelFutureProgressListener.java b/transport/src/main/java/io/netty/channel/ChannelFutureProgressListener.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerLifeCycleException.java b/transport/src/main/java/io/netty/channel/ChannelHandlerLifeCycleException.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerType.java b/transport/src/main/java/io/netty/channel/ChannelHandlerType.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundByteHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundByteHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundByteHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundByteHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelInitializer.java b/transport/src/main/java/io/netty/channel/ChannelInitializer.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelPipelineException.java b/transport/src/main/java/io/netty/channel/ChannelPipelineException.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java b/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipelineModificationTask.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipelineModificationTask.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/EventExecutor.java b/transport/src/main/java/io/netty/channel/EventExecutor.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/EventLoopException.java b/transport/src/main/java/io/netty/channel/EventLoopException.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/FailedChannelFuture.java b/transport/src/main/java/io/netty/channel/FailedChannelFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/NoSuchBufferException.java b/transport/src/main/java/io/netty/channel/NoSuchBufferException.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/ServerChannel.java b/transport/src/main/java/io/netty/channel/ServerChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/SucceededChannelFuture.java b/transport/src/main/java/io/netty/channel/SucceededChannelFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/VoidChannelFuture.java b/transport/src/main/java/io/netty/channel/VoidChannelFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedByteChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedSocketAddress.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedSocketAddress.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/embedded/package-info.java b/transport/src/main/java/io/netty/channel/embedded/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroupFutureListener.java b/transport/src/main/java/io/netty/channel/group/ChannelGroupFutureListener.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/CombinedIterator.java b/transport/src/main/java/io/netty/channel/group/CombinedIterator.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/group/package-info.java b/transport/src/main/java/io/netty/channel/group/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/LocalAddress.java b/transport/src/main/java/io/netty/channel/local/LocalAddress.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannelRegistry.java b/transport/src/main/java/io/netty/channel/local/LocalChannelRegistry.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/local/package-info.java b/transport/src/main/java/io/netty/channel/local/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/package-info.java b/transport/src/main/java/io/netty/channel/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/InternetProtocolFamily.java b/transport/src/main/java/io/netty/channel/socket/InternetProtocolFamily.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannel.java b/transport/src/main/java/io/netty/channel/socket/SocketChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java new file mode 100755 index 0000000000..3fab6e445a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -0,0 +1,168 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.AsynchronousChannel; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractAioChannel extends AbstractChannel { + + protected volatile AsynchronousChannel ch; + + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + protected ChannelFuture connectFuture; + protected ScheduledFuture connectTimeoutFuture; + private ConnectException connectTimeoutException; + + protected AbstractAioChannel(Channel parent, Integer id) { + super(parent, id); + } + + + @Override + public InetSocketAddress localAddress() { + if (ch == null) { + return null; + } + return (InetSocketAddress) super.localAddress(); + } + + @Override + public InetSocketAddress remoteAddress() { + if (ch == null) { + return null; + } + return (InetSocketAddress) super.remoteAddress(); + } + + protected AsynchronousChannel javaChannel() { + return ch; + } + + + @Override + public boolean isOpen() { + return ch == null || ch.isOpen(); + } + + @Override + protected void doDeregister() throws Exception { + // NOOP + } + + @Override + protected AsyncUnsafe newUnsafe() { + return new AsyncUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof AioChildEventLoop; + } + + protected class AsyncUnsafe extends AbstractUnsafe { + + @Override + public void connect(final SocketAddress remoteAddress, + final SocketAddress localAddress, final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + if (!ensureOpen(future)) { + return; + } + + try { + if (connectFuture != null) { + throw new IllegalStateException("connection attempt already made"); + } + connectFuture = future; + + doConnect(remoteAddress, localAddress, future); + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (connectTimeoutException == null) { + connectTimeoutException = new ConnectException("connection timed out"); + } + ChannelFuture connectFuture = AbstractAioChannel.this.connectFuture; + if (connectFuture != null && + connectFuture.setFailure(connectTimeoutException)) { + pipeline().fireExceptionCaught(connectTimeoutException); + close(voidFuture()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + connect(remoteAddress, localAddress, future); + } + }); + } + } + + protected final void connectFailed(Throwable t) { + connectFuture.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + + protected final void connectSuccess() { + assert eventLoop().inEventLoop(); + assert connectFuture != null; + try { + boolean wasActive = isActive(); + connectFuture.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } catch (Throwable t) { + connectFuture.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } finally { + connectTimeoutFuture.cancel(false); + connectFuture = null; + } + } + } + protected abstract void doConnect(SocketAddress remoteAddress, + SocketAddress localAddress, ChannelFuture connectFuture); + +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java new file mode 100755 index 0000000000..9b53267910 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java @@ -0,0 +1,51 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.channel.SingleThreadEventLoop; + +import java.util.concurrent.ThreadFactory; + +final class AioChildEventLoop extends SingleThreadEventLoop { + + AioChildEventLoop(ThreadFactory threadFactory) { + super(threadFactory); + } + + @Override + protected void run() { + for (;;) { + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown() && peekTask() == null) { + break; + } + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop) { + interruptThread(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java new file mode 100644 index 0000000000..a78235b71b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java @@ -0,0 +1,69 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; + +import java.nio.channels.CompletionHandler; + +/** + * Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop} + * + * + */ +abstract class AioCompletionHandler implements CompletionHandler { + + /** + * See {@link CompletionHandler#completed(Object, Object)} + */ + protected abstract void completed0(V result, A channel); + + /** + * Set {@link CompletionHandler#failed(Throwable, Object)} + */ + protected abstract void failed0(Throwable exc, A channel); + + @Override + public final void completed(final V result, final A channel) { + if (channel.eventLoop().inEventLoop()) { + completed0(result, channel); + } else { + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + completed0(result, channel); + } + }); + } + } + + @Override + public final void failed(final Throwable exc, final A channel) { + if (channel.eventLoop().inEventLoop()) { + failed0(exc, channel); + } else { + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + failed0(exc, channel); + } + }); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java new file mode 100755 index 0000000000..8d568bbd59 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java @@ -0,0 +1,41 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.channel.EventExecutor; +import io.netty.channel.MultithreadEventLoop; + +import java.util.concurrent.ThreadFactory; + +public class AioEventLoop extends MultithreadEventLoop { + + public AioEventLoop() { + this(0); + } + + public AioEventLoop(int nThreads) { + this(nThreads, null); + } + + public AioEventLoop(int nThreads, ThreadFactory threadFactory) { + super(nThreads, threadFactory); + } + + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new AioChildEventLoop(threadFactory); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java new file mode 100755 index 0000000000..2dea0b580c --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -0,0 +1,156 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.buffer.ChannelBufType; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; + +public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel { + + private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler(); + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(AioServerSocketChannel.class); + private volatile AioServerSocketChannelConfig config; + private boolean closed; + + public AioServerSocketChannel() { + super(null, null); + } + + + @Override + protected AsynchronousServerSocketChannel javaChannel() { + return (AsynchronousServerSocketChannel) super.javaChannel(); + } + + @Override + public boolean isActive() { + AsynchronousServerSocketChannel channel = javaChannel(); + try { + if (channel != null && channel.getLocalAddress() != null) { + return true; + } + } catch (IOException e) { + return true; + } + return false; + } + + @Override + public ChannelBufType bufferType() { + return ChannelBufType.MESSAGE; + } + + @Override + protected SocketAddress localAddress0() { + try { + return javaChannel().getLocalAddress(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + protected SocketAddress remoteAddress0() { + return null; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().bind(localAddress); + javaChannel().accept(this, ACCEPT_HANDLER); + + } + + @Override + protected void doClose() throws Exception { + if (!closed) { + closed = true; + javaChannel().close(); + } + } + + @Override + protected boolean isFlushPending() { + return false; + } + + @Override + protected void doConnect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + future.setFailure(new UnsupportedOperationException()); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Runnable doRegister() throws Exception { + ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); + config = new AioServerSocketChannelConfig(javaChannel()); + + return null; + } + + private static final class AcceptHandler + extends AioCompletionHandler { + + @Override + protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) { + // register again this handler to accept new connections + channel.javaChannel().accept(channel, this); + + // create the socket add it to the buffer and fire the event + channel.pipeline().inboundMessageBuffer().add(new AioSocketChannel(channel, null, ch)); + channel.pipeline().fireInboundBufferUpdated(); + } + + @Override + protected void failed0(Throwable t, AioServerSocketChannel channel) { + boolean asyncClosed = false; + if (t instanceof AsynchronousCloseException) { + asyncClosed = true; + channel.closed = true; + } + // check if the exception was thrown because the channel was closed before + // log something + if (channel.isOpen() && ! asyncClosed) { + logger.warn("Failed to create a new channel from an accepted socket.", t); + } + } + } + + @Override + public AioServerSocketChannelConfig config() { + if (config == null) { + throw new IllegalStateException("Channel not registered yet"); + } + return config; + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java new file mode 100755 index 0000000000..3a7b1a24b8 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java @@ -0,0 +1,139 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import static io.netty.channel.ChannelOption.*; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.socket.ServerSocketChannelConfig; + +import java.io.IOException; +import java.net.StandardSocketOptions; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.util.Map; + +/** + * The Async {@link ServerSocketChannelConfig} implementation. + */ +public class AioServerSocketChannelConfig extends DefaultChannelConfig + implements ServerSocketChannelConfig { + + private final AsynchronousServerSocketChannel channel; + private volatile int backlog; + + /** + * Creates a new instance. + */ + public AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) { + if (channel == null) { + throw new NullPointerException("channel"); + } + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); + } + + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_BACKLOG) { + setBacklog((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public boolean isReuseAddress() { + try { + return channel.getOption(StandardSocketOptions.SO_REUSEADDR); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setReuseAddress(boolean reuseAddress) { + try { + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getReceiveBufferSize() { + try { + return channel.getOption(StandardSocketOptions.SO_RCVBUF); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setReceiveBufferSize(int receiveBufferSize) { + try { + channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + throw new UnsupportedOperationException(); + } + + @Override + public int getBacklog() { + return backlog; + } + + @Override + public void setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java new file mode 100755 index 0000000000..67365df082 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -0,0 +1,342 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ChannelBufType; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + + +public class AioSocketChannel extends AbstractAioChannel implements SocketChannel { + + private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); + private static final CompletionHandler READ_HANDLER = new ReadHandler(); + private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); + + private boolean closed; + private boolean flushing; + private volatile AioSocketChannelConfig config; + + public AioSocketChannel() { + this(null, null, null); + } + + public AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) { + super(parent, id); + ch = channel; + if (ch != null) { + config = new AioSocketChannelConfig(javaChannel()); + } + } + + @Override + public boolean isActive() { + if (ch == null) { + return false; + } + AsynchronousSocketChannel ch = javaChannel(); + return ch.isOpen() && remoteAddress() != null; + } + + @Override + protected AsynchronousSocketChannel javaChannel() { + return (AsynchronousSocketChannel) super.javaChannel(); + } + + @Override + public ChannelBufType bufferType() { + return ChannelBufType.BYTE; + } + + @Override + protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) { + assert ch != null; + if (localAddress != null) { + try { + javaChannel().bind(localAddress); + } catch (IOException e) { + future.setFailure(e); + return; + } + } + + javaChannel().connect(remoteAddress, this, CONNECT_HANDLER); + } + + @Override + protected InetSocketAddress localAddress0() { + try { + return (InetSocketAddress) javaChannel().getLocalAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + protected InetSocketAddress remoteAddress0() { + try { + return (InetSocketAddress) javaChannel().getRemoteAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + protected Runnable doRegister() throws Exception { + if (ch == null) { + ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); + config = new AioSocketChannelConfig(javaChannel()); + return null; + } else if (remoteAddress() != null) { + return new Runnable() { + + @Override + public void run() { + read(); + } + }; + } + return null; + } + + /** + * Trigger a read from the {@link AioSocketChannel} + * + */ + void read() { + ByteBuf byteBuf = pipeline().inboundByteBuffer(); + expandReadBuffer(byteBuf); + // Get a ByteBuffer view on the ByteBuf + ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); + javaChannel().read(buffer, this, READ_HANDLER); + } + + + private boolean expandReadBuffer(ByteBuf byteBuf) { + if (!byteBuf.writable()) { + // FIXME: Magic number + byteBuf.ensureWritableBytes(4096); + return true; + } + return false; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().bind(localAddress); + } + + @Override + protected void doDisconnect() throws Exception { + doClose(); + } + + @Override + protected void doClose() throws Exception { + if (!closed) { + closed = true; + javaChannel().close(); + } + } + + @Override + protected boolean isFlushPending() { + return false; + } + + @Override + protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + if (!buf.readable()) { + // Reset reader/writerIndex to 0 if the buffer is empty. + buf.clear(); + return true; + } + + // Only one pending write can be scheduled at one time. Otherwise + // a PendingWriteException will be thrown. So use CAS to not run + // into this + if (!flushing) { + flushing = true; + ByteBuffer buffer = buf.nioBuffer(); + javaChannel().write(buffer, this, WRITE_HANDLER); + } + return false; + } + + + private static final class WriteHandler extends AioCompletionHandler { + + @Override + protected void completed0(Integer result, AioSocketChannel channel) { + ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); + if (result > 0) { + + // Update the readerIndex with the amount of read bytes + buf.readerIndex(buf.readerIndex() + result); + + channel.notifyFlushFutures(); + if (!buf.readable()) { + buf.discardReadBytes(); + } + } + + // Allow to have the next write pending + channel.flushing = false; + try { + // try to flush it again if nothing is left it will return fast here + channel.doFlushByteBuffer(buf); + } catch (Exception e) { + // Should never happen, anyway call failed just in case + failed(e, channel); + } + } + + @Override + protected void failed0(Throwable cause, AioSocketChannel channel) { + if (cause instanceof AsynchronousCloseException) { + channel.closed = true; + } + + channel.notifyFlushFutures(cause); + channel.pipeline().fireExceptionCaught(cause); + if (cause instanceof IOException) { + + channel.unsafe().close(channel.unsafe().voidFuture()); + } else { + ByteBuf buf = channel.pipeline().outboundByteBuffer(); + if (!buf.readable()) { + buf.discardReadBytes(); + } + } + // Allow to have the next write pending + channel.flushing = false; + } + } + + private static final class ReadHandler extends AioCompletionHandler { + + @Override + protected void completed0(Integer result, AioSocketChannel channel) { + final ChannelPipeline pipeline = channel.pipeline(); + final ByteBuf byteBuf = pipeline.inboundByteBuffer(); + + boolean closed = false; + boolean read = false; + try { + int localReadAmount = result.intValue(); + if (localReadAmount > 0) { + // Set the writerIndex of the buffer correctly to the + // current writerIndex + read amount of bytes. + // + // This is needed as the ByteBuffer and the ByteBuf does not share + // each others index + byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + + read = true; + + } else if (localReadAmount < 0) { + closed = true; + } + + } catch (Throwable t) { + if (t instanceof AsynchronousCloseException) { + channel.closed = true; + } + + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + + pipeline.fireExceptionCaught(t); + + if (t instanceof IOException) { + channel.unsafe().close(channel.unsafe().voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (closed && channel.isOpen()) { + channel.unsafe().close(channel.unsafe().voidFuture()); + } else { + // start the next read + channel.read(); + + } + } + } + + @Override + protected void failed0(Throwable t, AioSocketChannel channel) { + if (t instanceof AsynchronousCloseException) { + channel.closed = true; + + // TODO: This seems wrong! + return; + } + + channel.pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + channel.unsafe().close(channel.unsafe().voidFuture()); + } else { + // start the next read + channel.read(); + } + } + } + + private static final class ConnectHandler extends AioCompletionHandler { + + @Override + protected void completed0(Void result, AioSocketChannel channel) { + ((AsyncUnsafe) channel.unsafe()).connectSuccess(); + channel.pipeline().fireChannelActive(); + + // start reading from channel + channel.read(); + } + + @Override + protected void failed0(Throwable exc, AioSocketChannel channel) { + if (exc instanceof AsynchronousCloseException) { + channel.closed = true; + } + ((AsyncUnsafe) channel.unsafe()).connectFailed(exc); + } + } + + @Override + public AioSocketChannelConfig config() { + if (config == null) { + throw new IllegalStateException("Channel not open yet"); + } + return config; + } + + +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java new file mode 100755 index 0000000000..8722206054 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -0,0 +1,238 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import static io.netty.channel.ChannelOption.*; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.socket.SocketChannelConfig; + +import java.io.IOException; +import java.net.StandardSocketOptions; +import java.nio.channels.NetworkChannel; +import java.util.Map; + +/** + * The default {@link SocketChannelConfig} implementation. + */ +public class AioSocketChannelConfig extends DefaultChannelConfig + implements SocketChannelConfig { + + private final NetworkChannel channel; + + /** + * Creates a new instance. + */ + public AioSocketChannelConfig(NetworkChannel channel) { + if (channel == null) { + throw new NullPointerException("channel"); + } + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == TCP_NODELAY) { + return (T) Boolean.valueOf(isTcpNoDelay()); + } + if (option == SO_KEEPALIVE) { + return (T) Boolean.valueOf(isKeepAlive()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_LINGER) { + return (T) Integer.valueOf(getSoLinger()); + } + if (option == IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); + } + + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == TCP_NODELAY) { + setTcpNoDelay((Boolean) value); + } else if (option == SO_KEEPALIVE) { + setKeepAlive((Boolean) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_LINGER) { + setSoLinger((Integer) value); + } else if (option == IP_TOS) { + setTrafficClass((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public int getReceiveBufferSize() { + try { + return (int) channel.getOption(StandardSocketOptions.SO_RCVBUF); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getSendBufferSize() { + try { + return channel.getOption(StandardSocketOptions.SO_SNDBUF); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getSoLinger() { + try { + return channel.getOption(StandardSocketOptions.SO_LINGER); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getTrafficClass() { + try { + return channel.getOption(StandardSocketOptions.IP_TOS); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isKeepAlive() { + try { + return channel.getOption(StandardSocketOptions.SO_KEEPALIVE); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isReuseAddress() { + try { + return channel.getOption(StandardSocketOptions.SO_REUSEADDR); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isTcpNoDelay() { + try { + return channel.getOption(StandardSocketOptions.SO_REUSEADDR); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setKeepAlive(boolean keepAlive) { + try { + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setPerformancePreferences( + int connectionTime, int latency, int bandwidth) { + throw new UnsupportedOperationException(); + } + + @Override + public void setReceiveBufferSize(int receiveBufferSize) { + try { + channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setReuseAddress(boolean reuseAddress) { + try { + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setSendBufferSize(int sendBufferSize) { + try { + channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setSoLinger(int soLinger) { + try { + channel.setOption(StandardSocketOptions.SO_LINGER, soLinger); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setTcpNoDelay(boolean tcpNoDelay) { + try { + channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setTrafficClass(int trafficClass) { + try { + channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); + } catch (IOException e) { + throw new ChannelException(e); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/package-info.java b/transport/src/main/java/io/netty/channel/socket/aio/package-info.java new file mode 100755 index 0000000000..5d32f31449 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * NIO2-based socket channel + * API implementation - recommended for a large number of connections (>= 1000). + */ +package io.netty.channel.socket.aio; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/ProtocolFamilyConverter.java b/transport/src/main/java/io/netty/channel/socket/nio/ProtocolFamilyConverter.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/nio/package-info.java b/transport/src/main/java/io/netty/channel/socket/nio/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/package-info.java b/transport/src/main/java/io/netty/channel/socket/oio/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/main/java/io/netty/channel/socket/package-info.java b/transport/src/main/java/io/netty/channel/socket/package-info.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/AsyncTransportTest.java b/transport/src/test/java/io/netty/channel/AsyncTransportTest.java new file mode 100755 index 0000000000..2f6eeb7318 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/AsyncTransportTest.java @@ -0,0 +1,42 @@ +package io.netty.channel; + +import java.net.InetSocketAddress; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.aio.AioEventLoop; +import io.netty.channel.socket.aio.AioServerSocketChannel; +import io.netty.channel.socket.aio.AioSocketChannel; + +public class AsyncTransportTest { + + public static void main(String args[]) { + AioEventLoop loop = new AioEventLoop(); + // Configure a test server + ServerBootstrap sb = new ServerBootstrap(); + sb.eventLoop(loop, loop) + .channel(new AioServerSocketChannel()) + .localAddress(new InetSocketAddress(9191)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(AioSocketChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundByteHandlerAdapter() { + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + ctx.write(in.slice()); + } + }); + } + }); + ChannelFuture future = sb.bind().awaitUninterruptibly(); + if (!future.isSuccess()) { + future.cause().printStackTrace(); + } + future.channel().closeFuture().awaitUninterruptibly(); + } +} diff --git a/transport/src/test/java/io/netty/channel/CompleteChannelFutureTest.java b/transport/src/test/java/io/netty/channel/CompleteChannelFutureTest.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/FailedChannelFutureTest.java b/transport/src/test/java/io/netty/channel/FailedChannelFutureTest.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/SucceededChannelFutureTest.java b/transport/src/test/java/io/netty/channel/SucceededChannelFutureTest.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java old mode 100644 new mode 100755 diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java old mode 100644 new mode 100755