Make sure the ComplationHandler stuff is handled in the eventloop. See #396

This commit is contained in:
norman 2012-07-03 15:25:28 +02:00
parent a5b6f1d615
commit eccc28965e
3 changed files with 129 additions and 26 deletions

View File

@ -0,0 +1,69 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.nio.channels.CompletionHandler;
/**
* Special {@link CompletionHandler} which makes sure that the callback methods gets executed in the {@link EventLoop}
*
*
*/
abstract class AioCompletionHandler<V, A extends Channel> implements CompletionHandler<V, A> {
/**
* See {@link CompletionHandler#completed(Object, Object)}
*/
protected abstract void completed0(V result, A channel);
/**
* Set {@link CompletionHandler#failed(Throwable, Object)}
*/
protected abstract void failed0(Throwable exc, A channel);
@Override
public final void completed(final V result, final A channel) {
if (channel.eventLoop().inEventLoop()) {
completed0(result, channel);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
completed0(result, channel);
}
});
}
}
@Override
public final void failed(final Throwable exc, final A channel) {
if (channel.eventLoop().inEventLoop()) {
failed0(exc, channel);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
failed0(exc, channel);
}
});
}
}
}

View File

@ -28,7 +28,7 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler; import java.util.concurrent.atomic.AtomicBoolean;
public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel { public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel {
@ -36,6 +36,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AioServerSocketChannel.class); InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
private volatile AioServerSocketChannelConfig config; private volatile AioServerSocketChannelConfig config;
final AtomicBoolean closed = new AtomicBoolean(false);
public AioServerSocketChannel() { public AioServerSocketChannel() {
super(null, null); super(null, null);
@ -88,7 +89,9 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
javaChannel().close(); if (closed.compareAndSet(false, true)) {
javaChannel().close();
}
} }
@Override @Override
@ -116,21 +119,28 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
} }
private static final class AcceptHandler private static final class AcceptHandler
implements CompletionHandler<AsynchronousSocketChannel, AioServerSocketChannel> { extends AioCompletionHandler<AsynchronousSocketChannel, AioServerSocketChannel> {
public void completed(AsynchronousSocketChannel ch, AioServerSocketChannel channel) {
// register again this handler to accept new connections @Override
protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) {
// register again this handler to accept new connections
channel.javaChannel().accept(channel, this); channel.javaChannel().accept(channel, this);
// create the socket add it to the buffer and fire the event // create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(new AioSocketChannel(channel, null, ch)); channel.pipeline().inboundMessageBuffer().add(new AioSocketChannel(channel, null, ch));
channel.pipeline().fireInboundBufferUpdated(); channel.pipeline().fireInboundBufferUpdated();
} }
public void failed(Throwable t, AioServerSocketChannel channel) { @Override
// check if the exception was thrown because the channel was closed before protected void failed0(Throwable t, AioServerSocketChannel channel) {
boolean asyncClosed = false;
if (t instanceof AsynchronousCloseException) {
asyncClosed = true;
channel.closed.set(true);
}
// check if the exception was thrown because the channel was closed before
// log something // log something
if (channel.isOpen() && !(t instanceof AsynchronousCloseException)) { if (channel.isOpen() && ! asyncClosed) {
logger.warn("Failed to create a new channel from an accepted socket.", t); logger.warn("Failed to create a new channel from an accepted socket.", t);
} }
} }

View File

