From f398f2f7b5bd40a78ca8ddf71cdbc5114c6b13a0 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 31 Dec 2014 17:30:56 +0900 Subject: [PATCH] Fire channelReadComplete() in EpollDatagramChannel Related: #3274 Motivation: channelReadComplete() event is not triggered after reading successfully in EpollDatagramChannel. Modifications: - Trigger exceptionCaught() event for read failure only once for less noise - Trigger channelReadComplete() event at the end of the read. Result: Fix #3274 --- .../channel/epoll/EpollDatagramChannel.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 3cf23eda92..c1ba061e18 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -40,6 +40,8 @@ import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.NotYetConnectedException; +import java.util.ArrayList; +import java.util.List; /** * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for @@ -448,6 +450,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { + private final List readBuf = new ArrayList(); + @Override public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) { boolean success = false; @@ -482,7 +486,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); assert eventLoop().inEventLoop(); + final ChannelPipeline pipeline = pipeline(); + Throwable exception = null; try { for (;;) { ByteBuf data = null; @@ -508,19 +514,33 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements data.writerIndex(data.writerIndex() + readBytes); allocHandle.record(readBytes); readPending = false; - pipeline.fireChannelRead( - new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); + + readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); data = null; } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); + // We do not break from the loop here and remember the last exception, + // because we need to consume everything from the socket used with epoll ET. + exception = t; } finally { if (data != null) { data.release(); } } } + + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + pipeline.fireChannelRead(readBuf.get(i)); + } + + readBuf.clear(); + pipeline.fireChannelReadComplete(); + + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + + pipeline.fireChannelReadComplete(); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: