EPOLL exception processing feedback loop

Motivation:
Commit cf171ff525 changed the way read operations were done. This change introduced a feedback loop between fireException and epollInReady.

Modifications:
- All EPOLL*Channel* classes should not call fireException and also continue to read. Instead a read operation should be executed on the eventloop (if the channel's input is not closed, and other conditions are satisfied)

Result:
Exception processing and channelRead will not be in a feedback loop.
Fixes https://github.com/netty/netty/issues/4091
This commit is contained in:
Scott Mitchell 2015-08-14 16:36:48 -07:00
parent 559f1b110a
commit ce6931e0e5
6 changed files with 170 additions and 98 deletions

View File

@ -311,6 +311,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
*/ */
abstract void epollInReady(); abstract void epollInReady();
/**
* Will schedule a {@link #epollInReady()} call on the event loop if necessary.
* @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise.
*/
final void checkResetEpollIn(boolean edgeTriggered) {
if (edgeTriggered && !isInputShutdown0()) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
epollInReady();
}
});
}
}
/** /**
* Called once EPOLLRDHUP event is ready to be processed * Called once EPOLLRDHUP event is ready to be processed
*/ */

View File

@ -73,7 +73,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
// Will hold the remote address after accept(...) was sucesssful. // Will hold the remote address after accept(...) was successful.
// We need 24 bytes for the address as maximum + 1 byte for storing the length. // We need 24 bytes for the address as maximum + 1 byte for storing the length.
// So use 26 bytes as it's a power of two. // So use 26 bytes as it's a power of two.
private final byte[] acceptedAddress = new byte[26]; private final byte[] acceptedAddress = new byte[26];
@ -117,16 +117,8 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
readPending = false; readPending = false;
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
try { int len = acceptedAddress[0];
int len = acceptedAddress[0]; pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
} catch (Throwable t) {
if (edgeTriggered) { // We must keep reading if ET is enabled
pipeline.fireExceptionCaught(t);
} else {
throw t;
}
}
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
@ -136,6 +128,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
checkResetEpollIn(edgeTriggered);
} }
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.

View File

@ -587,7 +587,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
class EpollStreamUnsafe extends AbstractEpollUnsafe { class EpollStreamUnsafe extends AbstractEpollUnsafe {
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
readPending = false; readPending = false;
@ -601,9 +601,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
shutdownInput(); shutdownInput();
return true;
} }
return false;
} }
@Override @Override
@ -769,48 +767,35 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean close = false; boolean close = false;
try { try {
do { do {
try { SpliceInTask spliceTask = spliceQueue.peek();
SpliceInTask spliceTask = spliceQueue.peek(); if (spliceTask != null) {
if (spliceTask != null) { if (spliceTask.spliceIn(allocHandle)) {
if (spliceTask.spliceIn(allocHandle)) { // We need to check if it is still active as if not we removed all SpliceTasks in
// We need to check if it is still active as if not we removed all SpliceTasks in // doClose(...)
// doClose(...) if (isActive()) {
if (isActive()) { spliceQueue.remove();
spliceQueue.remove();
}
continue;
} else {
break;
} }
} continue;
} else {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break; break;
} }
readPending = false;
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} catch (Throwable t) {
if (edgeTriggered) { // We must keep reading if ET is enabled
if (byteBuf != null) {
byteBuf.release();
byteBuf = null;
}
pipeline.fireExceptionCaught(t);
} else {
// byteBuf is release in outer exception handling if necessary.
throw t;
}
} }
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
readPending = false;
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
allocHandle.readComplete(); allocHandle.readComplete();
@ -821,17 +806,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
close = false; close = false;
} }
} catch (Throwable t) { } catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close); handleReadException(pipeline, byteBuf, t, close);
if (!closed) { checkResetEpollIn(edgeTriggered);
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
epollInReady();
}
});
}
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.
// This could be for two reasons: // This could be for two reasons:

View File

