Fire channelReadComplete() in EpollDatagramChannel

Related: #3274

Motivation:

channelReadComplete() event is not triggered after reading successfully
in EpollDatagramChannel.

Modifications:

- Trigger exceptionCaught() event for read failure only once for less
  noise
- Trigger channelReadComplete() event at the end of the read.

Result:

Fix #3274
This commit is contained in:
Trustin Lee 2014-12-31 17:30:56 +09:00
parent ab607ed0a6
commit 63536c3d73

View File

@ -40,6 +40,8 @@ import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.List;
/**
* {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
@ -447,7 +449,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
@ -486,7 +490,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
for (;;) {
ByteBuf data = null;
@ -512,19 +518,33 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
data.writerIndex(data.writerIndex() + readBytes);
allocHandle.record(readBytes);
readPending = false;
pipeline.fireChannelRead(
new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null;
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
// We do not break from the loop here and remember the last exception,
// because we need to consume everything from the socket used with epoll ET.
exception = t;
} finally {
if (data != null) {
data.release();
}
}
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
pipeline.fireChannelReadComplete();
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons: