Do not call inbound event methods directly

- Fixes #831

This commit ensures the following events are never triggered as a direct
invocation if they are triggered via ChannelPipeline.fire*():

- channelInactive
- channelUnregistered
- exceptionCaught

This commit also fixes the following issues surfaced by this fix:

- Embedded channel implementations run scheduled tasks too early
- SpdySessionHandlerTest tries to generate inbound data even after the
  channel is closed.
- AioSocketChannel enters into an infinite loop on I/O error.
This commit is contained in:
Trustin Lee 2012-12-18 02:58:36 +09:00
parent 39250873ae
commit e59ac8e79b
12 changed files with 158 additions and 77 deletions

View File

@ -109,14 +109,25 @@ public class SpdySessionHandler
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
MessageBuf<Object> in = ctx.inboundMessageBuffer();
boolean handled = false;
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (msg instanceof SpdySynStreamFrame) {
// Let the next handlers handle the buffered messages before SYN_STREAM message updates the
// lastGoodStreamId.
if (handled) {
ctx.fireInboundBufferUpdated();
}
}
handleInboundMessage(ctx, msg);
handled = true;
}
ctx.fireInboundBufferUpdated();
}

View File

@ -233,23 +233,25 @@ public class SpdySessionHandlerTest {
sessionHandler.writeInbound(remotePingFrame);
assertNull(sessionHandler.readOutbound());
// Check if session handler sends a GOAWAY frame when closing
sessionHandler.writeInbound(closeMessage);
// Note that we cannot use writeInbound() here because sending closeMessage will close the channel
// immediately, and then we cannot test if SYN_STREAM and DATA frames are ignored after a GOAWAY frame is sent.
//// 1. Sending this message will make EchoHandler send a GOAWAY frame and close the session.
sessionHandler.inboundBuffer().add(closeMessage);
//// 2. Sending SYN_STREAM after sending closeMessage should fail with REFUSED_STREAM.
spdySynStreamFrame.setStreamId(localStreamID + 2);
sessionHandler.inboundBuffer().add(spdySynStreamFrame);
//// 3. Sending DATA after sending closeMessage should do nothing.
spdyDataFrame.setStreamId(localStreamID + 2);
sessionHandler.inboundBuffer().add(spdyDataFrame);
// At this point, we added three SPDY messages to the inbound buffer. Start testing.
sessionHandler.pipeline().fireInboundBufferUpdated();
//// 1. Check if session handler sends a GOAWAY frame when closing
assertGoAway(sessionHandler.readOutbound(), localStreamID);
assertNull(sessionHandler.readOutbound());
localStreamID += 2;
// Check if session handler returns REFUSED_STREAM if it receives
// SYN_STREAM frames after sending a GOAWAY frame
spdySynStreamFrame.setStreamId(localStreamID);
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
assertNull(sessionHandler.readOutbound());
// Check if session handler ignores Data frames after sending
// a GOAWAY frame
spdyDataFrame.setStreamId(localStreamID);
sessionHandler.writeInbound(spdyDataFrame);
//// 2. Check if session handler returns REFUSED_STREAM if it receives SYN_STREAM frames
//// after sending a GOAWAY frame
assertRstStream(sessionHandler.readOutbound(), localStreamID + 2, SpdyStreamStatus.REFUSED_STREAM);
//// 3. Check if session handler ignores Data frames after sending a GOAWAY frame
assertNull(sessionHandler.readOutbound());
sessionHandler.finish();

View File

@ -15,20 +15,20 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import org.junit.Test;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {

View File

@ -15,7 +15,6 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@ -28,12 +27,13 @@ import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketStringEchoTest extends AbstractSocketTest {

View File

@ -430,7 +430,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private final Runnable flushLaterTask = new FlushLater();
private final Runnable flushLaterTask = new Runnable() {
@Override
public void run() {
flushNowPending = false;
flush(voidFuture());
}
};
@Override
public final void sendFile(final FileRegion region, final ChannelFuture future) {
@ -859,14 +865,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private class FlushLater implements Runnable {
@Override
public void run() {
flushNowPending = false;
unsafe().flush(voidFuture);
}
}
protected abstract boolean isCompatible(EventLoop loop);
protected abstract SocketAddress localAddress0();

View File

@ -946,7 +946,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (executor.inEventLoop() && prev != null) {
next.fireChannelUnregisteredTask.run();
} else {
executor.execute(next.fireChannelUnregisteredTask);
@ -972,7 +972,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (executor.inEventLoop() && prev != null) {
next.fireChannelInactiveTask.run();
} else {
executor.execute(next.fireChannelInactiveTask);
@ -986,25 +986,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new NullPointerException("cause");
}
DefaultChannelHandlerContext next = this.next;
final DefaultChannelHandlerContext next = this.next;
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
try {
next.handler().exceptionCaught(next, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
if (executor.inEventLoop() && prev != null) {
fireExceptionCaught0(next, cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
fireExceptionCaught(cause);
fireExceptionCaught0(next, cause);
}
});
} catch (Throwable t) {
@ -1022,6 +1014,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
private static void fireExceptionCaught0(DefaultChannelHandlerContext next, Throwable cause) {
try {
next.handler().exceptionCaught(next, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
}
@Override
public void fireUserEventTriggered(final Object event) {
if (event == null) {
@ -1154,7 +1158,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
void callFreeInboundBuffer() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
if (executor.inEventLoop() && prev != null) {
freeInboundBufferTask.run();
} else {
executor.execute(freeInboundBufferTask);

View File

@ -1025,9 +1025,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture write(Object message) {
if (message instanceof FileRegion) {
return sendFile((FileRegion) message);
}
return write(message, channel.newFuture());
}
@ -1194,7 +1191,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelFuture sendFile(final DefaultChannelHandlerContext ctx, final FileRegion region,
final ChannelFuture future) {
if (region == null) {
throw new NullPointerException("region");
}
validateFuture(future);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
@ -1302,27 +1303,38 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
if (executor.inEventLoop()) {
if (msgBuf) {
ctx.outboundMessageBuffer().add(message);
} else {
ByteBuf buf = (ByteBuf) message;
ctx.outboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
}
flush0(ctx, future);
write0(ctx, message, future, msgBuf);
return future;
}
final boolean msgBuf0 = msgBuf;
final DefaultChannelHandlerContext ctx0 = ctx;
executor.execute(new Runnable() {
@Override
public void run() {
write(ctx0, message, future);
write0(ctx0, message, future, msgBuf0);
}
});
return future;
}
private void write0(DefaultChannelHandlerContext ctx, Object message, ChannelFuture future, boolean msgBuf) {
if (msgBuf) {
MessageBuf<Object> outMsgBuf = ctx.outboundMessageBuffer();
if (!outMsgBuf.isFreed()) {
outMsgBuf.add(message);
}
} else {
ByteBuf outByteBuf = ctx.outboundByteBuffer();
if (!outByteBuf.isFreed()) {
ByteBuf buf = (ByteBuf) message;
outByteBuf.writeBytes(buf, buf.readerIndex(), buf.readableBytes());
}
}
flush0(ctx, future);
}
private void validateFuture(ChannelFuture future) {
if (future == null) {
throw new NullPointerException("future");

View File

@ -35,11 +35,13 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
public abstract class AbstractEmbeddedChannel extends AbstractChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEmbeddedChannel.class);
private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
private final ChannelConfig config = new DefaultChannelConfig();
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
@ -81,7 +83,7 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
}
p.addLast(new LastInboundMessageHandler(), new LastInboundByteHandler());
new EmbeddedEventLoop().register(this);
loop.register(this);
}
@Override
@ -118,6 +120,24 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
return lastInboundMessageBuffer.poll();
}
public void runPendingTasks() {
try {
loop.runTasks();
} catch (Exception e) {
recordException(e);
}
}
private void recordException(Throwable cause) {
if (lastException == null) {
lastException = cause;
} else {
logger.warn(
"More than one exception was raised. " +
"Will report only the first one and log others.", cause);
}
}
public void checkException() {
Throwable t = lastException;
if (t == null) {
@ -136,6 +156,13 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
throw new ChannelException(t);
}
protected void ensureOpen() {
if (!isOpen()) {
recordException(new ClosedChannelException());
checkException();
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EmbeddedEventLoop;
@ -211,6 +238,11 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
return lastInboundMessageBuffer;
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Do NOT free the buffer.
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// Do nothing.
@ -219,13 +251,7 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
if (lastException == null) {
lastException = cause;
} else {
logger.warn(
"More than one exception was raised. " +
"Will report only the first one and log others.", cause);
}
recordException(cause);
}
}
@ -235,6 +261,11 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
return lastInboundByteBuffer;
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Do NOT free the buffer.
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// No nothing

View File

@ -54,20 +54,25 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel {
}
public boolean writeInbound(ByteBuf data) {
ensureOpen();
inboundBuffer().writeBytes(data);
pipeline().fireInboundBufferUpdated();
runPendingTasks();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
}
public boolean writeOutbound(Object msg) {
ensureOpen();
write(msg);
runPendingTasks();
checkException();
return lastOutboundBuffer().readable();
}
public boolean finish() {
close();
runPendingTasks();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
lastOutboundBuffer().readable();

View File

@ -20,14 +20,37 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop {
final class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop {
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException("command");
}
tasks.add(command);
}
void runTasks() {
for (;;) {
Runnable task = tasks.poll();
if (task == null) {
break;
}
task.run();
}
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
@ -80,11 +103,6 @@ class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop {
return false;
}
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public ChannelFuture register(Channel channel) {
return register(channel, channel.newFuture());

View File

@ -48,20 +48,25 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
}
public boolean writeInbound(Object msg) {
ensureOpen();
inboundBuffer().add(msg);
pipeline().fireInboundBufferUpdated();
runPendingTasks();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
}
public boolean writeOutbound(Object msg) {
ensureOpen();
write(msg);
runPendingTasks();
checkException();
return !lastOutboundBuffer().isEmpty();
}
public boolean finish() {
close();
runPendingTasks();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
!lastOutboundBuffer().isEmpty();

View File

@ -316,7 +316,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
try {
for (;;) {
ByteBuf byteBuf = pipeline().inboundByteBuffer();
if (byteBuf.isFreed()) {
if (inputShutdown) {
break;
}
@ -455,13 +455,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
pipeline.fireInboundBufferUpdated();
}
if (!(t instanceof ClosedChannelException)) {
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
channel.unsafe().close(channel.unsafe().voidFuture());
}
}
pipeline.fireExceptionCaught(t);
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
@ -488,6 +482,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
protected void failed0(Throwable t, AioSocketChannel channel) {
channel.asyncReadInProgress = false;
if (t instanceof ClosedChannelException) {
channel.inputShutdown = true;
return;
}