[#2254] Fix regression in handling autoRead and Channel.read()

This regression was introduced by e0b39159657c9eb711047bc32367537c4870d467
This commit is contained in:
Norman Maurer 2014-02-21 08:39:41 +01:00
parent 1884a5697c
commit 78db65d0fb
13 changed files with 402 additions and 126 deletions

View File

@ -64,7 +64,16 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, false);
testSimpleEcho0(sb, cb, false, false, true);
}
@Test(timeout = 30000)
public void testSimpleEchoNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, false, false);
}
@Test//(timeout = 30000)
@ -73,7 +82,16 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, false);
testSimpleEcho0(sb, cb, true, false, true);
}
@Test//(timeout = 30000)
public void testSimpleEchoWithAdditionalExecutorNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, false, false);
}
@Test//(timeout = 30000)
@ -82,7 +100,16 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, true);
testSimpleEcho0(sb, cb, false, true, true);
}
@Test//(timeout = 30000)
public void testSimpleEchoWithVoidPromiseNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, true, false);
}
@Test(timeout = 30000)
@ -91,15 +118,15 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, true);
testSimpleEcho0(sb, cb, true, true, true);
}
private static void testSimpleEcho0(
ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise)
ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead)
throws Throwable {
final EchoHandler sh = new EchoHandler();
final EchoHandler ch = new EchoHandler();
final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead);
if (additionalExecutor) {
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@ -188,10 +215,15 @@ public class SocketEchoTest extends AbstractSocketTest {
}
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
@ -217,7 +249,13 @@ public class SocketEchoTest extends AbstractSocketTest {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override

View File

@ -49,20 +49,39 @@ public class SocketFileRegionTest extends AbstractSocketTest {
run();
}
@Test
public void testFileRegionNotAutoRead() throws Throwable {
run();
}
@Test
public void testFileRegionVoidPromise() throws Throwable {
run();
}
@Test
public void testFileRegionVoidPromiseNotAutoRead() throws Throwable {
run();
}
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, false);
testFileRegion0(sb, cb, false, true);
}
public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, true);
testFileRegion0(sb, cb, true, true);
}
private static void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable {
public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, false, false);
}
public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, true, false);
}
private static void testFileRegion0(
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead) throws Throwable {
File file = File.createTempFile("netty-", ".tmp");
file.deleteOnExit();
@ -71,6 +90,13 @@ public class SocketFileRegionTest extends AbstractSocketTest {
out.close();
ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
}
@ -80,7 +106,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
ctx.close();
}
};
TestHandler sh = new TestHandler();
TestHandler sh = new TestHandler(autoRead);
sb.childHandler(sh);
cb.handler(ch);
@ -120,10 +146,15 @@ public class SocketFileRegionTest extends AbstractSocketTest {
}
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
TestHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
@ -142,6 +173,13 @@ public class SocketFileRegionTest extends AbstractSocketTest {
counter += actual.length;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {

View File

@ -47,9 +47,22 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
run();
}
@Test
public void testFixedLengthEchoNotAutoRead() throws Throwable {
run();
}
public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final EchoHandler sh = new EchoHandler();
final EchoHandler ch = new EchoHandler();
testFixedLengthEcho(sb, cb, true);
}
public void testFixedLengthEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFixedLengthEcho(sb, cb, false);
}
private static void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
@ -124,10 +137,15 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
}
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
@ -154,7 +172,13 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override

View File

