Fire channelReadComplete() in EpollDatagramChannel

Related: #3274

Motivation:

channelReadComplete() event is not triggered after reading successfully
in EpollDatagramChannel.

Modifications:

- Trigger exceptionCaught() event for read failure only once for less
  noise
- Trigger channelReadComplete() event at the end of the read.

Result:

Fix #3274
This commit is contained in:
Trustin Lee 2014-12-31 17:30:56 +09:00
parent 1173699dff
commit f398f2f7b5

View File

@ -40,6 +40,8 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.List;
/** /**
* {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
@ -448,6 +450,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override @Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) { public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
boolean success = false; boolean success = false;
@ -482,7 +486,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try { try {
for (;;) { for (;;) {
ByteBuf data = null; ByteBuf data = null;
@ -508,19 +514,33 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
data.writerIndex(data.writerIndex() + readBytes); data.writerIndex(data.writerIndex() + readBytes);
allocHandle.record(readBytes); allocHandle.record(readBytes);
readPending = false; readPending = false;
pipeline.fireChannelRead(
new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null; data = null;
} catch (Throwable t) { } catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket // We do not break from the loop here and remember the last exception,
pipeline.fireChannelReadComplete(); // because we need to consume everything from the socket used with epoll ET.
pipeline.fireExceptionCaught(t); exception = t;
} finally { } finally {
if (data != null) { if (data != null) {
data.release(); data.release();
} }
} }
} }
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
pipeline.fireChannelReadComplete();
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.
// This could be for two reasons: // This could be for two reasons: