diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index d3faad8d2a..491e79dcaf 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -311,6 +311,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann */ abstract void epollInReady(); + /** + * Will schedule a {@link #epollInReady()} call on the event loop if necessary. + * @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise. + */ + final void checkResetEpollIn(boolean edgeTriggered) { + if (edgeTriggered && !isInputShutdown0()) { + // trigger a read again as there may be something left to read and because of epoll ET we + // will not get notified again until we read everything from the socket + eventLoop().execute(new OneTimeTask() { + @Override + public void run() { + epollInReady(); + } + }); + } + } + /** * Called once EPOLLRDHUP event is ready to be processed */ diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index eaeded0d16..d277a44158 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -73,7 +73,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { - // Will hold the remote address after accept(...) was sucesssful. + // Will hold the remote address after accept(...) was successful. // We need 24 bytes for the address as maximum + 1 byte for storing the length. // So use 26 bytes as it's a power of two. private final byte[] acceptedAddress = new byte[26]; @@ -117,16 +117,8 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im readPending = false; allocHandle.incMessagesRead(1); - try { - int len = acceptedAddress[0]; - pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len)); - } catch (Throwable t) { - if (edgeTriggered) { // We must keep reading if ET is enabled - pipeline.fireExceptionCaught(t); - } else { - throw t; - } - } + int len = acceptedAddress[0]; + pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len)); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; @@ -136,6 +128,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im if (exception != null) { pipeline.fireExceptionCaught(exception); + checkResetEpollIn(edgeTriggered); } } finally { // Check if there is a readPending which was not processed yet. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 1747fb2305..50fb2a5df4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -587,7 +587,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } class EpollStreamUnsafe extends AbstractEpollUnsafe { - private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { readPending = false; @@ -601,9 +601,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { shutdownInput(); - return true; } - return false; } @Override @@ -769,48 +767,35 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { boolean close = false; try { do { - try { - SpliceInTask spliceTask = spliceQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - spliceQueue.remove(); - } - continue; - } else { - break; + SpliceInTask spliceTask = spliceQueue.peek(); + if (spliceTask != null) { + if (spliceTask.spliceIn(allocHandle)) { + // We need to check if it is still active as if not we removed all SpliceTasks in + // doClose(...) + if (isActive()) { + spliceQueue.remove(); } - } - - // we use a direct buffer here as the native implementations only be able - // to handle direct buffers. - byteBuf = allocHandle.allocate(allocator); - allocHandle.lastBytesRead(doReadBytes(byteBuf)); - if (allocHandle.lastBytesRead() <= 0) { - // nothing was read, release the buffer. - byteBuf.release(); - byteBuf = null; - close = allocHandle.lastBytesRead() < 0; + continue; + } else { break; } - readPending = false; - allocHandle.incMessagesRead(1); - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - } catch (Throwable t) { - if (edgeTriggered) { // We must keep reading if ET is enabled - if (byteBuf != null) { - byteBuf.release(); - byteBuf = null; - } - pipeline.fireExceptionCaught(t); - } else { - // byteBuf is release in outer exception handling if necessary. - throw t; - } } + + // we use a direct buffer here as the native implementations only be able + // to handle direct buffers. + byteBuf = allocHandle.allocate(allocator); + allocHandle.lastBytesRead(doReadBytes(byteBuf)); + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read, release the buffer. + byteBuf.release(); + byteBuf = null; + close = allocHandle.lastBytesRead() < 0; + break; + } + readPending = false; + allocHandle.incMessagesRead(1); + pipeline.fireChannelRead(byteBuf); + byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); @@ -821,17 +806,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { close = false; } } catch (Throwable t) { - boolean closed = handleReadException(pipeline, byteBuf, t, close); - if (!closed) { - // trigger a read again as there may be something left to read and because of epoll ET we - // will not get notified again until we read everything from the socket - eventLoop().execute(new OneTimeTask() { - @Override - public void run() { - epollInReady(); - } - }); - } + handleReadException(pipeline, byteBuf, t, close); + checkResetEpollIn(edgeTriggered); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 3681024e3a..a9c5e976cf 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -535,9 +535,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements Throwable exception = null; try { - do { - ByteBuf data = null; - try { + ByteBuf data = null; + try { + do { data = allocHandle.allocate(allocator); allocHandle.attemptedBytesRead(data.writableBytes()); final DatagramSocketAddress remoteAddress; @@ -564,21 +564,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); data = null; - } catch (Throwable t) { - if (data != null) { - data.release(); - data = null; - } - if (edgeTriggered) { - // We do not break from the loop here and remember the last exception, - // because we need to consume everything from the socket used with epoll ET. - pipeline.fireExceptionCaught(t); - } else { - exception = t; - break; - } + } while (allocHandle.continueReading()); + } catch (Throwable t) { + if (data != null) { + data.release(); + data = null; } - } while (allocHandle.continueReading()); + exception = t; + } int size = readBuf.size(); for (int i = 0; i < size; i ++) { @@ -590,6 +583,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (exception != null) { pipeline.fireExceptionCaught(exception); + checkResetEpollIn(edgeTriggered); } } finally { // Check if there is a readPending which was not processed yet. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index fb94e2a85b..63c130dc03 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -157,16 +157,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i readPending = false; allocHandle.incMessagesRead(1); - try { - pipeline.fireChannelRead(new FileDescriptor(socketFd)); - } catch (Throwable t) { - // If ET is enabled we need to consume everything from the socket - if (edgeTriggered) { - pipeline.fireExceptionCaught(t); - } else { - throw t; - } - } + pipeline.fireChannelRead(new FileDescriptor(socketFd)); } while (allocHandle.continueReading()); allocHandle.readComplete(); @@ -175,14 +166,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i allocHandle.readComplete(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); - // trigger a read again as there may be something left to read and because of epoll ET we - // will not get notified again until we read everything from the socket - eventLoop().execute(new OneTimeTask() { - @Override - public void run() { - epollInReady(); - } - }); + checkResetEpollIn(edgeTriggered); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java index 9637006217..c13ac8f7ee 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java @@ -16,12 +16,27 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import org.junit.Assert; import org.junit.Test; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class EpollSocketChannelTest { @@ -98,4 +113,97 @@ public class EpollSocketChannelTest { Assert.assertTrue(info.rcvSpace() >= 0); Assert.assertTrue(info.totalRetrans() >= 0); } + + @Test + public void testExceptionHandlingDoesNotInfiniteLoop() throws InterruptedException { + EventLoopGroup group = new EpollEventLoopGroup(); + try { + runExceptionHandleFeedbackLoop(group, EpollServerSocketChannel.class, EpollSocketChannel.class, + new InetSocketAddress(0)); + runExceptionHandleFeedbackLoop(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class, + EpollSocketTestPermutation.newSocketAddress()); + } finally { + group.shutdownGracefully(); + } + } + + private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class serverChannelClass, + Class channelClass, SocketAddress bindAddr) throws InterruptedException { + Channel serverChannel = null; + Channel clientChannel = null; + try { + MyInitializer serverInitializer = new MyInitializer(); + ServerBootstrap sb = new ServerBootstrap(); + sb.option(ChannelOption.SO_BACKLOG, 1024); + sb.group(group) + .channel(serverChannelClass) + .childHandler(serverInitializer); + + serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); + + Bootstrap b = new Bootstrap(); + b.group(group); + b.channel(channelClass); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.remoteAddress(serverChannel.localAddress()); + b.handler(new MyInitializer()); + clientChannel = b.connect().syncUninterruptibly().channel(); + + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024])); + + // We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler). + assertTrue(serverInitializer.exceptionHandler.latch1.await(2, TimeUnit.SECONDS)); + + // After we get the first exception, we should get no more, this is expected to timeout. + assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() + + " exceptions when 1 was expected", + serverInitializer.exceptionHandler.latch2.await(2, TimeUnit.SECONDS)); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + } + } + + private static class MyInitializer extends ChannelInitializer { + final ExceptionHandler exceptionHandler = new ExceptionHandler(); + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast(new BuggyChannelHandler()); + pipeline.addLast(exceptionHandler); + } + } + + private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + throw new NullPointerException("I am a bug!"); + } + } + + private static class ExceptionHandler extends ChannelInboundHandlerAdapter { + final AtomicLong count = new AtomicLong(); + /** + * We expect to get 2 calls to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}. + * 1 call from BuggyChannelHandler and 1 from closing the channel in this class. + */ + final CountDownLatch latch1 = new CountDownLatch(2); + final CountDownLatch latch2 = new CountDownLatch(1); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (count.incrementAndGet() <= 2) { + latch1.countDown(); + } else { + latch2.countDown(); + } + // This is expected to throw an exception! + ctx.close(); + } + } }