@ -46,7 +46,16 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
}
public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, false);
testGatheringWrite0(sb, cb, false, true);
}
@Test(timeout = 30000)
public void testGatheringWriteNotAutoRead() throws Throwable {
run();
}
public void testGatheringWriteNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, false, false);
}
@Test(timeout = 30000)
@ -54,13 +63,23 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
run();
}
public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true);
public void testGatheringWriteWithCompositeNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true, false);
}
private static void testGatheringWrite0(ServerBootstrap sb, Bootstrap cb, boolean composite) throws Throwable {
final TestHandler sh = new TestHandler();
final TestHandler ch = new TestHandler();
@Test(timeout = 30000)
public void testGatheringWriteWithCompositeNotAutoRead() throws Throwable {
run();
}
public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true, true);
}
private static void testGatheringWrite0(
ServerBootstrap sb, Bootstrap cb, boolean composite, boolean autoRead) throws Throwable {
final TestHandler sh = new TestHandler(autoRead);
final TestHandler ch = new TestHandler(autoRead);
cb.handler(ch);
sb.childHandler(sh);
@ -121,10 +140,16 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
}
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
final ByteBuf received = Unpooled.buffer();
TestHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
@ -137,6 +162,13 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
received.writeBytes(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {

View File

@ -56,8 +56,21 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
}
public void testObjectEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final EchoHandler sh = new EchoHandler();
final EchoHandler ch = new EchoHandler();
testObjectEcho(sb, cb, true);
}
@Test
public void testObjectEchoNotAutoRead() throws Throwable {
run();
}
public void testObjectEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testObjectEcho(sb, cb, false);
}
private static void testObjectEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
@ -134,10 +147,15 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
}
private static class EchoHandler extends ChannelInboundHandlerAdapter {
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
@ -157,7 +175,13 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override

View File

@ -145,16 +145,28 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
return frames;
}
private SpdyVersion version;
@Test(timeout = 15000)
public void testSpdyEcho() throws Throwable {
version = SpdyVersion.SPDY_3_1;
logger.info("Testing against SPDY v3.1");
run();
}
public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
logger.info("Testing against SPDY v3.1");
testSpdyEcho(sb, cb, SpdyVersion.SPDY_3_1, true);
}
@Test(timeout = 15000)
public void testSpdyEchoNotAutoRead() throws Throwable {
run();
}
public void testSpdyEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
logger.info("Testing against SPDY v3.1");
testSpdyEcho(sb, cb, SpdyVersion.SPDY_3_1, false);
}
private static void testSpdyEcho(
ServerBootstrap sb, Bootstrap cb, final SpdyVersion version, boolean autoRead) throws Throwable {
ByteBuf frames;
switch (version) {
@ -165,8 +177,8 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
throw new IllegalArgumentException("unknown version");
}
final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler();
final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy());
final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler(autoRead);
final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy(), autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
@ -216,8 +228,13 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
}
private static class SpdyEchoTestServerHandler extends ChannelInboundHandlerAdapter {
private final boolean autoRead;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
SpdyEchoTestServerHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
@ -225,7 +242,13 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override
@ -237,12 +260,14 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
}
private static class SpdyEchoTestClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
final ByteBuf frames;
volatile int counter;
SpdyEchoTestClientHandler(ByteBuf frames) {
SpdyEchoTestClientHandler(ByteBuf frames, boolean autoRead) {
this.frames = frames;
this.autoRead = autoRead;
}
@Override
@ -264,5 +289,12 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
}
}

View File