@ -535,9 +535,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
Throwable exception = null; Throwable exception = null;
try { try {
do { ByteBuf data = null;
ByteBuf data = null; try {
try { do {
data = allocHandle.allocate(allocator); data = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(data.writableBytes()); allocHandle.attemptedBytesRead(data.writableBytes());
final DatagramSocketAddress remoteAddress; final DatagramSocketAddress remoteAddress;
@ -564,21 +564,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null; data = null;
} catch (Throwable t) { } while (allocHandle.continueReading());
if (data != null) { } catch (Throwable t) {
data.release(); if (data != null) {
data = null; data.release();
} data = null;
if (edgeTriggered) {
// We do not break from the loop here and remember the last exception,
// because we need to consume everything from the socket used with epoll ET.
pipeline.fireExceptionCaught(t);
} else {
exception = t;
break;
}
} }
} while (allocHandle.continueReading()); exception = t;
}
int size = readBuf.size(); int size = readBuf.size();
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
@ -590,6 +583,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
checkResetEpollIn(edgeTriggered);
} }
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.

View File

@ -157,16 +157,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
readPending = false; readPending = false;
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
try { pipeline.fireChannelRead(new FileDescriptor(socketFd));
pipeline.fireChannelRead(new FileDescriptor(socketFd));
} catch (Throwable t) {
// If ET is enabled we need to consume everything from the socket
if (edgeTriggered) {
pipeline.fireExceptionCaught(t);
} else {
throw t;
}
}
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
allocHandle.readComplete(); allocHandle.readComplete();
@ -175,14 +166,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
allocHandle.readComplete(); allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
// trigger a read again as there may be something left to read and because of epoll ET we checkResetEpollIn(edgeTriggered);
// will not get notified again until we read everything from the socket
eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
epollInReady();
}
});
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.
// This could be for two reasons: // This could be for two reasons:

View File

@ -16,12 +16,27 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class EpollSocketChannelTest { public class EpollSocketChannelTest {
@ -98,4 +113,97 @@ public class EpollSocketChannelTest {
Assert.assertTrue(info.rcvSpace() >= 0); Assert.assertTrue(info.rcvSpace() >= 0);
Assert.assertTrue(info.totalRetrans() >= 0); Assert.assertTrue(info.totalRetrans() >= 0);
} }
@Test
public void testExceptionHandlingDoesNotInfiniteLoop() throws InterruptedException {
EventLoopGroup group = new EpollEventLoopGroup();
try {
runExceptionHandleFeedbackLoop(group, EpollServerSocketChannel.class, EpollSocketChannel.class,
new InetSocketAddress(0));
runExceptionHandleFeedbackLoop(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class,
EpollSocketTestPermutation.newSocketAddress());
} finally {
group.shutdownGracefully();
}
}
private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
Channel serverChannel = null;
Channel clientChannel = null;
try {
MyInitializer serverInitializer = new MyInitializer();
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024);
sb.group(group)
.channel(serverChannelClass)
.childHandler(serverInitializer);
serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(channelClass);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.remoteAddress(serverChannel.localAddress());
b.handler(new MyInitializer());
clientChannel = b.connect().syncUninterruptibly().channel();
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024]));
// We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler).
assertTrue(serverInitializer.exceptionHandler.latch1.await(2, TimeUnit.SECONDS));
// After we get the first exception, we should get no more, this is expected to timeout.
assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() +
" exceptions when 1 was expected",
serverInitializer.exceptionHandler.latch2.await(2, TimeUnit.SECONDS));
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private static class MyInitializer extends ChannelInitializer<Channel> {
final ExceptionHandler exceptionHandler = new ExceptionHandler();
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new BuggyChannelHandler());
pipeline.addLast(exceptionHandler);
}
}
private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new NullPointerException("I am a bug!");
}
}
private static class ExceptionHandler extends ChannelInboundHandlerAdapter {
final AtomicLong count = new AtomicLong();
/**
* We expect to get 2 calls to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}.
* 1 call from BuggyChannelHandler and 1 from closing the channel in this class.
*/
final CountDownLatch latch1 = new CountDownLatch(2);
final CountDownLatch latch2 = new CountDownLatch(1);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (count.incrementAndGet() <= 2) {
latch1.countDown();
} else {
latch2.countDown();
}
// This is expected to throw an exception!
ctx.close();
}
}
} }