Remove the AIO transport as NIO is just faster

The AIO transport was added in the past as we hoped it would have better latency as the NIO transport. But in reality this was never the case.
So there is no reason to use the AIO transport at all. It just put more burden on us as we need to also support it and fix bugs.
Because of this we dedicided to remove it for now. It will stay in the master_with_aio_transport branch so we can pick it up later again if it is ever needed.
This commit is contained in:
Norman Maurer 2013-06-10 11:26:04 +02:00
parent 65e4161e63
commit e9c6406819
12 changed files with 0 additions and 2281 deletions

View File

@ -20,12 +20,9 @@ import io.netty.bootstrap.ChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.aio.AioEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.aio.AioServerSocketChannel;
import io.netty.channel.socket.aio.AioSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@ -33,28 +30,18 @@ import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
final class SocketTestPermutation {
// TODO: AIO transport tests fail with 'An existing connection was forcibly closed by the remote host' on Windows.
// Disabling test until the root cause is known.
private static final boolean TEST_AIO = !PlatformDependent.isWindows();
private static final int BOSSES = 2;
private static final int WORKERS = 3;
private static final EventLoopGroup nioBossGroup =
new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-nio-boss", true));
private static final EventLoopGroup nioWorkerGroup =
new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-nio-worker", true));
private static final EventLoopGroup aioBossGroup =
new AioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-aio-boss", true));
private static final EventLoopGroup aioWorkerGroup =
new AioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-aio-worker", true));
private static final EventLoopGroup oioBossGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-oio-boss", true));
private static final EventLoopGroup oioWorkerGroup =
@ -168,15 +155,6 @@ final class SocketTestPermutation {
.channel(NioServerSocketChannel.class);
}
});
if (TEST_AIO) {
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(aioBossGroup, aioWorkerGroup)
.channel(AioServerSocketChannel.class);
}
});
}
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
@ -196,14 +174,6 @@ final class SocketTestPermutation {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
});
if (TEST_AIO) {
list.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(aioWorkerGroup).channel(AioSocketChannel.class);
}
});
}
list.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {

View File

@ -1,168 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Abstract base class for {@link Channel} implementations that use the new {@link AsynchronousChannel} which is part
* of NIO.2.
*/
public abstract class AbstractAioChannel extends AbstractChannel {
protected volatile AsynchronousChannel ch;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
protected ChannelPromise connectPromise;
protected ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
/**
* Creates a new instance.
*
* @param id
* the unique non-negative integer ID of this channel.
* Specify {@code null} to auto-generate a unique negative integer
* ID.
* @param parent
* the parent of this channel. {@code null} if there's no parent.
* @param ch
* the {@link AsynchronousChannel} which will handle the IO or {@code null} if not created yet.
*/
protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) {
super(parent, id);
this.ch = ch;
}
/**
* Return the underlying {@link AsynchronousChannel}. Be aware this should only be called after it was set as
* otherwise it will throw an {@link IllegalStateException}.
*/
protected AsynchronousChannel javaChannel() {
if (ch == null) {
throw new IllegalStateException("Try to access Channel before eventLoop was registered");
}
return ch;
}
@Override
public boolean isOpen() {
if (ch == null) {
return true;
}
return ch.isOpen();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof AioEventLoop;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultAioUnsafe();
}
protected final class DefaultAioUnsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
doConnect(remoteAddress, localAddress, promise);
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectFuture = connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectFuture != null && connectFuture.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.setFailure(t);
closeIfClosed();
}
}
public void connectFailed(Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
connectPromise.setFailure(t);
closeIfClosed();
}
public void connectSuccess() {
assert eventLoop().inEventLoop();
assert connectPromise != null;
try {
connectPromise.setSuccess();
pipeline().fireChannelActive();
} catch (Throwable t) {
connectPromise.setFailure(t);
closeIfClosed();
} finally {
connectTimeoutFuture.cancel(false);
connectPromise = null;
}
}
}
/**
* Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
*/
protected abstract void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise connectPromise);
}

View File