@ -91,9 +91,22 @@ public class SocketSslEchoTest extends AbstractSocketTest {
}
public void testSslEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSslEcho(sb, cb, true);
}
@Test
public void testSslEchoNotAutoRead() throws Throwable {
run();
}
public void testSslEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSslEcho(sb, cb, false);
}
private void testSslEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool();
final EchoHandler sh = new EchoHandler(true, useCompositeByteBuf);
final EchoHandler ch = new EchoHandler(false, useCompositeByteBuf);
final EchoHandler sh = new EchoHandler(true, useCompositeByteBuf, autoRead);
final EchoHandler ch = new EchoHandler(false, useCompositeByteBuf, autoRead);
final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine();
final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine();
@ -208,10 +221,12 @@ public class SocketSslEchoTest extends AbstractSocketTest {
volatile int counter;
private final boolean server;
private final boolean composite;
private final boolean autoRead;
EchoHandler(boolean server, boolean composite) {
EchoHandler(boolean server, boolean composite, boolean autoRead) {
this.server = server;
this.composite = composite;
this.autoRead = autoRead;
}
@Override
@ -243,7 +258,13 @@ public class SocketSslEchoTest extends AbstractSocketTest {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override

View File

@ -64,12 +64,25 @@ public class SocketStartTlsTest extends AbstractSocketTest {
}
public void testStartTls(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testStartTls(sb, cb, true);
}
@Test(timeout = 30000)
public void testStartTlsNotAutoRead() throws Throwable {
run();
}
public void testStartTlsNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testStartTls(sb, cb, false);
}
private void testStartTls(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final EventExecutorGroup executor = SocketStartTlsTest.executor;
final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine();
final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine();
final StartTlsServerHandler sh = new StartTlsServerHandler(sse);
final StartTlsClientHandler ch = new StartTlsClientHandler(cse);
final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead);
final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
@ -144,12 +157,14 @@ public class SocketStartTlsTest extends AbstractSocketTest {
private class StartTlsClientHandler extends SimpleChannelInboundHandler<String> {
private final SslHandler sslHandler;
private final boolean autoRead;
private Future<Channel> handshakeFuture;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
StartTlsClientHandler(SSLEngine engine) {
StartTlsClientHandler(SSLEngine engine, boolean autoRead) {
engine.setUseClientMode(true);
sslHandler = new SslHandler(engine);
this.autoRead = autoRead;
}
@Override
@ -173,6 +188,13 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
@ -187,12 +209,14 @@ public class SocketStartTlsTest extends AbstractSocketTest {
private class StartTlsServerHandler extends SimpleChannelInboundHandler<String> {
private final SslHandler sslHandler;
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
StartTlsServerHandler(SSLEngine engine) {
StartTlsServerHandler(SSLEngine engine, boolean autoRead) {
engine.setUseClientMode(false);
sslHandler = new SslHandler(engine, true);
this.autoRead = autoRead;
}
@Override
@ -212,6 +236,13 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ctx.writeAndFlush("EncryptedResponse\n");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {

View File

@ -58,8 +58,21 @@ public class SocketStringEchoTest extends AbstractSocketTest {
}
public void testStringEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final StringEchoHandler sh = new StringEchoHandler();
final StringEchoHandler ch = new StringEchoHandler();
testStringEcho(sb, cb, true);
}
@Test
public void testStringEchoNotAutoRead() throws Throwable {
run();
}
public void testStringEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testStringEcho(sb, cb, false);
}
private static void testStringEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final StringEchoHandler sh = new StringEchoHandler(autoRead);
final StringEchoHandler ch = new StringEchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
@ -136,10 +149,15 @@ public class SocketStringEchoTest extends AbstractSocketTest {
}
static class StringEchoHandler extends SimpleChannelInboundHandler<String> {
private final boolean autoRead;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
StringEchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
@ -159,7 +177,13 @@ public class SocketStringEchoTest extends AbstractSocketTest {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
}
@Override

View File

@ -88,36 +88,36 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();
} catch (Throwable t) {
exception = t;
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
if (!config().isAutoRead()) {
clearEpollIn();
}
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
}
}

View File

@ -580,6 +580,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
@ -600,10 +606,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
});
}
} finally {
if (!config.isAutoRead()) {
clearEpollIn();
}
}
}
}

View File

@ -83,13 +83,19 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
private void handleReadException(ChannelPipeline pipeline, ChannelConfig config,
ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
if (!config.isAutoRead()) {
removeReadOp();
}
}
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
@ -148,6 +154,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break;
}
} while (++ messages < maxMessagesPerRead);
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
@ -157,11 +169,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
if (!config.isAutoRead()) {
removeReadOp();
}
handleReadException(pipeline, config, byteBuf, t, close);
}
}
}

View File

@ -61,60 +61,62 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
if (localRead < 0) {
closed = true;
break;
}
pipeline.fireExceptionCaught(exception);
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (closed) {
if (isOpen()) {
close(voidPromise());
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} finally {
if (!config().isAutoRead()) {
removeReadOp();
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
}