[#2254] Fix regression in handling autoRead and Channel.read()

This regression was introduced by e0b39159657c9eb711047bc32367537c4870d467
This commit is contained in:
Norman Maurer 2014-02-21 08:39:41 +01:00
parent 5c27273885
commit 7fe10fe635
13 changed files with 402 additions and 126 deletions

View File

@ -64,7 +64,16 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, false); testSimpleEcho0(sb, cb, false, false, true);
}
@Test(timeout = 30000)
public void testSimpleEchoNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, false, false);
} }
@Test//(timeout = 30000) @Test//(timeout = 30000)
@ -73,7 +82,16 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, false); testSimpleEcho0(sb, cb, true, false, true);
}
@Test//(timeout = 30000)
public void testSimpleEchoWithAdditionalExecutorNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, false, false);
} }
@Test//(timeout = 30000) @Test//(timeout = 30000)
@ -82,7 +100,16 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, true); testSimpleEcho0(sb, cb, false, true, true);
}
@Test//(timeout = 30000)
public void testSimpleEchoWithVoidPromiseNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, true, false);
} }
@Test(timeout = 30000) @Test(timeout = 30000)
@ -91,15 +118,15 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, true); testSimpleEcho0(sb, cb, true, true, true);
} }
private static void testSimpleEcho0( private static void testSimpleEcho0(
ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise) ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead)
throws Throwable { throws Throwable {
final EchoHandler sh = new EchoHandler(); final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(); final EchoHandler ch = new EchoHandler(autoRead);
if (additionalExecutor) { if (additionalExecutor) {
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<SocketChannel>() {
@ -188,10 +215,15 @@ public class SocketEchoTest extends AbstractSocketTest {
} }
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
EchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx)
throws Exception { throws Exception {
@ -217,7 +249,13 @@ public class SocketEchoTest extends AbstractSocketTest {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
} }
@Override @Override

View File

@ -49,20 +49,39 @@ public class SocketFileRegionTest extends AbstractSocketTest {
run(); run();
} }
@Test
public void testFileRegionNotAutoRead() throws Throwable {
run();
}
@Test @Test
public void testFileRegionVoidPromise() throws Throwable { public void testFileRegionVoidPromise() throws Throwable {
run(); run();
} }
@Test
public void testFileRegionVoidPromiseNotAutoRead() throws Throwable {
run();
}
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, false); testFileRegion0(sb, cb, false, true);
} }
public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, true); testFileRegion0(sb, cb, true, true);
} }
private static void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable { public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, false, false);
}
public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, true, false);
}
private static void testFileRegion0(
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead) throws Throwable {
File file = File.createTempFile("netty-", ".tmp"); File file = File.createTempFile("netty-", ".tmp");
file.deleteOnExit(); file.deleteOnExit();
@ -71,6 +90,13 @@ public class SocketFileRegionTest extends AbstractSocketTest {
out.close(); out.close();
ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() { ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
} }
@ -80,7 +106,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
ctx.close(); ctx.close();
} }
}; };
TestHandler sh = new TestHandler(); TestHandler sh = new TestHandler(autoRead);
sb.childHandler(sh); sb.childHandler(sh);
cb.handler(ch); cb.handler(ch);
@ -120,10 +146,15 @@ public class SocketFileRegionTest extends AbstractSocketTest {
} }
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
TestHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx)
throws Exception { throws Exception {
@ -142,6 +173,13 @@ public class SocketFileRegionTest extends AbstractSocketTest {
counter += actual.length; counter += actual.length;
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception { Throwable cause) throws Exception {

View File

@ -47,9 +47,22 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
run(); run();
} }
@Test
public void testFixedLengthEchoNotAutoRead() throws Throwable {
run();
}
public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final EchoHandler sh = new EchoHandler(); testFixedLengthEcho(sb, cb, true);
final EchoHandler ch = new EchoHandler(); }
public void testFixedLengthEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFixedLengthEcho(sb, cb, false);
}
private static void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -124,10 +137,15 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
} }
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
EchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel(); channel = ctx.channel();
@ -154,7 +172,13 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
} }
@Override @Override

View File

@ -46,7 +46,16 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, false); testGatheringWrite0(sb, cb, false, true);
}
@Test(timeout = 30000)
public void testGatheringWriteNotAutoRead() throws Throwable {
run();
}
public void testGatheringWriteNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, false, false);
} }
@Test(timeout = 30000) @Test(timeout = 30000)
@ -54,13 +63,23 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
run(); run();
} }
public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testGatheringWriteWithCompositeNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true); testGatheringWrite0(sb, cb, true, false);
} }
private static void testGatheringWrite0(ServerBootstrap sb, Bootstrap cb, boolean composite) throws Throwable { @Test(timeout = 30000)
final TestHandler sh = new TestHandler(); public void testGatheringWriteWithCompositeNotAutoRead() throws Throwable {
final TestHandler ch = new TestHandler(); run();
}
public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true, true);
}
private static void testGatheringWrite0(
ServerBootstrap sb, Bootstrap cb, boolean composite, boolean autoRead) throws Throwable {
final TestHandler sh = new TestHandler(autoRead);
final TestHandler ch = new TestHandler(autoRead);
cb.handler(ch); cb.handler(ch);
sb.childHandler(sh); sb.childHandler(sh);
@ -121,10 +140,16 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
final ByteBuf received = Unpooled.buffer(); final ByteBuf received = Unpooled.buffer();
TestHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx)
throws Exception { throws Exception {
@ -137,6 +162,13 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
received.writeBytes(in); received.writeBytes(in);
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception { Throwable cause) throws Exception {

View File

@ -56,8 +56,21 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
} }
public void testObjectEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testObjectEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final EchoHandler sh = new EchoHandler(); testObjectEcho(sb, cb, true);
final EchoHandler ch = new EchoHandler(); }
@Test
public void testObjectEchoNotAutoRead() throws Throwable {
run();
}
public void testObjectEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testObjectEcho(sb, cb, false);
}
private static void testObjectEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -134,10 +147,15 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
} }
private static class EchoHandler extends ChannelInboundHandlerAdapter { private static class EchoHandler extends ChannelInboundHandlerAdapter {
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
EchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) public void channelActive(ChannelHandlerContext ctx)
throws Exception { throws Exception {
@ -157,7 +175,13 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
} }
@Override @Override

View File

@ -145,16 +145,28 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
return frames; return frames;
} }
private SpdyVersion version;
@Test(timeout = 15000) @Test(timeout = 15000)
public void testSpdyEcho() throws Throwable { public void testSpdyEcho() throws Throwable {
version = SpdyVersion.SPDY_3_1;
logger.info("Testing against SPDY v3.1");
run(); run();
} }
public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSpdyEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
logger.info("Testing against SPDY v3.1");
testSpdyEcho(sb, cb, SpdyVersion.SPDY_3_1, true);
}
@Test(timeout = 15000)
public void testSpdyEchoNotAutoRead() throws Throwable {
run();
}
public void testSpdyEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
logger.info("Testing against SPDY v3.1");
testSpdyEcho(sb, cb, SpdyVersion.SPDY_3_1, false);
}
private static void testSpdyEcho(
ServerBootstrap sb, Bootstrap cb, final SpdyVersion version, boolean autoRead) throws Throwable {
ByteBuf frames; ByteBuf frames;
switch (version) { switch (version) {
@ -165,8 +177,8 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
throw new IllegalArgumentException("unknown version"); throw new IllegalArgumentException("unknown version");
} }
final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler(); final SpdyEchoTestServerHandler sh = new SpdyEchoTestServerHandler(autoRead);
final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy()); final SpdyEchoTestClientHandler ch = new SpdyEchoTestClientHandler(frames.copy(), autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -216,8 +228,13 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
} }
private static class SpdyEchoTestServerHandler extends ChannelInboundHandlerAdapter { private static class SpdyEchoTestServerHandler extends ChannelInboundHandlerAdapter {
private final boolean autoRead;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
SpdyEchoTestServerHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg); ctx.write(msg);
@ -225,7 +242,13 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
} }
@Override @Override
@ -237,12 +260,14 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
} }
private static class SpdyEchoTestClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class SpdyEchoTestClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final boolean autoRead;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
final ByteBuf frames; final ByteBuf frames;
volatile int counter; volatile int counter;
SpdyEchoTestClientHandler(ByteBuf frames) { SpdyEchoTestClientHandler(ByteBuf frames, boolean autoRead) {
this.frames = frames; this.frames = frames;
this.autoRead = autoRead;
} }
@Override @Override
@ -264,5 +289,12 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
ctx.close(); ctx.close();
} }
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
} }
} }

View File

@ -91,9 +91,22 @@ public class SocketSslEchoTest extends AbstractSocketTest {
} }
public void testSslEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSslEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSslEcho(sb, cb, true);
}
@Test
public void testSslEchoNotAutoRead() throws Throwable {
run();
}
public void testSslEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSslEcho(sb, cb, false);
}
private void testSslEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool(); final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool();
final EchoHandler sh = new EchoHandler(true, useCompositeByteBuf); final EchoHandler sh = new EchoHandler(true, useCompositeByteBuf, autoRead);
final EchoHandler ch = new EchoHandler(false, useCompositeByteBuf); final EchoHandler ch = new EchoHandler(false, useCompositeByteBuf, autoRead);
final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine(); final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine();
final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine(); final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine();
@ -208,10 +221,12 @@ public class SocketSslEchoTest extends AbstractSocketTest {
volatile int counter; volatile int counter;
private final boolean server; private final boolean server;
private final boolean composite; private final boolean composite;
private final boolean autoRead;
EchoHandler(boolean server, boolean composite) { EchoHandler(boolean server, boolean composite, boolean autoRead) {
this.server = server; this.server = server;
this.composite = composite; this.composite = composite;
this.autoRead = autoRead;
} }
@Override @Override
@ -243,7 +258,13 @@ public class SocketSslEchoTest extends AbstractSocketTest {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
} }
@Override @Override

View File

@ -64,12 +64,25 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
public void testStartTls(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testStartTls(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testStartTls(sb, cb, true);
}
@Test(timeout = 30000)
public void testStartTlsNotAutoRead() throws Throwable {
run();
}
public void testStartTlsNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testStartTls(sb, cb, false);
}
private void testStartTls(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final EventExecutorGroup executor = SocketStartTlsTest.executor; final EventExecutorGroup executor = SocketStartTlsTest.executor;
final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine(); final SSLEngine sse = BogusSslContextFactory.getServerContext().createSSLEngine();
final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine(); final SSLEngine cse = BogusSslContextFactory.getClientContext().createSSLEngine();
final StartTlsServerHandler sh = new StartTlsServerHandler(sse); final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead);
final StartTlsClientHandler ch = new StartTlsClientHandler(cse); final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -144,12 +157,14 @@ public class SocketStartTlsTest extends AbstractSocketTest {
private class StartTlsClientHandler extends SimpleChannelInboundHandler<String> { private class StartTlsClientHandler extends SimpleChannelInboundHandler<String> {
private final SslHandler sslHandler; private final SslHandler sslHandler;
private final boolean autoRead;
private Future<Channel> handshakeFuture; private Future<Channel> handshakeFuture;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
StartTlsClientHandler(SSLEngine engine) { StartTlsClientHandler(SSLEngine engine, boolean autoRead) {
engine.setUseClientMode(true); engine.setUseClientMode(true);
sslHandler = new SslHandler(engine); sslHandler = new SslHandler(engine);
this.autoRead = autoRead;
} }
@Override @Override
@ -173,6 +188,13 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ctx.close(); ctx.close();
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception { Throwable cause) throws Exception {
@ -187,12 +209,14 @@ public class SocketStartTlsTest extends AbstractSocketTest {
private class StartTlsServerHandler extends SimpleChannelInboundHandler<String> { private class StartTlsServerHandler extends SimpleChannelInboundHandler<String> {
private final SslHandler sslHandler; private final SslHandler sslHandler;
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
StartTlsServerHandler(SSLEngine engine) { StartTlsServerHandler(SSLEngine engine, boolean autoRead) {
engine.setUseClientMode(false); engine.setUseClientMode(false);
sslHandler = new SslHandler(engine, true); sslHandler = new SslHandler(engine, true);
this.autoRead = autoRead;
} }
@Override @Override
@ -212,6 +236,13 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ctx.writeAndFlush("EncryptedResponse\n"); ctx.writeAndFlush("EncryptedResponse\n");
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!autoRead) {
ctx.read();
}
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception { Throwable cause) throws Exception {

View File

@ -58,8 +58,21 @@ public class SocketStringEchoTest extends AbstractSocketTest {
} }
public void testStringEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testStringEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final StringEchoHandler sh = new StringEchoHandler(); testStringEcho(sb, cb, true);
final StringEchoHandler ch = new StringEchoHandler(); }
@Test
public void testStringEchoNotAutoRead() throws Throwable {
run();
}
public void testStringEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testStringEcho(sb, cb, false);
}
private static void testStringEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
final StringEchoHandler sh = new StringEchoHandler(autoRead);
final StringEchoHandler ch = new StringEchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -136,10 +149,15 @@ public class SocketStringEchoTest extends AbstractSocketTest {
} }
static class StringEchoHandler extends SimpleChannelInboundHandler<String> { static class StringEchoHandler extends SimpleChannelInboundHandler<String> {
private final boolean autoRead;
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter; volatile int counter;
StringEchoHandler(boolean autoRead) {
this.autoRead = autoRead;
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel(); channel = ctx.channel();
@ -159,7 +177,13 @@ public class SocketStringEchoTest extends AbstractSocketTest {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); try {
ctx.flush();
} finally {
if (!autoRead) {
ctx.read();
}
}
} }
@Override @Override

View File

@ -88,36 +88,36 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try { try {
final ChannelPipeline pipeline = pipeline(); for (;;) {
Throwable exception = null; int socketFd = Native.accept(fd);
try { if (socketFd == -1) {
for (;;) { // this means everything was handled for now
int socketFd = Native.accept(fd); break;
if (socketFd == -1) { }
// this means everything was handled for now try {
break; pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} } catch (Throwable t) {
try { // keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); pipeline.fireChannelReadComplete();
} catch (Throwable t) { pipeline.fireExceptionCaught(t);
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
} }
} catch (Throwable t) {
exception = t;
} }
pipeline.fireChannelReadComplete(); } catch (Throwable t) {
exception = t;
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete();
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
}
} finally {
if (!config().isAutoRead()) {
clearEpollIn();
}
} }
} }
} }