@ -1,120 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.nio.channels.CompletionHandler;
/**
* Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop}
*/
public abstract class AioCompletionHandler<C extends Channel, V, A> implements CompletionHandler<V, A> {
private final C channel;
protected AioCompletionHandler(C channel) {
this.channel = channel;
}
/**
* See {@link CompletionHandler#completed(Object, Object)}
*/
protected abstract void completed0(C channel, V result, A attachment);
/**
* Set {@link CompletionHandler#failed(Throwable, Object)}
*/
protected abstract void failed0(C channel, Throwable exc, A attachment);
// According to JDK AIO documentation, the ExecutorService a user specified must not call the Runnable given by
// JDK AIO implementation directly. However, we violates that rull by calling Runnable.run() directly for
// optimization purposes, and it can result in infinite recursion in combination with the fact that the JDK AIO
// implementation often makes recursive invocations. Therefore, we must make sure we don't go too deep in the
// stack.
private static final int MAX_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> STACK_DEPTH = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
@Override
public final void completed(final V result, final A attachment) {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
Integer d = STACK_DEPTH.get();
if (d < MAX_STACK_DEPTH) {
STACK_DEPTH.set(d + 1);
try {
completed0(channel, result, attachment);
} finally {
STACK_DEPTH.set(d);
}
} else {
// schedule it with a special runnable to make sure we keep the right
// order and exist the recursive call to prevent stackoverflow
loop.execute(new AioEventLoop.RecursionBreakingRunnable() {
@Override
public void run() {
completed0(channel, result, attachment);
}
});
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
completed0(channel, result, attachment);
}
});
}
}
@Override
public final void failed(final Throwable exc, final A attachment) {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
Integer d = STACK_DEPTH.get();
if (d < MAX_STACK_DEPTH) {
STACK_DEPTH.set(d + 1);
try {
failed0(channel, exc, attachment);
} finally {
STACK_DEPTH.set(d);
}
} else {
// schedule it with a special runnable to make sure we keep the right
// order and exist the recursive call to prevent stackoverflow
loop.execute(new AioEventLoop.RecursionBreakingRunnable() {
@Override
public void run() {
failed0(channel, exc, attachment);
}
});
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
failed0(channel, exc, attachment);
}
});
}
}
}

View File

@ -1,137 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SingleThreadEventLoop;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
/**
* {@link SingleThreadEventLoop} implementations which will handle AIO {@link Channel}s.
*/
final class AioEventLoop extends SingleThreadEventLoop {
private final Set<Channel> channels = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>());
private LinkedBlockingDeque<Runnable> taskQueue;
private final ChannelFutureListener registrationListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
return;
}
Channel ch = future.channel();
channels.add(ch);
ch.closeFuture().addListener(deregistrationListener);
}
};
private final ChannelFutureListener deregistrationListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channels.remove(future.channel());
}
};
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
@Override
public ChannelFuture register(Channel channel) {
return super.register(channel).addListener(registrationListener);
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise future) {
return super.register(channel, future).addListener(registrationListener);
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
}
}
private void closeAll() {
Collection<Channel> channels = new ArrayList<Channel>(this.channels.size());
for (Channel ch: this.channels) {
channels.add(ch);
}
for (Channel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
@Override
protected Queue<Runnable> newTaskQueue() {
// use a Deque as we need to be able to also add tasks on the first position.
taskQueue = new LinkedBlockingDeque<Runnable>();
return taskQueue;
}
@Override
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (task instanceof RecursionBreakingRunnable) {
if (isTerminated()) {
reject();
}
// put the task at the first postion of the queue as we just schedule it to
// break the recursive operation
taskQueue.addFirst(task);
} else {
super.addTask(task);
}
}
/**
* Special Runnable which is used by {@link AioCompletionHandler} to break a recursive call and so prevent
* from StackOverFlowError. When a task is executed that implement it needs to put on the first position of
* the queue to guaranteer execution order and break the recursive call.
*/
interface RecursionBreakingRunnable extends Runnable {
// Marker interface
}
}

View File

@ -1,150 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* {@link AioEventLoopGroup} implementation which will handle AIO {@link Channel} implementations.
*
*/
public class AioEventLoopGroup extends MultithreadEventLoopGroup {
private final AioExecutorService groupExecutor = new AioExecutorService();
private final AsynchronousChannelGroup group;
public AsynchronousChannelGroup channelGroup() {
return group;
}
/**
* Create a new instance which use the default number of threads of {@link #DEFAULT_EVENT_LOOP_THREADS}.
*/
public AioEventLoopGroup() {
this(DEFAULT_EVENT_LOOP_THREADS);
}
/**
* Create a new instance
*
* @param nThreads the number of threads that will be used by this instance
*/
public AioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
*/
public AioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
try {
group = AsynchronousChannelGroup.withThreadPool(groupExecutor);
} catch (IOException e) {
throw new EventLoopException("Failed to create an AsynchronousChannelGroup", e);
}
}
@Override
@Deprecated
@SuppressWarnings("deprecation")
public void shutdown() {
boolean interrupted = false;
// Tell JDK not to accept any more registration request. Note that the threads are not really shut down yet.
try {
group.shutdownNow();
} catch (IOException e) {
throw new EventLoopException("failed to shut down a channel group", e);
}
// Wait until JDK propagates the shutdown request on AsynchronousChannelGroup to the ExecutorService.
// JDK will probably submit some final tasks to the ExecutorService before shutting down the ExecutorService,
// so we have to ensure all tasks submitted by JDK are executed before calling super.shutdown() to really
// shut down event loop threads.
while (!groupExecutor.isTerminated()) {
try {
groupExecutor.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
interrupted = true;
}
}
// Close all connections and shut down event loop threads.
super.shutdown();
if (interrupted) {
Thread.currentThread().interrupt();
}
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new AioEventLoop(this, threadFactory);
}
private static final class AioExecutorService extends AbstractExecutorService {
// It does not shut down the underlying EventExecutor - it merely pretends to be shut down.
// The actual shut down is done by EventLoopGroup and EventLoop implementation.
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void shutdown() {
latch.countDown();
}
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return latch.getCount() == 0;
}
@Override
public boolean isTerminated() {
return isShutdown();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}
@Override
public void execute(Runnable command) {
command.run();
}
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based channel
* API implementation - recommended for a large number of connections (&gt;= 1000).
*
* NIO2 is only supported on Java 7+.
*/
package io.netty.channel.aio;

