diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java new file mode 100644 index 0000000000..a78235b71b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java @@ -0,0 +1,69 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.aio; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; + +import java.nio.channels.CompletionHandler; + +/** + * Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop} + * + * + */ +abstract class AioCompletionHandler implements CompletionHandler { + + /** + * See {@link CompletionHandler#completed(Object, Object)} + */ + protected abstract void completed0(V result, A channel); + + /** + * Set {@link CompletionHandler#failed(Throwable, Object)} + */ + protected abstract void failed0(Throwable exc, A channel); + + @Override + public final void completed(final V result, final A channel) { + if (channel.eventLoop().inEventLoop()) { + completed0(result, channel); + } else { + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + completed0(result, channel); + } + }); + } + } + + @Override + public final void failed(final Throwable exc, final A channel) { + if (channel.eventLoop().inEventLoop()) { + failed0(exc, channel); + } else { + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + failed0(exc, channel); + } + }); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 1f63c92387..418377b5fa 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -28,7 +28,7 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; +import java.util.concurrent.atomic.AtomicBoolean; public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel { @@ -36,6 +36,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server private static final InternalLogger logger = InternalLoggerFactory.getInstance(AioServerSocketChannel.class); private volatile AioServerSocketChannelConfig config; + final AtomicBoolean closed = new AtomicBoolean(false); public AioServerSocketChannel() { super(null, null); @@ -88,7 +89,9 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected void doClose() throws Exception { - javaChannel().close(); + if (closed.compareAndSet(false, true)) { + javaChannel().close(); + } } @Override @@ -116,21 +119,28 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server } private static final class AcceptHandler - implements CompletionHandler { - public void completed(AsynchronousSocketChannel ch, AioServerSocketChannel channel) { - // register again this handler to accept new connections + extends AioCompletionHandler { + + @Override + protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) { + // register again this handler to accept new connections channel.javaChannel().accept(channel, this); // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add(new AioSocketChannel(channel, null, ch)); channel.pipeline().fireInboundBufferUpdated(); - } - public void failed(Throwable t, AioServerSocketChannel channel) { - // check if the exception was thrown because the channel was closed before + @Override + protected void failed0(Throwable t, AioServerSocketChannel channel) { + boolean asyncClosed = false; + if (t instanceof AsynchronousCloseException) { + asyncClosed = true; + channel.closed.set(true); + } + // check if the exception was thrown because the channel was closed before // log something - if (channel.isOpen() && !(t instanceof AsynchronousCloseException)) { + if (channel.isOpen() && ! asyncClosed) { logger.warn("Failed to create a new channel from an accepted socket.", t); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 08deb6f66f..14ed119e67 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -26,16 +26,19 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.atomic.AtomicBoolean; + public class AioSocketChannel extends AbstractAioChannel implements SocketChannel { private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); private static final CompletionHandler READ_HANDLER = new ReadHandler(); private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); + private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean flushing = new AtomicBoolean(false); private volatile AioSocketChannelConfig config; @@ -111,7 +114,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne return null; } else if (remoteAddress() != null) { return new Runnable() { - + @Override public void run() { read(); @@ -130,12 +133,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne expandReadBuffer(byteBuf); // Get a ByteBuffer view on the ByteBuf ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); - javaChannel().read(buffer, this, READ_HANDLER); } - private static boolean expandReadBuffer(ByteBuf byteBuf) { + private boolean expandReadBuffer(ByteBuf byteBuf) { if (!byteBuf.writable()) { // FIXME: Magic number byteBuf.ensureWritableBytes(4096); @@ -156,7 +158,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void doClose() throws Exception { - javaChannel().close(); + if (closed.compareAndSet(false, true)) { + javaChannel().close(); + } } @Override @@ -183,13 +187,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } - private static final class WriteHandler implements CompletionHandler { + private static final class WriteHandler extends AioCompletionHandler { @Override - public void completed(Integer result, AioSocketChannel channel) { + protected void completed0(Integer result, AioSocketChannel channel) { ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); if (result > 0) { - + // Update the readerIndex with the amount of read bytes buf.readerIndex(buf.readerIndex() + result); @@ -211,11 +215,15 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } @Override - public void failed(Throwable cause, AioSocketChannel channel) { + protected void failed0(Throwable cause, AioSocketChannel channel) { + if (cause instanceof AsynchronousCloseException) { + channel.closed.set(true); + } channel.notifyFlushFutures(cause); channel.pipeline().fireExceptionCaught(cause); if (cause instanceof IOException) { + channel.unsafe().close(channel.unsafe().voidFuture()); } else { ByteBuf buf = channel.pipeline().outboundByteBuffer(); @@ -228,13 +236,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - private static final class ReadHandler implements CompletionHandler { + private static final class ReadHandler extends AioCompletionHandler { @Override - public void completed(Integer result, AioSocketChannel channel) { - assert channel.eventLoop().inEventLoop(); - + protected void completed0(Integer result, AioSocketChannel channel) { final ChannelPipeline pipeline = channel.pipeline(); + final ByteBuf byteBuf = pipeline.inboundByteBuffer(); + boolean closed = false; boolean read = false; try { @@ -245,7 +253,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne // // This is needed as the ByteBuffer and the ByteBuf does not share // each others index - final ByteBuf byteBuf = pipeline.inboundByteBuffer(); byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); read = true; @@ -255,11 +262,17 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } catch (Throwable t) { + if (t instanceof AsynchronousCloseException) { + channel.closed.set(true); + } + if (read) { read = false; pipeline.fireInboundBufferUpdated(); } + pipeline.fireExceptionCaught(t); + if (t instanceof IOException) { channel.unsafe().close(channel.unsafe().voidFuture()); } @@ -272,12 +285,20 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } else { // start the next read channel.read(); + } } } @Override - public void failed(Throwable t, AioSocketChannel channel) { + protected void failed0(Throwable t, AioSocketChannel channel) { + if (t instanceof AsynchronousCloseException) { + channel.closed.set(true); + + // TODO: This seems wrong! + return; + } + channel.pipeline().fireExceptionCaught(t); if (t instanceof IOException) { channel.unsafe().close(channel.unsafe().voidFuture()); @@ -288,10 +309,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - private static final class ConnectHandler implements CompletionHandler { + private static final class ConnectHandler extends AioCompletionHandler { @Override - public void completed(Void result, AioSocketChannel channel) { + protected void completed0(Void result, AioSocketChannel channel) { ((AsyncUnsafe) channel.unsafe()).connectSuccess(); channel.pipeline().fireChannelActive(); @@ -300,7 +321,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } @Override - public void failed(Throwable exc, AioSocketChannel channel) { + protected void failed0(Throwable exc, AioSocketChannel channel) { + if (exc instanceof AsynchronousCloseException) { + channel.closed.set(true); + } ((AsyncUnsafe) channel.unsafe()).connectFailed(exc); } }