View File

@ -580,6 +580,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} }
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
clearEpollIn();
}
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount); allocHandle.record(totalReadAmount);
@ -600,10 +606,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
}); });
} }
} finally {
if (!config.isAutoRead()) {
clearEpollIn();
}
} }
} }
} }

View File

@ -83,13 +83,19 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
} }
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { private void handleReadException(ChannelPipeline pipeline, ChannelConfig config,
ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { } else {
byteBuf.release(); byteBuf.release();
} }
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
if (!config.isAutoRead()) {
removeReadOp();
}
} }
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
@ -148,6 +154,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break; break;
} }
} while (++ messages < maxMessagesPerRead); } while (++ messages < maxMessagesPerRead);
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount); allocHandle.record(totalReadAmount);
@ -157,11 +169,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close = false; close = false;
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close); handleReadException(pipeline, config, byteBuf, t, close);
} finally {
if (!config.isAutoRead()) {
removeReadOp();
}
} }
} }
} }

View File

@ -61,60 +61,62 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelConfig config = config(); final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try { try {
final int maxMessagesPerRead = config.getMaxMessagesPerRead(); for (;;) {
final ChannelPipeline pipeline = pipeline(); int localRead = doReadMessages(readBuf);
boolean closed = false; if (localRead == 0) {
Throwable exception = null; break;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
} }
} catch (Throwable t) { if (localRead < 0) {
exception = t; closed = true;
} break;
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
} }
pipeline.fireExceptionCaught(exception); // stop reading and remove op
} if (!config.isAutoRead()) {
break;
}
if (closed) { if (readBuf.size() >= maxMessagesPerRead) {
if (isOpen()) { break;
close(voidPromise());
} }
} }
} finally { } catch (Throwable t) {
if (!config().isAutoRead()) { exception = t;
removeReadOp(); }
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
// This must be triggered before the channelReadComplete() to give the user the chance
// to call Channel.read() again.
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead()) {
removeReadOp();
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
} }
} }
} }