View File

@ -1,224 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.MessageList;
import io.netty.channel.aio.AbstractAioChannel;
import io.netty.channel.aio.AioCompletionHandler;
import io.netty.channel.aio.AioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* {@link ServerSocketChannel} implementation which uses NIO2.
*
* NIO2 is only supported on Java 7+.
*/
public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler = new AcceptHandler(this);
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
private final AioServerSocketChannelConfig config;
private boolean acceptInProgress;
private boolean closed;
private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
return AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
/**
* Create a new instance which has not yet attached an {@link AsynchronousServerSocketChannel}. The
* {@link AsynchronousServerSocketChannel} will be attached after it was this instance was registered to an
* {@link EventLoop}.
*/
public AioServerSocketChannel() {
super(null, null, null);
config = new AioServerSocketChannelConfig(this);
}
/**
* Create a new instance from the given {@link AsynchronousServerSocketChannel}.
*
* @param channel the {@link AsynchronousServerSocketChannel} which is used by this instance
*/
public AioServerSocketChannel(AsynchronousServerSocketChannel channel) {
super(null, null, channel);
config = new AioServerSocketChannelConfig(this, channel);
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
protected AsynchronousServerSocketChannel javaChannel() {
return (AsynchronousServerSocketChannel) super.javaChannel();
}
@Override
public boolean isActive() {
return ch != null && javaChannel().isOpen() && localAddress0() != null;
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
protected SocketAddress localAddress0() {
if (ch == null) {
return null;
}
try {
return javaChannel().getLocalAddress();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
AsynchronousServerSocketChannel ch = javaChannel();
ch.bind(localAddress, config.getBacklog());
}
@Override
protected void doBeginRead() {
if (acceptInProgress) {
return;
}
acceptInProgress = true;
javaChannel().accept(null, acceptHandler);
}
@Override
protected void doClose() throws Exception {
if (!closed) {
closed = true;
javaChannel().close();
}
}
@Override
protected boolean isFlushPending() {
return false;
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doWrite(MessageList<Object> msgs, int index) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Runnable doRegister() throws Exception {
Runnable task = super.doRegister();
if (ch == null) {
AsynchronousServerSocketChannel channel =
newSocket(((AioEventLoopGroup) eventLoop().parent()).channelGroup());
ch = channel;
config.assign(channel);
}
return task;
}
private static final class AcceptHandler
extends AioCompletionHandler<AioServerSocketChannel, AsynchronousSocketChannel, Void> {
AcceptHandler(AioServerSocketChannel channel) {
super(channel);
}
@Override
protected void completed0(AioServerSocketChannel channel, AsynchronousSocketChannel ch, Void attachment) {
channel.acceptInProgress = false;
ChannelPipeline pipeline = channel.pipeline();
// Create a new Netty channel from a JDK channel and trigger events.
pipeline.fireMessageReceived(new AioSocketChannel(channel, null, ch));
pipeline.fireChannelReadSuspended();
}
@Override
protected void failed0(AioServerSocketChannel channel, Throwable t, Void attachment) {
channel.acceptInProgress = false;
boolean asyncClosed = false;
if (t instanceof AsynchronousCloseException) {
asyncClosed = true;
channel.closed = true;
}
// check if the exception was thrown because the channel was closed before
// log something
if (channel.isOpen() && ! asyncClosed) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
channel.pipeline().fireExceptionCaught(t);
}
}
}
@Override
public ServerSocketChannelConfig config() {
return config;
}
}

View File

@ -1,245 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetUtil;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.channel.ChannelOption.*;
/**
* The Async {@link ServerSocketChannelConfig} implementation.
*/
final class AioServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig {
private final AtomicReference<AsynchronousServerSocketChannel> javaChannel
= new AtomicReference<AsynchronousServerSocketChannel>();
private volatile int backlog = NetUtil.SOMAXCONN;
private Map<SocketOption<?>, Object> options = PlatformDependent.newConcurrentHashMap();
private static final int DEFAULT_SND_BUF_SIZE = 32 * 1024;
private static final boolean DEFAULT_SO_REUSEADDR = false;
/**
* Creates a new instance with no {@link AsynchronousServerSocketChannel} assigned to it.
*
* You should call {@link #assign(AsynchronousServerSocketChannel)} to assign a
* {@link AsynchronousServerSocketChannel} to it and have the configuration set on it.
*/
AioServerSocketChannelConfig(AioServerSocketChannel channel) {
super(channel);
}
/**
* Creates a new instance with the given {@link AsynchronousServerSocketChannel} assigned to it.
*/
AioServerSocketChannelConfig(AioServerSocketChannel channel, AsynchronousServerSocketChannel javaChannel) {
super(channel);
this.javaChannel.set(javaChannel);
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public boolean isReuseAddress() {
return (Boolean) getOption(StandardSocketOptions.SO_REUSEADDR, DEFAULT_SO_REUSEADDR);
}
@Override
public AioServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
return this;
}
@Override
public int getReceiveBufferSize() {
return (Integer) getOption(StandardSocketOptions.SO_RCVBUF, DEFAULT_SND_BUF_SIZE);
}
@Override
public AioServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
return this;
}
@Override
public AioServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException();
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public AioServerSocketChannelConfig setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
return this;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object getOption(SocketOption option, Object defaultValue) {
if (javaChannel.get() == null) {
Object value = options.get(option);
if (value == null) {
return defaultValue;
} else {
return value;
}
}
try {
return javaChannel.get().getOption(option);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setOption(SocketOption option, Object defaultValue) {
if (javaChannel.get() == null) {
options.put(option, defaultValue);
return;
}
try {
javaChannel.get().setOption(option, defaultValue);
} catch (IOException e) {
throw new ChannelException(e);
}
}
/**
* Assing the given {@link AsynchronousServerSocketChannel} to this instance
*/
void assign(AsynchronousServerSocketChannel javaChannel) {
if (javaChannel == null) {
throw new NullPointerException("javaChannel");
}
if (this.javaChannel.compareAndSet(null, javaChannel)) {
propagateOptions();
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void propagateOptions() {
for (SocketOption option: options.keySet()) {
Object value = options.remove(option);
if (value != null) {
try {
javaChannel.get().setOption(option, value);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}
// not needed anymore
options = null;
}
@Override
public AioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public AioServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public AioServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public AioServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public AioServerSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public AioServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public AioServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
}

View File

@ -1,667 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageList;
import io.netty.channel.aio.AbstractAioChannel;
import io.netty.channel.aio.AioCompletionHandler;
import io.netty.channel.aio.AioEventLoopGroup;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;
/**
* {@link SocketChannel} implementation which uses NIO2.
*
* NIO2 is only supported on Java 7+.
*/
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final CompletionHandler<Void, Void> connectHandler = new ConnectHandler(this);
private final CompletionHandler<Integer, ByteBuf> writeHandler = new WriteHandler<Integer>(this);
private final CompletionHandler<Integer, ByteBuf> readHandler = new ReadHandler<Integer>(this);
private final CompletionHandler<Long, ByteBuf> gatheringWriteHandler = new WriteHandler<Long>(this);
private final CompletionHandler<Long, ByteBuf> scatteringReadHandler = new ReadHandler<Long>(this);
private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
return AsynchronousSocketChannel.open(group);
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
private final DefaultAioSocketChannelConfig config;
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
private boolean readInProgress;
private boolean inDoBeginRead;
private boolean readAgain;
private Throwable writeException;
private boolean writeInProgress;
private boolean inDoWrite;
private boolean fileRegionDone;
/**
* Create a new instance which has not yet attached an {@link AsynchronousSocketChannel}. The
* {@link AsynchronousSocketChannel} will be attached after it was this instance was registered to an
* {@link EventLoop}.
*/
public AioSocketChannel() {
this(null, null, null);
}
/**
* Create a new instance from the given {@link AsynchronousSocketChannel}.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
* @param id
* the unique non-negative integer ID of this channel.
* Specify {@code null} to auto-generate a unique negative integer
* ID.
* @param ch
* the {@link AsynchronousSocketChannel} which is used by this instance
*/
AioSocketChannel(
AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) {
super(parent, id, ch);
config = new DefaultAioSocketChannelConfig(this, ch);
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
@Override
public boolean isActive() {
return ch != null && javaChannel().isOpen() && remoteAddress0() != null;
}
@Override
protected AsynchronousSocketChannel javaChannel() {
return (AsynchronousSocketChannel) super.javaChannel();
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public boolean isInputShutdown() {
return inputShutdown;
}
@Override
public boolean isOutputShutdown() {
return outputShutdown;
}
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
try {
javaChannel().shutdownOutput();
outputShutdown = true;
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput(promise);
}
});
}
return promise;
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress != null) {
try {
javaChannel().bind(localAddress);
} catch (IOException e) {
promise.setFailure(e);
return;
}
}
javaChannel().connect(remoteAddress, null, connectHandler);
}
@Override
protected InetSocketAddress localAddress0() {
if (ch == null) {
return null;
}
try {
return (InetSocketAddress) javaChannel().getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected InetSocketAddress remoteAddress0() {
if (ch == null) {
return null;
}
try {
return (InetSocketAddress) javaChannel().getRemoteAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected Runnable doRegister() throws Exception {
super.doRegister();
if (ch == null) {
ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).channelGroup());
config.assign(javaChannel());
}
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
inputShutdown = true;
outputShutdown = true;
}
@Override
protected boolean isFlushPending() {
return false;
}
@Override
protected int doWrite(MessageList<Object> msgs, int index) throws Exception {
if (inDoWrite || writeInProgress) {
return 0;
}
inDoWrite = true;
try {
Object msg = msgs.get(index);
if (msg instanceof ByteBuf) {
if (doWriteBuffer((ByteBuf) msg)) {
return 1;
}
return 0;
}
if (msg instanceof FileRegion) {
if (doWriteFileRegion((FileRegion) msg)) {
return 1;
}
return 0;
}
} finally {
inDoWrite = false;
}
return 0;
}
private boolean doWriteBuffer(ByteBuf buf) throws Exception {
if (buf.isReadable()) {
for (;;) {
checkWriteException();
writeInProgress = true;
if (buf.nioBufferCount() == 1) {
javaChannel().write(
buf.nioBuffer(), config.getWriteTimeout(), TimeUnit.MILLISECONDS,
buf, writeHandler);
} else {
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes());
if (buffers.length == 1) {
javaChannel().write(
buffers[0], config.getWriteTimeout(), TimeUnit.MILLISECONDS, buf, writeHandler);
} else {
javaChannel().write(
buffers, 0, buffers.length, config.getWriteTimeout(), TimeUnit.MILLISECONDS,
buf, gatheringWriteHandler);
}
}
if (writeInProgress) {
// JDK decided to write data (or notify handler) later.
return false;
}
checkWriteException();
// JDK performed the write operation immediately and notified the handler.
// We know this because we set asyncWriteInProgress to false in the handler.
if (!buf.isReadable()) {
// There's nothing left in the buffer. No need to retry writing.
return true;
}
// There's more to write. Continue the loop.
}
}
return true;
}
private boolean doWriteFileRegion(FileRegion region) throws Exception {
checkWriteException();
if (fileRegionDone) {
// fileregion was complete in the CompletionHandler
fileRegionDone = false;
// was written complete
return true;
}
WritableByteChannelAdapter byteChannel = new WritableByteChannelAdapter(region);
region.transferTo(byteChannel, 0);
// check if the FileRegion is already complete. This may be the case if all could be written directly
if (byteChannel.written >= region.count()) {
return true;
}
return false;
}
private void checkWriteException() throws Exception {
if (writeException != null) {
fileRegionDone = false;
Throwable e = writeException;
writeException = null;
PlatformDependent.throwException(e);
}
}
@Override
protected void doBeginRead() {
if (inDoBeginRead) {
readAgain = true;
return;
}
if (readInProgress || inputShutdown) {
return;
}
inDoBeginRead = true;
try {
for (;;) {
if (inputShutdown) {
break;
}
ByteBuf byteBuf = alloc().buffer();
readInProgress = true;
if (byteBuf.nioBufferCount() == 1) {
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(
buffer, config.getReadTimeout(), TimeUnit.MILLISECONDS, byteBuf, readHandler);
} else {
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
if (buffers.length == 1) {
javaChannel().read(
buffers[0], config.getReadTimeout(), TimeUnit.MILLISECONDS, byteBuf, readHandler);
} else {
javaChannel().read(
buffers, 0, buffers.length, config.getReadTimeout(), TimeUnit.MILLISECONDS,
byteBuf, scatteringReadHandler);
}
}
if (readInProgress) {
// JDK decided to read data (or notify handler) later.
break;
}
if (readAgain) {
// User requested the read operation.
readAgain = false;
continue;
}
break;
}
} finally {
inDoBeginRead = false;
}
}
private void setWriteException(Throwable cause) {
writeException = cause;
}
private static final class WriteHandler<T extends Number>
extends AioCompletionHandler<AioSocketChannel, T, ByteBuf> {
WriteHandler(AioSocketChannel channel) {
super(channel);
}
@Override
protected void completed0(AioSocketChannel channel, T result, ByteBuf buf) {
channel.writeException = null;
channel.writeInProgress = false;
boolean release = true;
try {
int writtenBytes = result.intValue();
if (writtenBytes > 0) {
// Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + writtenBytes);
}
if (buf.isReadable()) {
// something left in the buffer so not release it
release = false;
}
} finally {
if (release) {
buf.release();
}
if (channel.inDoWrite) {
// JDK performed the write operation immediately and notified this handler immediately.
// doWrite(...) will do subsequent write operations if necessary for us.
} else {
// trigger flush so doWrite(..) is called again. This will either trigger a new write to the
// channel or remove the empty bytebuf (which was written completely before) from the MessageList.
channel.unsafe().flushNow();
}
}
}
@Override
protected void failed0(AioSocketChannel channel, Throwable cause, ByteBuf buf) {
buf.release();
channel.setWriteException(cause);
channel.writeInProgress = false;
// Check if the exception was raised because of an InterruptedByTimeoutException which means that the
// write timeout was hit. In that case we should close the channel as it may be unusable anyway.
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidPromise());
}
if (!channel.inDoWrite) {
// trigger flushNow() so the Throwable is thrown in doWrite(...). This will make sure that all
// queued MessageLists are failed.
channel.unsafe().flushNow();
}
}
}
private final class ReadHandler<T extends Number> extends AioCompletionHandler<AioSocketChannel, T, ByteBuf> {
ReadHandler(AioSocketChannel channel) {
super(channel);
}
@Override
protected void completed0(AioSocketChannel channel, T result, ByteBuf byteBuf) {
channel.readInProgress = false;
if (channel.inputShutdown) {
// Channel has been closed during read. Because the inbound buffer has been deallocated already,
// there's no way to let a user handler access it unfortunately.
return;
}
boolean release = true;
final ChannelPipeline pipeline = channel.pipeline();
try {
boolean closed = false;
boolean read = false;
boolean firedChannelReadSuspended = false;
try {
int localReadAmount = result.intValue();
if (localReadAmount > 0) {
// Set the writerIndex of the buffer correctly to the
// current writerIndex + read amount of bytes.
//
// This is needed as the ByteBuffer and the ByteBuf does not share
// each others index
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
release = false;
pipeline.fireMessageReceived(byteBuf);
}
if (!closed && isOpen()) {
firedChannelReadSuspended = true;
pipeline.fireChannelReadSuspended();
}
pipeline.fireExceptionCaught(t);
} finally {
if (read) {
release = false;
pipeline.fireMessageReceived(byteBuf);
}
// Double check because fireInboundBufferUpdated() might have triggered
// the closure by a user handler.
if (closed || !channel.isOpen()) {
channel.inputShutdown = true;
if (isOpen()) {
if (channel.config().isAllowHalfClosure()) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
channel.unsafe().close(channel.unsafe().voidPromise());
}
}
} else if (!firedChannelReadSuspended) {
pipeline.fireChannelReadSuspended();
}
}
} finally {
if (release) {
byteBuf.release();
}
}
}
@Override
protected void failed0(AioSocketChannel channel, Throwable t, ByteBuf buf) {
buf.release();
channel.readInProgress = false;
if (t instanceof ClosedChannelException) {
channel.inputShutdown = true;
return;
}
channel.pipeline().fireExceptionCaught(t);
// Check if the exception was raised because of an InterruptedByTimeoutException which means that the
// write timeout was hit. In that case we should close the channel as it may be unusable anyway.
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidPromise());
}
}
}
private static final class ConnectHandler extends AioCompletionHandler<AioSocketChannel, Void, Void> {
ConnectHandler(AioSocketChannel channel) {
super(channel);
}
@Override
protected void completed0(AioSocketChannel channel, Void result, Void attachment) {
((DefaultAioUnsafe) channel.unsafe()).connectSuccess();
}
@Override
protected void failed0(AioSocketChannel channel, Throwable exc, Void attachment) {
((DefaultAioUnsafe) channel.unsafe()).connectFailed(exc);
}
}
@Override
public AioSocketChannelConfig config() {
return config;
}
private final class WritableByteChannelAdapter implements WritableByteChannel {
private final FileRegionWriteHandler handler = new FileRegionWriteHandler();
private final FileRegion region;
private long written;
public WritableByteChannelAdapter(FileRegion region) {
this.region = region;
}
@Override
public int write(final ByteBuffer src) {
javaChannel().write(src, src, handler);
return 0;
}
@Override
public boolean isOpen() {
return javaChannel().isOpen();
}
@Override
public void close() throws IOException {
javaChannel().close();
}
private final class FileRegionWriteHandler extends AioCompletionHandler<AioSocketChannel, Integer, ByteBuffer> {
FileRegionWriteHandler() {
super(AioSocketChannel.this);
}
@Override
public void completed0(AioSocketChannel channel, Integer result, ByteBuffer src) {
try {
assert !fileRegionDone;
if (result == -1) {
checkEOF(region);
// mark the region as done and release it
fileRegionDone = true;
region.release();
return;
}
written += result;
if (written >= region.count()) {
channel.writeInProgress = false;
// mark the region as done and release it
fileRegionDone = true;
region.release();
return;
}
if (src.hasRemaining()) {
// something left in the buffer trigger a write again
javaChannel().write(src, src, this);
} else {
// everything was written out of the src buffer, so trigger a new transfer with new data
region.transferTo(WritableByteChannelAdapter.this, written);
}
} catch (Throwable cause) {
failed0(channel, cause, src);
}
}
@Override
public void failed0(AioSocketChannel channel, Throwable cause, ByteBuffer src) {
assert !fileRegionDone;
// mark the region as done and release it
fileRegionDone = true;
region.release();
channel.setWriteException(cause);
if (!inDoWrite) {
// not executed as part of the doWrite(...) so trigger flushNow() to make sure the doWrite(...)
// will be called again and so rethrow the exception
channel.unsafe().flushNow();
}
}
}
}
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;
import java.nio.channels.InterruptedByTimeoutException;
/**
* Special {@link SocketChannelConfig} which is used for the {@link AioSocketChannel} to expose extra configuration
* possiblilites.
*
* In addition to the options provided by {@link SocketChannelConfig},
* {@link AioSocketChannelConfig} allows the following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@link ChannelOption#AIO_READ_TIMEOUT}</td><td>{@link #setReadTimeout(long)}</td>
* </tr><tr>
* <td>{@link ChannelOption#AIO_WRITE_TIMEOUT}</td><td>{@link #setWriteTimeout(long)}</td>
* </tr>
* </table>
*/
public interface AioSocketChannelConfig extends SocketChannelConfig {
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel
* will get closed as it may be in an unknown state.
*
* To disable it just use {@code 0}.
*/
AioSocketChannelConfig setReadTimeout(long readTimeoutInMillis);
/**
* Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel
* will get closed as it may be in an unknown state.
*
* To disable it just use {@code 0}.
*/
AioSocketChannelConfig setWriteTimeout(long writeTimeoutInMillis);
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
*
* The default is {@code 0}
*/
long getReadTimeout();
/**
* Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
*
* The default is {@code 0}
*/
long getWriteTimeout();
@Override
AioSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay);
@Override
AioSocketChannelConfig setSoLinger(int soLinger);
@Override
AioSocketChannelConfig setSendBufferSize(int sendBufferSize);
@Override
AioSocketChannelConfig setReceiveBufferSize(int receiveBufferSize);
@Override
AioSocketChannelConfig setKeepAlive(boolean keepAlive);
@Override
AioSocketChannelConfig setTrafficClass(int trafficClass);
@Override
AioSocketChannelConfig setReuseAddress(boolean reuseAddress);
@Override
AioSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);
@Override
AioSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure);
@Override
AioSocketChannelConfig setWriteSpinCount(int writeSpinCount);
@Override
AioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
AioSocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
AioSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override
AioSocketChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -1,376 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.channel.ChannelOption.*;
/**
* The default {@link AioSocketChannelConfig} implementation.
*/
final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
implements AioSocketChannelConfig {
private final AtomicReference<NetworkChannel> javaChannel = new AtomicReference<NetworkChannel>();
private volatile boolean allowHalfClosure;
private volatile long readTimeoutInMillis;
private volatile long writeTimeoutInMillis;
private Map<SocketOption<?>, Object> options = PlatformDependent.newConcurrentHashMap();
private static final int DEFAULT_RCV_BUF_SIZE = 32 * 1024;
private static final int DEFAULT_SND_BUF_SIZE = 32 * 1024;
private static final int DEFAULT_SO_LINGER = -1;
private static final boolean DEFAULT_SO_KEEP_ALIVE = false;
private static final int DEFAULT_IP_TOS = 0;
private static final boolean DEFAULT_SO_REUSEADDR = false;
private static final boolean DEFAULT_TCP_NODELAY = false;
/**
* Creates a new instance with no {@link NetworkChannel} assigned to it.
*
* You should call {@link #assign(NetworkChannel)} to assign a {@link NetworkChannel} to it and
* have the configuration set on it.
*/
DefaultAioSocketChannelConfig(AioSocketChannel channel) {
super(channel);
enableTcpNoDelay();
}
/**
* Creates a new instance with the given {@link NetworkChannel} assigned to it.
*/
DefaultAioSocketChannelConfig(AioSocketChannel channel, NetworkChannel javaChannel) {
super(channel);
this.javaChannel.set(javaChannel);
enableTcpNoDelay();
}
private void enableTcpNoDelay() {
// Enable TCP_NODELAY by default if possible.
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
try {
setTcpNoDelay(true);
} catch (Exception e) {
// Ignore.
}
}
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT, ALLOW_HALF_CLOSURE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == TCP_NODELAY) {
return (T) Boolean.valueOf(isTcpNoDelay());
}
if (option == SO_KEEPALIVE) {
return (T) Boolean.valueOf(isKeepAlive());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_LINGER) {
return (T) Integer.valueOf(getSoLinger());
}
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (option == AIO_READ_TIMEOUT) {
return (T) Long.valueOf(getReadTimeout());
}
if (option == AIO_WRITE_TIMEOUT) {
return (T) Long.valueOf(getWriteTimeout());
}
if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == TCP_NODELAY) {
setTcpNoDelay((Boolean) value);
} else if (option == SO_KEEPALIVE) {
setKeepAlive((Boolean) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_LINGER) {
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else if (option == AIO_READ_TIMEOUT) {
setReadTimeout((Long) value);
} else if (option == AIO_WRITE_TIMEOUT) {
setWriteTimeout((Long) value);
} else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public int getReceiveBufferSize() {
return (Integer) getOption(StandardSocketOptions.SO_RCVBUF, DEFAULT_RCV_BUF_SIZE);
}
@Override
public int getSendBufferSize() {
return (Integer) getOption(StandardSocketOptions.SO_SNDBUF, DEFAULT_SND_BUF_SIZE);
}
@Override
public int getSoLinger() {
return (Integer) getOption(StandardSocketOptions.SO_LINGER, DEFAULT_SO_LINGER);
}
@Override
public int getTrafficClass() {
return (Integer) getOption(StandardSocketOptions.IP_TOS, DEFAULT_IP_TOS);
}
@Override
public boolean isKeepAlive() {
return (Boolean) getOption(StandardSocketOptions.SO_KEEPALIVE, DEFAULT_SO_KEEP_ALIVE);
}
@Override
public boolean isReuseAddress() {
return (Boolean) getOption(StandardSocketOptions.SO_REUSEADDR, DEFAULT_SO_REUSEADDR);
}
@Override
public boolean isTcpNoDelay() {
return (Boolean) getOption(StandardSocketOptions.TCP_NODELAY, DEFAULT_TCP_NODELAY);
}
@Override
public AioSocketChannelConfig setKeepAlive(boolean keepAlive) {
setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
return this;
}
@Override
public AioSocketChannelConfig setPerformancePreferences(
int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException();
}
@Override
public AioSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
return this;
}
@Override
public AioSocketChannelConfig setReuseAddress(boolean reuseAddress) {
setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
return this;
}
@Override
public AioSocketChannelConfig setSendBufferSize(int sendBufferSize) {
setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
return this;
}
@Override
public AioSocketChannelConfig setSoLinger(int soLinger) {
setOption(StandardSocketOptions.SO_LINGER, soLinger);
return this;
}
@Override
public AioSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
return this;
}
@Override
public AioSocketChannelConfig setTrafficClass(int trafficClass) {
setOption(StandardSocketOptions.IP_TOS, trafficClass);
return this;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object getOption(SocketOption option, Object defaultValue) {
if (javaChannel.get() == null) {
Object value = options.get(option);
if (value == null) {
return defaultValue;
} else {
return value;
}
}
try {
return javaChannel.get().getOption(option);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setOption(SocketOption option, Object defaultValue) {
if (javaChannel.get() == null) {
options.put(option, defaultValue);
return;
}
try {
javaChannel.get().setOption(option, defaultValue);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public AioSocketChannelConfig setReadTimeout(long readTimeoutInMillis) {
if (readTimeoutInMillis < 0) {
throw new IllegalArgumentException("readTimeoutInMillis: " + readTimeoutInMillis);
}
this.readTimeoutInMillis = readTimeoutInMillis;
return this;
}
@Override
public AioSocketChannelConfig setWriteTimeout(long writeTimeoutInMillis) {
if (writeTimeoutInMillis < 0) {
throw new IllegalArgumentException("writeTimeoutInMillis: " + writeTimeoutInMillis);
}
this.writeTimeoutInMillis = writeTimeoutInMillis;
return this;
}
@Override
public long getReadTimeout() {
return readTimeoutInMillis;
}
@Override
public long getWriteTimeout() {
return writeTimeoutInMillis;
}
@Override
public boolean isAllowHalfClosure() {
return allowHalfClosure;
}
@Override
public AioSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) {
this.allowHalfClosure = allowHalfClosure;
return this;
}
/**
* Assing the given {@link NetworkChannel} to this instance
*/
void assign(NetworkChannel javaChannel) {
if (javaChannel == null) {
throw new NullPointerException("javaChannel");
}
if (this.javaChannel.compareAndSet(null, javaChannel)) {
propagateOptions();
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void propagateOptions() {
for (SocketOption option: options.keySet()) {
Object value = options.remove(option);
if (value != null) {
try {
javaChannel.get().setOption(option, value);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}
// not needed anymore
options = null;
}
@Override
public AioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (AioSocketChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
}
@Override
public AioSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
return (AioSocketChannelConfig) super.setWriteSpinCount(writeSpinCount);
}
@Override
public AioSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (AioSocketChannelConfig) super.setAllocator(allocator);
}
@Override
public AioSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public AioSocketChannelConfig setAutoRead(boolean autoRead) {
return (AioSocketChannelConfig) super.setAutoRead(autoRead);
}
@Override
public AioSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (AioSocketChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
}
@Override
public AioSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (AioSocketChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based socket channel
* API implementation - recommended for a large number of connections (&gt;= 1000).
*
* NIO2 is only supported on Java 7+.
*/
package io.netty.channel.socket.aio;