@ -26,16 +26,19 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel { public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler(); private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler(); private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler(); private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean flushing = new AtomicBoolean(false); private final AtomicBoolean flushing = new AtomicBoolean(false);
private volatile AioSocketChannelConfig config; private volatile AioSocketChannelConfig config;
@ -111,7 +114,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return null; return null;
} else if (remoteAddress() != null) { } else if (remoteAddress() != null) {
return new Runnable() { return new Runnable() {
@Override @Override
public void run() { public void run() {
read(); read();
@ -130,12 +133,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
expandReadBuffer(byteBuf); expandReadBuffer(byteBuf);
// Get a ByteBuffer view on the ByteBuf // Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, this, READ_HANDLER); javaChannel().read(buffer, this, READ_HANDLER);
} }
private static boolean expandReadBuffer(ByteBuf byteBuf) { private boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) { if (!byteBuf.writable()) {
// FIXME: Magic number // FIXME: Magic number
byteBuf.ensureWritableBytes(4096); byteBuf.ensureWritableBytes(4096);
@ -156,7 +158,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
javaChannel().close(); if (closed.compareAndSet(false, true)) {
javaChannel().close();
}
} }
@Override @Override
@ -183,13 +187,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
private static final class WriteHandler implements CompletionHandler<Integer, AioSocketChannel> { private static final class WriteHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
@Override @Override
public void completed(Integer result, AioSocketChannel channel) { protected void completed0(Integer result, AioSocketChannel channel) {
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
if (result > 0) { if (result > 0) {
// Update the readerIndex with the amount of read bytes // Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result); buf.readerIndex(buf.readerIndex() + result);
@ -211,11 +215,15 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
@Override @Override
public void failed(Throwable cause, AioSocketChannel channel) { protected void failed0(Throwable cause, AioSocketChannel channel) {
if (cause instanceof AsynchronousCloseException) {
channel.closed.set(true);
}
channel.notifyFlushFutures(cause); channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause); channel.pipeline().fireExceptionCaught(cause);
if (cause instanceof IOException) { if (cause instanceof IOException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} else { } else {
ByteBuf buf = channel.pipeline().outboundByteBuffer(); ByteBuf buf = channel.pipeline().outboundByteBuffer();
@ -228,13 +236,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} }
private static final class ReadHandler implements CompletionHandler<Integer, AioSocketChannel> { private static final class ReadHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
@Override @Override
public void completed(Integer result, AioSocketChannel channel) { protected void completed0(Integer result, AioSocketChannel channel) {
assert channel.eventLoop().inEventLoop();
final ChannelPipeline pipeline = channel.pipeline(); final ChannelPipeline pipeline = channel.pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
try { try {
@ -245,7 +253,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// //
// This is needed as the ByteBuffer and the ByteBuf does not share // This is needed as the ByteBuffer and the ByteBuf does not share
// each others index // each others index
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
read = true; read = true;
@ -255,11 +262,17 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} catch (Throwable t) { } catch (Throwable t) {
if (t instanceof AsynchronousCloseException) {
channel.closed.set(true);
}
if (read) { if (read) {
read = false; read = false;
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
} }
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
if (t instanceof IOException) { if (t instanceof IOException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} }
@ -272,12 +285,20 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} else { } else {
// start the next read // start the next read
channel.read(); channel.read();
} }
} }
} }
@Override @Override
public void failed(Throwable t, AioSocketChannel channel) { protected void failed0(Throwable t, AioSocketChannel channel) {
if (t instanceof AsynchronousCloseException) {
channel.closed.set(true);
// TODO: This seems wrong!
return;
}
channel.pipeline().fireExceptionCaught(t); channel.pipeline().fireExceptionCaught(t);
if (t instanceof IOException) { if (t instanceof IOException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
@ -288,10 +309,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} }
private static final class ConnectHandler implements CompletionHandler<Void, AioSocketChannel> { private static final class ConnectHandler extends AioCompletionHandler<Void, AioSocketChannel> {
@Override @Override
public void completed(Void result, AioSocketChannel channel) { protected void completed0(Void result, AioSocketChannel channel) {
((AsyncUnsafe) channel.unsafe()).connectSuccess(); ((AsyncUnsafe) channel.unsafe()).connectSuccess();
channel.pipeline().fireChannelActive(); channel.pipeline().fireChannelActive();
@ -300,7 +321,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
@Override @Override
public void failed(Throwable exc, AioSocketChannel channel) { protected void failed0(Throwable exc, AioSocketChannel channel) {
if (exc instanceof AsynchronousCloseException) {
channel.closed.set(true);
}
((AsyncUnsafe) channel.unsafe()).connectFailed(exc); ((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
} }
} }