Rename package from nio2 -> aio. See #396
This commit is contained in:
parent
ffc6551acc
commit
67be5aeda8
168
transport/src/main/java/io/netty/channel/socket/aio/AbstractAsyncChannel.java
Executable file
168
transport/src/main/java/io/netty/channel/socket/aio/AbstractAsyncChannel.java
Executable file
@ -0,0 +1,168 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.EventLoop;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.AsynchronousChannel;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractAsyncChannel extends AbstractChannel {
|
||||
|
||||
protected volatile AsynchronousChannel ch;
|
||||
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent
|
||||
* connection attempts will fail.
|
||||
*/
|
||||
protected ChannelFuture connectFuture;
|
||||
protected ScheduledFuture<?> connectTimeoutFuture;
|
||||
private ConnectException connectTimeoutException;
|
||||
|
||||
protected AbstractAsyncChannel(Channel parent, Integer id) {
|
||||
super(parent, id);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InetSocketAddress localAddress() {
|
||||
if (ch == null) {
|
||||
return null;
|
||||
}
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress remoteAddress() {
|
||||
if (ch == null) {
|
||||
return null;
|
||||
}
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
protected AsynchronousChannel javaChannel() {
|
||||
return ch;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return ch == null || ch.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDeregister() throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AsyncUnsafe newUnsafe() {
|
||||
return new AsyncUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return loop instanceof AsyncChildEventLoop;
|
||||
}
|
||||
|
||||
protected class AsyncUnsafe extends AbstractUnsafe {
|
||||
|
||||
@Override
|
||||
public void connect(final SocketAddress remoteAddress,
|
||||
final SocketAddress localAddress, final ChannelFuture future) {
|
||||
if (eventLoop().inEventLoop()) {
|
||||
if (!ensureOpen(future)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (connectFuture != null) {
|
||||
throw new IllegalStateException("connection attempt already made");
|
||||
}
|
||||
connectFuture = future;
|
||||
|
||||
doConnect(remoteAddress, localAddress, future);
|
||||
|
||||
// Schedule connect timeout.
|
||||
int connectTimeoutMillis = config().getConnectTimeoutMillis();
|
||||
if (connectTimeoutMillis > 0) {
|
||||
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (connectTimeoutException == null) {
|
||||
connectTimeoutException = new ConnectException("connection timed out");
|
||||
}
|
||||
ChannelFuture connectFuture = AbstractAsyncChannel.this.connectFuture;
|
||||
if (connectFuture != null &&
|
||||
connectFuture.setFailure(connectTimeoutException)) {
|
||||
pipeline().fireExceptionCaught(connectTimeoutException);
|
||||
close(voidFuture());
|
||||
}
|
||||
}
|
||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
pipeline().fireExceptionCaught(t);
|
||||
closeIfClosed();
|
||||
}
|
||||
} else {
|
||||
eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
connect(remoteAddress, localAddress, future);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected final void connectFailed(Throwable t) {
|
||||
connectFuture.setFailure(t);
|
||||
pipeline().fireExceptionCaught(t);
|
||||
closeIfClosed();
|
||||
}
|
||||
|
||||
protected final void connectSuccess() {
|
||||
assert eventLoop().inEventLoop();
|
||||
assert connectFuture != null;
|
||||
try {
|
||||
boolean wasActive = isActive();
|
||||
connectFuture.setSuccess();
|
||||
if (!wasActive && isActive()) {
|
||||
pipeline().fireChannelActive();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
connectFuture.setFailure(t);
|
||||
pipeline().fireExceptionCaught(t);
|
||||
closeIfClosed();
|
||||
} finally {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
connectFuture = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
protected abstract void doConnect(SocketAddress remoteAddress,
|
||||
SocketAddress localAddress, ChannelFuture connectFuture);
|
||||
|
||||
}
|
51
transport/src/main/java/io/netty/channel/socket/aio/AsyncChildEventLoop.java
Executable file
51
transport/src/main/java/io/netty/channel/socket/aio/AsyncChildEventLoop.java
Executable file
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
final class AsyncChildEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
AsyncChildEventLoop(ThreadFactory threadFactory) {
|
||||
super(threadFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run() {
|
||||
for (;;) {
|
||||
Runnable task;
|
||||
try {
|
||||
task = takeTask();
|
||||
task.run();
|
||||
} catch (InterruptedException e) {
|
||||
// Waken up by interruptThread()
|
||||
}
|
||||
|
||||
if (isShutdown() && peekTask() == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void wakeup(boolean inEventLoop) {
|
||||
if (!inEventLoop) {
|
||||
interruptThread();
|
||||
}
|
||||
}
|
||||
}
|
41
transport/src/main/java/io/netty/channel/socket/aio/AsyncEventLoop.java
Executable file
41
transport/src/main/java/io/netty/channel/socket/aio/AsyncEventLoop.java
Executable file
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.channel.EventExecutor;
|
||||
import io.netty.channel.MultithreadEventLoop;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
public class AsyncEventLoop extends MultithreadEventLoop {
|
||||
|
||||
public AsyncEventLoop() {
|
||||
this(0);
|
||||
}
|
||||
|
||||
public AsyncEventLoop(int nThreads) {
|
||||
this(nThreads, null);
|
||||
}
|
||||
|
||||
public AsyncEventLoop(int nThreads, ThreadFactory threadFactory) {
|
||||
super(nThreads, threadFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
|
||||
return new AsyncChildEventLoop(threadFactory);
|
||||
}
|
||||
}
|
@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.buffer.ChannelBufType;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.logging.InternalLogger;
|
||||
import io.netty.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
|
||||
public class AsyncServerSocketChannel extends AbstractAsyncChannel implements ServerChannel {
|
||||
|
||||
private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler();
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(AsyncServerSocketChannel.class);
|
||||
private volatile AsyncServerSocketChannelConfig config;
|
||||
|
||||
public AsyncServerSocketChannel() {
|
||||
super(null, null);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected AsynchronousServerSocketChannel javaChannel() {
|
||||
return (AsynchronousServerSocketChannel) super.javaChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
AsynchronousServerSocketChannel channel = javaChannel();
|
||||
try {
|
||||
if (channel != null && channel.getLocalAddress() != null) {
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBufType bufferType() {
|
||||
return ChannelBufType.MESSAGE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress localAddress0() {
|
||||
try {
|
||||
return javaChannel().getLocalAddress();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress remoteAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
javaChannel().bind(localAddress);
|
||||
javaChannel().accept(this, ACCEPT_HANDLER);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
javaChannel().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isFlushPending() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doConnect(
|
||||
SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
|
||||
future.setFailure(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDisconnect() throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable doRegister() throws Exception {
|
||||
ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
|
||||
config = new AsyncServerSocketChannelConfig(javaChannel());
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static final class AcceptHandler
|
||||
implements CompletionHandler<AsynchronousSocketChannel, AsyncServerSocketChannel> {
|
||||
public void completed(AsynchronousSocketChannel ch, AsyncServerSocketChannel channel) {
|
||||
// register again this handler to accept new connections
|
||||
channel.javaChannel().accept(channel, this);
|
||||
|
||||
// create the socket add it to the buffer and fire the event
|
||||
channel.pipeline().inboundMessageBuffer().add(new AsyncSocketChannel(channel, null, ch));
|
||||
channel.pipeline().fireInboundBufferUpdated();
|
||||
}
|
||||
|
||||
public void failed(Throwable t, AsyncServerSocketChannel channel) {
|
||||
logger.warn("Failed to create a new channel from an accepted socket.", t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncServerSocketChannelConfig config() {
|
||||
if (config == null) {
|
||||
throw new IllegalStateException("Channel not registered yet");
|
||||
}
|
||||
return config;
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import static io.netty.channel.ChannelOption.*;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.socket.ServerSocketChannelConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The Async {@link ServerSocketChannelConfig} implementation.
|
||||
*/
|
||||
public class AsyncServerSocketChannelConfig extends DefaultChannelConfig
|
||||
implements ServerSocketChannelConfig {
|
||||
|
||||
private final AsynchronousServerSocketChannel channel;
|
||||
private volatile int backlog;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*/
|
||||
public AsyncServerSocketChannelConfig(AsynchronousServerSocketChannel channel) {
|
||||
if (channel == null) {
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getOption(ChannelOption<T> option) {
|
||||
if (option == SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
if (option == SO_REUSEADDR) {
|
||||
return (T) Boolean.valueOf(isReuseAddress());
|
||||
}
|
||||
if (option == SO_BACKLOG) {
|
||||
return (T) Integer.valueOf(getBacklog());
|
||||
}
|
||||
|
||||
return super.getOption(option);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||
validate(option, value);
|
||||
|
||||
if (option == SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else if (option == SO_REUSEADDR) {
|
||||
setReuseAddress((Boolean) value);
|
||||
} else if (option == SO_BACKLOG) {
|
||||
setBacklog((Integer) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReuseAddress() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReuseAddress(boolean reuseAddress) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_RCVBUF);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBacklog() {
|
||||
return backlog;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBacklog(int backlog) {
|
||||
if (backlog < 0) {
|
||||
throw new IllegalArgumentException("backlog: " + backlog);
|
||||
}
|
||||
this.backlog = backlog;
|
||||
}
|
||||
}
|
318
transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannel.java
Executable file
318
transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannel.java
Executable file
@ -0,0 +1,318 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ChannelBufType;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelStateHandler;
|
||||
import io.netty.channel.ChannelStateHandlerAdapter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class AsyncSocketChannel extends AbstractAsyncChannel {
|
||||
|
||||
private static final CompletionHandler<Void, AsyncSocketChannel> CONNECT_HANDLER = new ConnectHandler();
|
||||
private static final CompletionHandler<Integer, AsyncSocketChannel> READ_HANDLER = new ReadHandler();
|
||||
private static final CompletionHandler<Integer, AsyncSocketChannel> WRITE_HANDLER = new WriteHandler();
|
||||
private static final ChannelStateHandler READ_START_HANDLER = new ChannelStateHandlerAdapter() {
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
try {
|
||||
super.channelActive(ctx);
|
||||
|
||||
// once the channel is active, the first read is scheduled
|
||||
AsyncSocketChannel.read((AsyncSocketChannel)ctx.channel());
|
||||
|
||||
} finally {
|
||||
ctx.pipeline().remove(this);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
};
|
||||
private final AtomicBoolean flushing = new AtomicBoolean(false);
|
||||
private volatile AsyncSocketChannelConfig config;
|
||||
|
||||
public AsyncSocketChannel() {
|
||||
this(null, null, null);
|
||||
}
|
||||
|
||||
public AsyncSocketChannel(AsyncServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) {
|
||||
super(parent, id);
|
||||
this.ch = channel;
|
||||
if (ch != null) {
|
||||
config = new AsyncSocketChannelConfig(javaChannel());
|
||||
pipeline().addLast(READ_START_HANDLER);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
AsynchronousSocketChannel ch = javaChannel();
|
||||
return ch.isOpen() && remoteAddress() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AsynchronousSocketChannel javaChannel() {
|
||||
return (AsynchronousSocketChannel) super.javaChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBufType bufferType() {
|
||||
return ChannelBufType.BYTE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) {
|
||||
assert ch != null;
|
||||
if (localAddress != null) {
|
||||
try {
|
||||
javaChannel().bind(localAddress);
|
||||
} catch (IOException e) {
|
||||
future.setFailure(e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
javaChannel().connect(remoteAddress, this, CONNECT_HANDLER);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InetSocketAddress localAddress0() {
|
||||
try {
|
||||
return (InetSocketAddress) javaChannel().getLocalAddress();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InetSocketAddress remoteAddress0() {
|
||||
try {
|
||||
return (InetSocketAddress) javaChannel().getRemoteAddress();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable doRegister() throws Exception {
|
||||
if (ch == null) {
|
||||
ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
|
||||
config = new AsyncSocketChannelConfig(javaChannel());
|
||||
pipeline().addLast(READ_START_HANDLER);
|
||||
}
|
||||
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger a read from the {@link AsyncSocketChannel}
|
||||
*
|
||||
*/
|
||||
private static void read(AsyncSocketChannel channel) {
|
||||
ByteBuf byteBuf = channel.pipeline().inboundByteBuffer();
|
||||
expandReadBuffer(byteBuf);
|
||||
|
||||
// Get a ByteBuffer view on the ByteBuf and clear it before try to read
|
||||
ByteBuffer buffer = byteBuf.nioBuffer();
|
||||
buffer.clear();
|
||||
channel.javaChannel().read(buffer, channel, READ_HANDLER);
|
||||
}
|
||||
|
||||
|
||||
private static boolean expandReadBuffer(ByteBuf byteBuf) {
|
||||
if (!byteBuf.writable()) {
|
||||
// FIXME: Magic number
|
||||
byteBuf.ensureWritableBytes(4096);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
javaChannel().bind(localAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDisconnect() throws Exception {
|
||||
doClose();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
javaChannel().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isFlushPending() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
|
||||
// Only one pending write can be scheduled at one time. Otherwise
|
||||
// a PendingWriteException will be thrown. So use CAS to not run
|
||||
// into this
|
||||
if (flushing.compareAndSet(false, true)) {
|
||||
ByteBuffer buffer = (ByteBuffer)buf.nioBuffer();
|
||||
javaChannel().write(buffer, this, WRITE_HANDLER);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
private static final class WriteHandler implements CompletionHandler<Integer, AsyncSocketChannel> {
|
||||
|
||||
@Override
|
||||
public void completed(Integer result, AsyncSocketChannel channel) {
|
||||
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
||||
|
||||
if (result > 0) {
|
||||
if (result < buf.readableBytes()) {
|
||||
// Update the readerIndex with the amount of read bytes
|
||||
buf.readerIndex(buf.readerIndex() + result);
|
||||
} else {
|
||||
// not enough space in the buffer anymore so discard everything that
|
||||
// was read already
|
||||
buf.discardReadBytes();
|
||||
|
||||
}
|
||||
channel.notifyFlushFutures();
|
||||
}
|
||||
|
||||
// Allow to have the next write pending
|
||||
channel.flushing.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable cause, AsyncSocketChannel channel) {
|
||||
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
||||
if (!buf.readable()) {
|
||||
buf.discardReadBytes();
|
||||
}
|
||||
|
||||
channel.notifyFlushFutures(cause);
|
||||
channel.pipeline().fireExceptionCaught(cause);
|
||||
if (cause instanceof IOException) {
|
||||
channel.close(channel.unsafe().voidFuture());
|
||||
}
|
||||
// Allow to have the next write pending
|
||||
channel.flushing.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ReadHandler implements CompletionHandler<Integer, AsyncSocketChannel> {
|
||||
|
||||
@Override
|
||||
public void completed(Integer result, AsyncSocketChannel channel) {
|
||||
assert channel.eventLoop().inEventLoop();
|
||||
|
||||
final ChannelPipeline pipeline = channel.pipeline();
|
||||
boolean closed = false;
|
||||
boolean read = false;
|
||||
try {
|
||||
|
||||
int localReadAmount = result.intValue();
|
||||
if (localReadAmount > 0) {
|
||||
//Set the writerIndex of the buffer correctly to the
|
||||
// current writerIndex + read amount of bytes.
|
||||
//
|
||||
// This is needed as the ByteBuffer and the ByteBuf does not share
|
||||
// each others index
|
||||
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
|
||||
byteBuf.writerIndex(byteBuf.writerIndex() + result);
|
||||
|
||||
read = true;
|
||||
|
||||
} else if (localReadAmount < 0) {
|
||||
closed = true;
|
||||
}
|
||||
|
||||
} catch (Throwable t) {
|
||||
if (read) {
|
||||
read = false;
|
||||
pipeline.fireInboundBufferUpdated();
|
||||
}
|
||||
pipeline.fireExceptionCaught(t);
|
||||
if (t instanceof IOException) {
|
||||
channel.close(channel.unsafe().voidFuture());
|
||||
}
|
||||
} finally {
|
||||
if (read) {
|
||||
pipeline.fireInboundBufferUpdated();
|
||||
}
|
||||
if (closed && channel.isOpen()) {
|
||||
channel.close(channel.unsafe().voidFuture());
|
||||
} else {
|
||||
// start the next read
|
||||
AsyncSocketChannel.read(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable t, AsyncSocketChannel channel) {
|
||||
channel.pipeline().fireExceptionCaught(t);
|
||||
if (t instanceof IOException) {
|
||||
channel.close(channel.unsafe().voidFuture());
|
||||
} else {
|
||||
// start the next read
|
||||
AsyncSocketChannel.read(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ConnectHandler implements CompletionHandler<Void, AsyncSocketChannel> {
|
||||
|
||||
@Override
|
||||
public void completed(Void result, AsyncSocketChannel channel) {
|
||||
((AsyncUnsafe) channel.unsafe()).connectSuccess();
|
||||
|
||||
// start reading from channel
|
||||
AsyncSocketChannel.read(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, AsyncSocketChannel channel) {
|
||||
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncSocketChannelConfig config() {
|
||||
if (config == null) {
|
||||
throw new IllegalStateException("Channel not open yet");
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,237 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import static io.netty.channel.ChannelOption.*;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.socket.SocketChannelConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.channels.NetworkChannel;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The default {@link SocketChannelConfig} implementation.
|
||||
*/
|
||||
public class AsyncSocketChannelConfig extends DefaultChannelConfig
|
||||
implements SocketChannelConfig {
|
||||
|
||||
private final NetworkChannel channel;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*/
|
||||
public AsyncSocketChannelConfig(NetworkChannel channel) {
|
||||
if (channel == null) {
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(
|
||||
super.getOptions(),
|
||||
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getOption(ChannelOption<T> option) {
|
||||
if (option == SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
if (option == SO_SNDBUF) {
|
||||
return (T) Integer.valueOf(getSendBufferSize());
|
||||
}
|
||||
if (option == TCP_NODELAY) {
|
||||
return (T) Boolean.valueOf(isTcpNoDelay());
|
||||
}
|
||||
if (option == SO_KEEPALIVE) {
|
||||
return (T) Boolean.valueOf(isKeepAlive());
|
||||
}
|
||||
if (option == SO_REUSEADDR) {
|
||||
return (T) Boolean.valueOf(isReuseAddress());
|
||||
}
|
||||
if (option == SO_LINGER) {
|
||||
return (T) Integer.valueOf(getSoLinger());
|
||||
}
|
||||
if (option == IP_TOS) {
|
||||
return (T) Integer.valueOf(getTrafficClass());
|
||||
}
|
||||
|
||||
return super.getOption(option);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||
validate(option, value);
|
||||
|
||||
if (option == SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else if (option == SO_SNDBUF) {
|
||||
setSendBufferSize((Integer) value);
|
||||
} else if (option == TCP_NODELAY) {
|
||||
setTcpNoDelay((Boolean) value);
|
||||
} else if (option == SO_KEEPALIVE) {
|
||||
setKeepAlive((Boolean) value);
|
||||
} else if (option == SO_REUSEADDR) {
|
||||
setReuseAddress((Boolean) value);
|
||||
} else if (option == SO_LINGER) {
|
||||
setSoLinger((Integer) value);
|
||||
} else if (option == IP_TOS) {
|
||||
setTrafficClass((Integer) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return (int) channel.getOption(StandardSocketOptions.SO_RCVBUF);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSendBufferSize() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_SNDBUF);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSoLinger() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_LINGER);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTrafficClass() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.IP_TOS);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isKeepAlive() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReuseAddress() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTcpNoDelay() {
|
||||
try {
|
||||
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setKeepAlive(boolean keepAlive) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPerformancePreferences(
|
||||
int connectionTime, int latency, int bandwidth) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReuseAddress(boolean reuseAddress) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSendBufferSize(int sendBufferSize) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSoLinger(int soLinger) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.SO_LINGER, soLinger);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTcpNoDelay(boolean tcpNoDelay) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTrafficClass(int trafficClass) {
|
||||
try {
|
||||
channel.setOption(StandardSocketOptions.IP_TOS, trafficClass);
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
}
|
21
transport/src/main/java/io/netty/channel/socket/aio/package-info.java
Executable file
21
transport/src/main/java/io/netty/channel/socket/aio/package-info.java
Executable file
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based socket channel
|
||||
* API implementation - recommended for a large number of connections (>= 1000).
|
||||
*/
|
||||
package io.netty.channel.socket.aio;
|
@ -8,9 +8,9 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundByteHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.socket.nio2.AsyncEventLoop;
|
||||
import io.netty.channel.socket.nio2.AsyncServerSocketChannel;
|
||||
import io.netty.channel.socket.nio2.AsyncSocketChannel;
|
||||
import io.netty.channel.socket.aio.AsyncEventLoop;
|
||||
import io.netty.channel.socket.aio.AsyncServerSocketChannel;
|
||||
import io.netty.channel.socket.aio.AsyncSocketChannel;
|
||||
|
||||
public class AsyncTransportTest {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user