oio and nio transport now make sure that a upstream event get only

executed from an io thread. See #140 and #187
This commit is contained in:
Norman Maurer 2012-02-25 15:12:58 +01:00
parent c4a437e16b
commit ef64e8c332
6 changed files with 164 additions and 34 deletions

View File

@ -298,6 +298,21 @@ public final class Channels {
ctx.getChannel(), message, remoteAddress));
}
/**
* Sends a {@code "writeComplete"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} in the next io-thread.
*/
public static void fireWriteCompleteLater(Channel channel, long amount) {
if (amount == 0) {
return;
}
channel.getPipeline().sendUpstreamLater(
new DefaultWriteCompletionEvent(channel, amount));
}
/**
* Sends a {@code "writeComplete"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of

View File

@ -398,6 +398,7 @@ abstract class AbstractNioWorker implements Worker {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread(channel);
long writtenBytes = 0;
@ -468,7 +469,11 @@ abstract class AbstractNioWorker implements Worker {
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
if (t instanceof IOException) {
open = false;
close(channel, succeededFuture(channel));
@ -491,10 +496,17 @@ abstract class AbstractNioWorker implements Worker {
}
}
}
fireWriteComplete(channel, writtenBytes);
if (iothread) {
fireWriteComplete(channel, writtenBytes);
} else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
static boolean isIoThread(AbstractNioChannel<?> channel) {
return Thread.currentThread() == channel.worker.thread;
}
private void setOpWrite(AbstractNioChannel<?> channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
@ -545,6 +557,8 @@ abstract class AbstractNioWorker implements Worker {
void close(AbstractNioChannel<?> channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
boolean iothread = isIoThread(channel);
try {
channel.channel.close();
cancelledKeys ++;
@ -552,20 +566,36 @@ abstract class AbstractNioWorker implements Worker {
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
fireChannelDisconnected(channel);
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
fireChannelUnbound(channel);
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
cleanUpWriteBuffer(channel);
fireChannelClosed(channel);
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
@ -618,12 +648,17 @@ abstract class AbstractNioWorker implements Worker {
}
if (fireExceptionCaught) {
fireExceptionCaught(channel, cause);
if (isIoThread(channel)) {
fireExceptionCaught(channel, cause);
} else {
fireExceptionCaughtLater(channel, cause);
}
}
}
void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
boolean changed = false;
boolean iothread = isIoThread(channel);
try {
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
@ -684,16 +719,28 @@ abstract class AbstractNioWorker implements Worker {
future.setSuccess();
if (changed) {
fireChannelInterestChanged(channel);
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (CancelledKeyException e) {
// setInterestOps() was called on a closed channel.
ClosedChannelException cce = new ClosedChannelException();
future.setFailure(cce);
fireExceptionCaught(channel, cce);
if (iothread) {
fireExceptionCaught(channel, cce);
} else {
fireExceptionCaughtLater(channel, cce);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}

View File

@ -16,7 +16,9 @@
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.fireChannelDisconnected;
import static io.netty.channel.Channels.fireChannelDisconnectedLater;
import static io.netty.channel.Channels.fireExceptionCaught;
import static io.netty.channel.Channels.fireExceptionCaughtLater;
import static io.netty.channel.Channels.fireMessageReceived;
import static io.netty.channel.Channels.succeededFuture;
import io.netty.buffer.ChannelBufferFactory;
@ -126,15 +128,24 @@ class NioDatagramWorker extends AbstractNioWorker {
static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean iothread = isIoThread(channel);
try {
channel.getDatagramChannel().disconnect();
future.setSuccess();
if (connected) {
fireChannelDisconnected(channel);
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}

View File

@ -15,12 +15,7 @@
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.fireChannelClosed;
import static io.netty.channel.Channels.fireChannelDisconnected;
import static io.netty.channel.Channels.fireChannelInterestChanged;
import static io.netty.channel.Channels.fireChannelUnbound;
import static io.netty.channel.Channels.fireExceptionCaught;
import static io.netty.channel.Channels.succeededFuture;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels;
@ -89,6 +84,9 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
close(channel, succeededFuture(channel));
}
static boolean isIoThead(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}
@Override
public void executeInIoThread(Runnable eventRunnable) {
@ -120,7 +118,8 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
static void setInterestOps(
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
boolean iothread = isIoThead(channel);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
@ -148,18 +147,27 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
workerThread.interrupt();
}
}
fireChannelInterestChanged(channel);
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void close(AbstractOioChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
boolean iothread = isIoThead(channel);
try {
channel.closeSocket();
if (channel.setClosed()) {
@ -171,18 +179,34 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
fireChannelDisconnected(channel);
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
fireChannelUnbound(channel);
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}

View File

@ -63,6 +63,8 @@ class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
static void write(
OioDatagramChannel channel, ChannelFuture future,
Object message, SocketAddress remoteAddress) {
boolean iothread = isIoThead(channel);
try {
ChannelBuffer buf = (ChannelBuffer) message;
int offset = buf.readerIndex();
@ -84,27 +86,45 @@ class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
packet.setSocketAddress(remoteAddress);
}
channel.socket.send(packet);
fireWriteComplete(channel, length);
if (iothread) {
fireWriteComplete(channel, length);
} else {
fireWriteCompleteLater(channel, length);
}
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean iothread = isIoThead(channel);
try {
channel.socket.disconnect();
future.setSuccess();
if (connected) {
// Notify.
fireChannelDisconnected(channel);
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}

View File

@ -65,11 +65,16 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
OioSocketChannel channel, ChannelFuture future,
Object message) {
boolean iothread = isIoThead(channel);
OutputStream out = channel.getOutputStream();
if (out == null) {
Exception e = new ClosedChannelException();
future.setFailure(e);
fireExceptionCaught(channel, e);
if (iothread) {
fireExceptionCaught(channel, e);
} else {
fireExceptionCaughtLater(channel, e);
}
return;
}
@ -106,7 +111,11 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
}
}
fireWriteComplete(channel, length);
if (iothread) {
fireWriteComplete(channel, length);
} else {
fireWriteCompleteLater(channel, length);
}
future.setSuccess();
} catch (Throwable t) {
@ -118,7 +127,11 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
t = new ClosedChannelException();
}
future.setFailure(t);
fireExceptionCaught(channel, t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}